The Agent-to-Agent (A2A) Protocol enables standardized communication between AI agents, allowing them to discover each other’s capabilities and collaborate on complex tasks.
What is A2A?
The A2A Protocol is a JSON-RPC-based specification that defines:
- Standard Communication: JSON-RPC messaging with streaming and non-streaming support
- Agent Discovery: Automatic agent cards describing capabilities and endpoints
- Rich Interactions: Tasks, status updates, and artifact sharing
- Interoperability: Works across different frameworks and platforms
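The wire format is ordinary JSON-RPC 2.0. As a minimal sketch (the `message/send` method and `parts` shape match the request examples later on this page; the helper name is illustrative), a request envelope can be built like this:

```python
import json

def build_send_request(text: str, request_id: int = 1) -> dict:
    """Build a JSON-RPC 2.0 envelope for the A2A message/send method."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "message/send",
        "params": {
            "message": {
                "parts": [{"kind": "text", "text": text}]
            }
        },
    }

payload = build_send_request("What is the weather in Paris?")
print(json.dumps(payload, indent=2))
```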
Quick Start
Every agent served with Agentor automatically supports A2A:
from agentor import Agentor

agent = Agentor(
    name="Weather Agent",
    model="gpt-5-mini",
    tools=["get_weather"],
    instructions="You are a helpful weather assistant."
)

# Serve with A2A protocol enabled automatically
agent.serve(port=8000)
Your agent is now discoverable at:
http://localhost:8000/.well-known/agent-card.json
Agent Card
The agent card is a manifest that describes your agent’s capabilities:
{
  "name": "Weather Agent",
  "description": "You are a helpful weather assistant.",
  "version": "0.0.1",
  "url": "http://localhost:8000",
  "capabilities": {
    "streaming": true,
    "statefulness": true,
    "asyncProcessing": true
  },
  "skills": [
    {
      "id": "tool_get_weather",
      "name": "get_weather",
      "description": "Get weather information for a location",
      "tags": []
    }
  ]
}
The agent card is automatically generated from your agent configuration.
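Clients can inspect the card before talking to an agent, for example to decide between `message/send` and `message/stream`. A small sketch (the card dict mirrors the example above; the helper name is illustrative):

```python
def supports_streaming(card: dict) -> bool:
    """Return True if the agent card advertises streaming support."""
    return bool(card.get("capabilities", {}).get("streaming"))

card = {
    "name": "Weather Agent",
    "capabilities": {"streaming": True, "statefulness": True},
}

# Pick the richer method only when the agent advertises it
method = "message/stream" if supports_streaming(card) else "message/send"
print(method)  # message/stream
```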
A2A Endpoints
When you serve an agent, these endpoints are automatically created:
- GET /.well-known/agent-card.json - Agent discovery
- POST / - JSON-RPC endpoint for all A2A operations
- POST /chat - Simplified chat endpoint
Supported Methods
- message/send - Send a message and get a response
- message/stream - Send a message and stream the response
- tasks/get - Get task status (if implemented)
- tasks/cancel - Cancel a running task (if implemented)
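Responses to all of these methods follow JSON-RPC 2.0: a success carries a result member, a failure carries an error object with code and message (the same shape used in the task-handler examples later on this page). A sketch of a client-side response parser, with illustrative names:

```python
import json

class JSONRPCError(Exception):
    """Raised when a JSON-RPC response carries an error object."""

def parse_response(raw: str):
    """Return the result of a JSON-RPC response, or raise on error."""
    data = json.loads(raw)
    if data.get("error") is not None:
        err = data["error"]
        raise JSONRPCError(f"{err.get('code')}: {err.get('message')}")
    return data.get("result")

ok = parse_response('{"jsonrpc": "2.0", "id": 1, "result": {"status": "done"}}')
print(ok)  # {'status': 'done'}
```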
Custom A2A Server
For advanced use cases, customize the A2A controller:
from agentor import Agentor
from agentor.a2a import A2AController, AgentSkill
from a2a.types import AgentCapabilities
from fastapi import FastAPI
import uvicorn

# Create your agent
agent = Agentor(
    name="Research Agent",
    model="gpt-5-mini",
    instructions="You are a research assistant."
)

# Create custom A2A controller
controller = A2AController(
    name="Research Agent",
    description="Advanced research assistant with web search and analysis",
    url="http://localhost:8000",
    version="1.0.0",
    skills=[
        AgentSkill(
            id="research",
            name="Research",
            description="Conduct in-depth research on any topic",
            tags=["research", "analysis"]
        ),
        AgentSkill(
            id="summarize",
            name="Summarize",
            description="Create concise summaries of long documents",
            tags=["summarization", "nlp"]
        )
    ],
    capabilities=AgentCapabilities(
        streaming=True,
        statefulness=True,
        asyncProcessing=True
    )
)

# Add custom endpoints
@controller.get("/status")
async def status():
    return {"status": "operational", "load": "normal"}

# Create the FastAPI app
app = FastAPI()
app.include_router(controller)

if __name__ == "__main__":
    print("Agent card: http://localhost:8000/.well-known/agent-card.json")
    uvicorn.run(app, host="0.0.0.0", port=8000)
Streaming Responses
The A2A protocol supports Server-Sent Events (SSE) for streaming:
Server Side
from agentor import Agentor

agent = Agentor(
    name="Streaming Agent",
    model="gpt-5-mini"
)

# Streaming is automatically enabled
agent.serve(port=8000)
Client Side
Send a streaming request:
import requests
import json

url = "http://localhost:8000/"
headers = {"Content-Type": "application/json"}
payload = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "message/stream",
    "params": {
        "message": {
            "parts": [
                {
                    "kind": "text",
                    "text": "Explain quantum computing"
                }
            ]
        }
    }
}

response = requests.post(url, json=payload, headers=headers, stream=True)
for line in response.iter_lines(decode_unicode=True):
    if line.startswith("data: "):
        data = json.loads(line[6:])
        result = data.get("result", {})
        # Handle different event types
        if "artifact" in result:
            artifact = result["artifact"]
            print(artifact["parts"][0]["text"], end="", flush=True)
        elif "status" in result:
            print(f"\nStatus: {result['status']['state']}")
Task Management
A2A includes task lifecycle management:
from agentor.a2a import A2AController
from a2a.types import Task, TaskStatus, TaskState, JSONRPCResponse
import uuid

controller = A2AController(
    name="Task Agent",
    description="Agent with task management"
)

# Store tasks (in production, use a database)
tasks = {}

async def handle_message_stream(request):
    """Custom streaming handler with task tracking."""
    task_id = f"task_{uuid.uuid4()}"

    # Create and record the task
    task = Task(
        id=task_id,
        context_id=f"ctx_{uuid.uuid4()}",
        status=TaskStatus(state=TaskState.working)
    )
    tasks[task_id] = task

    # Process and stream the response
    # ... your logic here, producing `response`

    # Update the task status once processing finishes
    tasks[task_id].status.state = TaskState.completed
    return response

async def handle_tasks_get(request):
    """Get task status."""
    task_id = request.params.get("task_id")
    if task_id not in tasks:
        return JSONRPCResponse(
            id=request.id,
            error={"code": -32600, "message": "Task not found"}
        )
    return JSONRPCResponse(
        id=request.id,
        result=tasks[task_id].model_dump()
    )

async def handle_tasks_cancel(request):
    """Cancel a task."""
    task_id = request.params.get("task_id")
    if task_id in tasks:
        tasks[task_id].status.state = TaskState.cancelled
        return JSONRPCResponse(
            id=request.id,
            result={"cancelled": True}
        )
    return JSONRPCResponse(
        id=request.id,
        error={"code": -32600, "message": "Task not found"}
    )

# Register handlers
controller.add_handler("message/stream", handle_message_stream)
controller.add_handler("tasks/get", handle_tasks_get)
controller.add_handler("tasks/cancel", handle_tasks_cancel)
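The handlers above share a plain module-level dict. The same bookkeeping can be isolated into a small registry, sketched here with plain dicts instead of the a2a.types models so it has no dependencies (the class and field names are illustrative; in production you would back it with a database):

```python
import uuid

class TaskRegistry:
    """Minimal in-memory task store mirroring the handlers above."""

    def __init__(self):
        self._tasks = {}

    def create(self) -> dict:
        # New tasks start in the "working" state
        task = {"id": f"task_{uuid.uuid4()}", "state": "working"}
        self._tasks[task["id"]] = task
        return task

    def get(self, task_id: str):
        return self._tasks.get(task_id)

    def cancel(self, task_id: str) -> bool:
        # Return False for unknown tasks so the caller can emit a JSON-RPC error
        task = self._tasks.get(task_id)
        if task is None:
            return False
        task["state"] = "cancelled"
        return True

registry = TaskRegistry()
task = registry.create()
registry.cancel(task["id"])
print(registry.get(task["id"])["state"])  # cancelled
```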
Multi-Agent Orchestration
Coordinate multiple agents:
import asyncio
from agentor import Agentor

# Create specialized agents
research_agent = Agentor(
    name="Research Agent",
    model="gpt-5-mini",
    instructions="You research topics and gather information."
)
writing_agent = Agentor(
    name="Writing Agent",
    model="gpt-5-mini",
    instructions="You write articles based on research."
)
review_agent = Agentor(
    name="Review Agent",
    model="gpt-5-mini",
    instructions="You review and improve written content."
)

async def collaborative_workflow(topic: str):
    """Multi-agent workflow for content creation."""
    # Step 1: Research
    print("[1/3] Researching...")
    research = await research_agent.arun(
        f"Research the topic: {topic}. Provide key facts and insights."
    )
    # Step 2: Write
    print("[2/3] Writing...")
    draft = await writing_agent.arun(
        f"Write an article about {topic} using this research:\n{research.final_output}"
    )
    # Step 3: Review
    print("[3/3] Reviewing...")
    final = await review_agent.arun(
        f"Review and improve this article:\n{draft.final_output}"
    )
    return final.final_output

# Run the workflow
result = asyncio.run(collaborative_workflow("quantum computing"))
print(result)
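The workflow above is strictly sequential because each step depends on the previous one. When steps are independent (for example, researching several subtopics at once), they can be fanned out with asyncio.gather. The sketch below substitutes a stub coroutine for agent.arun so it runs standalone; with real agents you would await the arun calls directly:

```python
import asyncio

async def fake_arun(prompt: str) -> str:
    """Stand-in for agent.arun(); replace with a real agent call."""
    await asyncio.sleep(0)  # simulate waiting on I/O
    return f"notes on {prompt}"

async def parallel_research(subtopics):
    # Fan out one research call per subtopic and await them all;
    # gather preserves the order of the inputs
    return await asyncio.gather(*(fake_arun(t) for t in subtopics))

results = asyncio.run(parallel_research(["qubits", "entanglement", "error correction"]))
print(results)
```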
Agent Discovery
Discover available agents by fetching their agent cards:
import requests

def discover_agent(url: str):
    """Fetch the agent card from a URL."""
    card_url = f"{url}/.well-known/agent-card.json"
    response = requests.get(card_url)
    if response.status_code == 200:
        card = response.json()
        print(f"Agent: {card['name']}")
        print(f"Description: {card['description']}")
        print("\nSkills:")
        for skill in card.get('skills', []):
            print(f"  - {skill['name']}: {skill['description']}")
        return card
    else:
        print(f"Error: Could not fetch agent card from {card_url}")
        return None

# Discover an agent
agent_card = discover_agent("http://localhost:8000")
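Once several cards have been collected, the skill tags support simple capability-based routing. A sketch over plain card dicts (the agent names and tag values are illustrative):

```python
def agents_with_tag(cards, tag):
    """Return names of agents whose skills carry the given tag."""
    matches = []
    for card in cards:
        for skill in card.get("skills", []):
            if tag in skill.get("tags", []):
                matches.append(card["name"])
                break  # one matching skill is enough
    return matches

cards = [
    {"name": "Analyst", "skills": [{"name": "Analyze", "tags": ["statistics"]}]},
    {"name": "Writer", "skills": [{"name": "Draft", "tags": ["writing"]}]},
]
print(agents_with_tag(cards, "statistics"))  # ['Analyst']
```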
Best Practices
Give each agent a specific purpose:
agent = Agentor(
    name="Data Analyst Agent",
    model="gpt-5-mini",
    instructions="""
    You are a data analyst agent specialized in:
    - Statistical analysis
    - Data visualization recommendations
    - Trend identification
    You do NOT write code or access databases directly.
    """
)
Help other agents understand what your agent can do:
from agentor.a2a import AgentSkill

skills = [
    AgentSkill(
        id="analyze_data",
        name="Analyze Data",
        description="Perform statistical analysis on datasets",
        tags=["statistics", "analysis", "data"]
    ),
    AgentSkill(
        id="visualize",
        name="Recommend Visualizations",
        description="Suggest appropriate charts and graphs for data",
        tags=["visualization", "charts"]
    )
]
Handle errors gracefully when calling other agents:
async def safe_agent_call(agent, message):
    """Call an agent with error handling."""
    try:
        result = await agent.arun(message)
        return result.final_output
    except Exception as e:
        print(f"Agent error: {e}")
        return None
Version your agent so other agents can track changes:
controller = A2AController(
    name="My Agent",
    version="2.1.0",  # Semantic versioning
    description="Agent with enhanced capabilities"
)
Implement task tracking for long-running operations:
import time

import requests

# Client polls for status
def wait_for_task(agent_url, task_id, timeout=60):
    start = time.time()
    while time.time() - start < timeout:
        response = requests.post(
            agent_url,
            json={
                "jsonrpc": "2.0",
                "id": 1,
                "method": "tasks/get",
                "params": {"task_id": task_id}
            }
        )
        task = response.json()["result"]
        if task["status"]["state"] in ["completed", "failed"]:
            return task
        time.sleep(1)
    raise TimeoutError("Task did not complete in time")
Deployment
Deploy A2A-enabled agents to Celesto:
Your agent will be available at:
- https://api.celesto.ai/deploy/apps/<app-name>
- https://api.celesto.ai/deploy/apps/<app-name>/.well-known/agent-card.json
Next Steps
Last modified on March 4, 2026