The Agent-to-Agent (A2A) Protocol enables standardized communication between AI agents, allowing them to discover each other’s capabilities and collaborate on complex tasks.

What is A2A?

The A2A Protocol is a JSON-RPC based specification that defines:
  • Standard Communication: JSON-RPC messaging with streaming and non-streaming support
  • Agent Discovery: Automatic agent cards describing capabilities and endpoints
  • Rich Interactions: Tasks, status updates, and artifact sharing
  • Interoperability: Works across different frameworks and platforms
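
To make the JSON-RPC framing concrete, here is a minimal sketch of the request envelope a client might build for a plain text message. The helper name `build_message_request` is hypothetical and not part of Agentor; the payload shape mirrors the streaming request shown in the Client Side section of this page.

```python
import json
from typing import Any


def build_message_request(text: str, request_id: int = 1, stream: bool = False) -> dict[str, Any]:
    """Build a JSON-RPC 2.0 request for message/send or message/stream.

    Hypothetical helper for illustration only; not an Agentor API.
    """
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "message/stream" if stream else "message/send",
        "params": {"message": {"parts": [{"kind": "text", "text": text}]}},
    }


if __name__ == "__main__":
    print(json.dumps(build_message_request("What is the weather in Paris?"), indent=2))
```

The only difference between the streaming and non-streaming variants in this sketch is the `method` field; the message body stays the same.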

Quick Start

Every agent served with Agentor automatically supports A2A:
from agentor import Agentor

agent = Agentor(
    name="Weather Agent",
    model="gpt-5-mini",
    tools=["get_weather"],
    instructions="You are a helpful weather assistant."
)

# Serve with A2A protocol enabled automatically
agent.serve(port=8000)
Your agent is now discoverable at:
http://localhost:8000/.well-known/agent-card.json

Agent Card

The agent card is a manifest that describes your agent’s capabilities:
{
  "name": "Weather Agent",
  "description": "You are a helpful weather assistant.",
  "version": "0.0.1",
  "url": "http://localhost:8000",
  "capabilities": {
    "streaming": true,
    "statefulness": true,
    "asyncProcessing": true
  },
  "skills": [
    {
      "id": "tool_get_weather",
      "name": "get_weather",
      "description": "Get weather information for a location",
      "tags": []
    }
  ]
}
The agent card is automatically generated from your agent configuration.
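
As a rough illustration of what that generation step could look like, the sketch below derives a card from an agent's name, instructions, and tools. The mapping rules (instructions become the description, each tool becomes a skill with a `tool_` prefixed id) are inferred from the example card above and are an assumption, not Agentor's actual implementation. `build_agent_card` is a hypothetical name.

```python
from typing import Any


def build_agent_card(
    name: str,
    instructions: str,
    url: str,
    tools: dict[str, str],
    version: str = "0.0.1",
) -> dict[str, Any]:
    """Assemble an agent card dict from basic agent configuration.

    `tools` maps a tool name to its human-readable description.
    Hypothetical helper; the field mapping is inferred from the example card.
    """
    skills = [
        {"id": f"tool_{tool_name}", "name": tool_name, "description": description, "tags": []}
        for tool_name, description in tools.items()
    ]
    return {
        "name": name,
        "description": instructions,
        "version": version,
        "url": url,
        "capabilities": {"streaming": True, "statefulness": True, "asyncProcessing": True},
        "skills": skills,
    }


if __name__ == "__main__":
    card = build_agent_card(
        name="Weather Agent",
        instructions="You are a helpful weather assistant.",
        url="http://localhost:8000",
        tools={"get_weather": "Get weather information for a location"},
    )
    print(card["skills"])
```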

A2A Endpoints

When you serve an agent, these endpoints are automatically created:
  • GET /.well-known/agent-card.json - Agent discovery
  • POST / - JSON-RPC endpoint for all A2A operations
  • POST /chat - Simplified chat endpoint

Supported Methods

  • message/send - Send a message and get a response
  • message/stream - Send a message and stream the response
  • tasks/get - Get task status (if implemented)
  • tasks/cancel - Cancel a running task (if implemented)
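
Because some methods are optional, a server needs a defined answer for methods it does not implement. The sketch below shows one way a JSON-RPC endpoint could route requests by method name and fall back to the standard JSON-RPC 2.0 "Method not found" error (code -32601). The `dispatch` function and `echo_send` handler are hypothetical and only illustrate the routing idea; they are not how Agentor is implemented.

```python
from typing import Any, Callable

METHOD_NOT_FOUND = -32601  # standard JSON-RPC 2.0 error code

Handler = Callable[[dict[str, Any]], Any]


def dispatch(request: dict[str, Any], handlers: dict[str, Handler]) -> dict[str, Any]:
    """Route a JSON-RPC request to the handler registered for its method."""
    handler = handlers.get(request.get("method"))
    if handler is None:
        return {
            "jsonrpc": "2.0",
            "id": request.get("id"),
            "error": {"code": METHOD_NOT_FOUND, "message": "Method not found"},
        }
    return {
        "jsonrpc": "2.0",
        "id": request.get("id"),
        "result": handler(request.get("params", {})),
    }


def echo_send(params: dict[str, Any]) -> dict[str, Any]:
    """Toy message/send handler that echoes the first text part."""
    text = params["message"]["parts"][0]["text"]
    return {"parts": [{"kind": "text", "text": f"echo: {text}"}]}


# Only message/send is registered, so tasks/get is reported as unsupported.
HANDLERS: dict[str, Handler] = {"message/send": echo_send}
```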

Custom A2A Server

For advanced use cases, customize the A2A controller:
from agentor import Agentor
from agentor.a2a import A2AController, AgentSkill
from a2a.types import AgentCapabilities
from fastapi import FastAPI
import uvicorn

# Create your agent
agent = Agentor(
    name="Research Agent",
    model="gpt-5-mini",
    instructions="You are a research assistant."
)

# Create custom A2A controller
controller = A2AController(
    name="Research Agent",
    description="Advanced research assistant with web search and analysis",
    url="http://localhost:8000",
    version="1.0.0",
    skills=[
        AgentSkill(
            id="research",
            name="Research",
            description="Conduct in-depth research on any topic",
            tags=["research", "analysis"]
        ),
        AgentSkill(
            id="summarize",
            name="Summarize",
            description="Create concise summaries of long documents",
            tags=["summarization", "nlp"]
        )
    ],
    capabilities=AgentCapabilities(
        streaming=True,
        statefulness=True,
        asyncProcessing=True
    )
)

# Add custom endpoints
@controller.get("/status")
async def status():
    return {"status": "operational", "load": "normal"}

# Create FastAPI app
app = FastAPI()
app.include_router(controller)

if __name__ == "__main__":
    print("Agent card: http://localhost:8000/.well-known/agent-card.json")
    uvicorn.run(app, host="0.0.0.0", port=8000)

Streaming Responses

The A2A protocol supports Server-Sent Events (SSE) for streaming:

Server Side

from agentor import Agentor

agent = Agentor(
    name="Streaming Agent",
    model="gpt-5-mini"
)

# Streaming is automatically enabled
agent.serve(port=8000)

Client Side

Send a streaming request:
import requests
import json

url = "http://localhost:8000/"
headers = {"Content-Type": "application/json"}

payload = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "message/stream",
    "params": {
        "message": {
            "parts": [
                {
                    "kind": "text",
                    "text": "Explain quantum computing"
                }
            ]
        }
    }
}

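response = requests.post(url, json=payload, headers=headers, stream=True)
response.raise_for_status()
# requests falls back to ISO-8859-1 for text/* responses without a charset,
# so set UTF-8 explicitly before decoding the event stream
response.encoding = "utf-8"

for line in response.iter_lines(decode_unicode=True):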
    if line.startswith("data: "):
        data = json.loads(line[6:])
        result = data.get("result", {})
        
        # Handle different event types
        if "artifact" in result:
            artifact = result["artifact"]
            print(artifact["parts"][0]["text"], end="", flush=True)
        elif "status" in result:
            print(f"\nStatus: {result['status']['state']}")
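
The parsing logic in the client loop can be pulled into a small helper that works on any iterable of lines, which makes it testable without a running server. This is a minimal sketch that assumes each event arrives as a single `data:` line carrying one JSON-RPC response, as in the loop above; `iter_sse_results` and `collect_text` are hypothetical names.

```python
import json
from typing import Any, Iterable, Iterator


def iter_sse_results(lines: Iterable[str]) -> Iterator[dict[str, Any]]:
    """Yield the JSON-RPC `result` object from each SSE data line.

    Blank lines and SSE comment lines (starting with ':') are skipped.
    """
    for line in lines:
        if not line.startswith("data:"):
            continue
        payload = line[len("data:"):].strip()
        if not payload:
            continue
        yield json.loads(payload).get("result", {})


def collect_text(lines: Iterable[str]) -> str:
    """Concatenate the text parts of all artifact events in the stream."""
    chunks: list[str] = []
    for result in iter_sse_results(lines):
        artifact = result.get("artifact")
        if artifact:
            chunks.extend(
                part.get("text", "")
                for part in artifact.get("parts", [])
                if part.get("kind") == "text"
            )
    return "".join(chunks)
```

With a live server, `response.iter_lines(decode_unicode=True)` can be passed directly as the `lines` argument.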

Task Management

A2A includes task lifecycle management:
from agentor.a2a import A2AController
from a2a.types import Task, TaskStatus, TaskState, JSONRPCResponse
import uuid

controller = A2AController(
    name="Task Agent",
    description="Agent with task management"
)

# Store tasks (in production, use a database)
tasks = {}

async def handle_message_stream(request):
    """Custom streaming handler with task tracking."""
    task_id = f"task_{uuid.uuid4()}"
    
    # Create task
    task = Task(
        id=task_id,
        context_id=f"ctx_{uuid.uuid4()}",
        status=TaskStatus(state=TaskState.working)
    )
    tasks[task_id] = task
    
    # Process and stream response
    # ... your logic here
    response = None  # placeholder: replace with the response your logic builds
    
    # Update task status
    tasks[task_id].status.state = TaskState.completed
    
    return response

async def handle_tasks_get(request):
    """Get task status."""
    task_id = request.params.get("task_id")
    
    if task_id not in tasks:
        return JSONRPCResponse(
            id=request.id,
            # -32001 is the A2A TaskNotFoundError code; -32600 means "Invalid Request"
            error={"code": -32001, "message": "Task not found"}
        )
    
    return JSONRPCResponse(
        id=request.id,
        result=tasks[task_id].model_dump()
    )

async def handle_tasks_cancel(request):
    """Cancel a task."""
    task_id = request.params.get("task_id")
    
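    if task_id in tasks:
        # The a2a.types enum member uses the single-l spelling "canceled"
        tasks[task_id].status.state = TaskState.canceled
        return JSONRPCResponse(
            id=request.id,
            result={"cancelled": True}
        )
    
    return JSONRPCResponse(
        id=request.id,
        # -32001 is the A2A TaskNotFoundError code; -32600 means "Invalid Request"
        error={"code": -32001, "message": "Task not found"}
    )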

# Register handlers
controller.add_handler("message/stream", handle_message_stream)
controller.add_handler("tasks/get", handle_tasks_get)
controller.add_handler("tasks/cancel", handle_tasks_cancel)
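
One gap in the handlers above is that a task that has already finished can still be flipped to a cancelled state. A small store that refuses transitions out of terminal states is one way to guard against that. The sketch below uses plain strings for states (matching the A2A wire values such as "canceled") rather than the `a2a.types` models, and `InMemoryTaskStore` is a hypothetical class, not part of Agentor.

```python
from dataclasses import dataclass, field
from typing import Optional

# States after which a task should no longer change
TERMINAL_STATES = frozenset({"completed", "canceled", "failed", "rejected"})


@dataclass
class InMemoryTaskStore:
    """Tracks task states in memory and blocks changes to finished tasks."""

    _states: dict[str, str] = field(default_factory=dict)

    def create(self, task_id: str) -> None:
        self._states[task_id] = "working"

    def get(self, task_id: str) -> Optional[str]:
        return self._states.get(task_id)

    def transition(self, task_id: str, new_state: str) -> bool:
        """Move a task to new_state; return False if unknown or already terminal."""
        current = self._states.get(task_id)
        if current is None or current in TERMINAL_STATES:
            return False
        self._states[task_id] = new_state
        return True


if __name__ == "__main__":
    store = InMemoryTaskStore()
    store.create("task_1")
    store.transition("task_1", "completed")
    # A completed task cannot be canceled afterwards
    print(store.transition("task_1", "canceled"), store.get("task_1"))
```

A `tasks/cancel` handler could call `transition(task_id, "canceled")` and return an error response when it gets `False` back.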

Multi-Agent Orchestration

Coordinate multiple agents:
import asyncio
from agentor import Agentor

# Create specialized agents
research_agent = Agentor(
    name="Research Agent",
    model="gpt-5-mini",
    instructions="You research topics and gather information."
)

writing_agent = Agentor(
    name="Writing Agent",
    model="gpt-5-mini",
    instructions="You write articles based on research."
)

review_agent = Agentor(
    name="Review Agent",
    model="gpt-5-mini",
    instructions="You review and improve written content."
)

async def collaborative_workflow(topic: str):
    """Multi-agent workflow for content creation."""
    
    # Step 1: Research
    print("[1/3] Researching...")
    research = await research_agent.arun(
        f"Research the topic: {topic}. Provide key facts and insights."
    )
    
    # Step 2: Write
    print("[2/3] Writing...")
    draft = await writing_agent.arun(
        f"Write an article about {topic} using this research:\n{research.final_output}"
    )
    
    # Step 3: Review
    print("[3/3] Reviewing...")
    final = await review_agent.arun(
        f"Review and improve this article:\n{draft.final_output}"
    )
    
    return final.final_output

# Run the workflow
result = asyncio.run(collaborative_workflow("quantum computing"))
print(result)
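
The three steps follow the same pattern: each agent's `final_output` becomes the next agent's input. A minimal sketch of that chaining as a reusable loop is shown below, using stub agents so it runs without a model or API key. `StubAgent`, `StubResult`, and `run_pipeline` are hypothetical names; the stubs only mimic the `arun(...)` and `.final_output` interface used in the workflow above.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class StubResult:
    final_output: str


class StubAgent:
    """Stand-in for an agent: prefixes the prompt with its label."""

    def __init__(self, label: str) -> None:
        self.label = label

    async def arun(self, prompt: str) -> StubResult:
        return StubResult(final_output=f"[{self.label}] {prompt}")


async def run_pipeline(agents, initial_prompt: str) -> str:
    """Feed each agent's final_output to the next agent in order."""
    text = initial_prompt
    for agent in agents:
        result = await agent.arun(text)
        text = result.final_output
    return text


if __name__ == "__main__":
    stages = [StubAgent("research"), StubAgent("write"), StubAgent("review")]
    print(asyncio.run(run_pipeline(stages, "quantum computing")))
```

Unlike the workflow above, this loop passes the previous output through unchanged; real stages would usually wrap it in a stage-specific prompt.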

Agent Discovery

Discover available agents by fetching their agent cards:
import requests

def discover_agent(url: str):
    """Fetch agent card from a URL."""
    card_url = f"{url}/.well-known/agent-card.json"
    # A timeout keeps discovery from hanging on an unreachable agent
    response = requests.get(card_url, timeout=10)
    
    if response.status_code == 200:
        card = response.json()
        print(f"Agent: {card['name']}")
        print(f"Description: {card['description']}")
        print("\nSkills:")
        for skill in card.get('skills', []):
            print(f"  - {skill['name']}: {skill['description']}")
        return card
    else:
        print(f"Error: Could not fetch agent card from {card_url}")
        return None

# Discover an agent
agent_card = discover_agent("http://localhost:8000")
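
Once several cards have been fetched, a client can pick an agent by matching on skill metadata. The sketch below searches skill names, descriptions, and tags for a keyword. It operates on plain card dictionaries shaped like the example in the Agent Card section; `find_agents_with_skill` is a hypothetical helper.

```python
from typing import Any


def find_agents_with_skill(cards: list[dict[str, Any]], keyword: str) -> list[str]:
    """Return names of agents with a skill whose name, description, or tags mention keyword."""
    keyword = keyword.lower()
    matches: list[str] = []
    for card in cards:
        for skill in card.get("skills", []):
            haystack = [skill.get("name", ""), skill.get("description", ""), *skill.get("tags", [])]
            if any(keyword in text.lower() for text in haystack):
                matches.append(card.get("name", "<unnamed>"))
                break  # one matching skill is enough for this agent
    return matches


if __name__ == "__main__":
    cards = [
        {"name": "Weather Agent", "skills": [
            {"id": "tool_get_weather", "name": "get_weather",
             "description": "Get weather information for a location", "tags": []}]},
        {"name": "Research Agent", "skills": [
            {"id": "research", "name": "Research",
             "description": "Conduct in-depth research on any topic",
             "tags": ["research", "analysis"]}]},
    ]
    print(find_agents_with_skill(cards, "analysis"))
```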

Best Practices

Define Clear Agent Roles

Give each agent a specific purpose:
agent = Agentor(
    name="Data Analyst Agent",
    model="gpt-5-mini",
    instructions="""
    You are a data analyst agent specialized in:
    - Statistical analysis
    - Data visualization recommendations
    - Trend identification
    
    You do NOT write code or access databases directly.
    """
)

Use Descriptive Skills

Help other agents understand what your agent can do:
from agentor.a2a import AgentSkill

skills = [
    AgentSkill(
        id="analyze_data",
        name="Analyze Data",
        description="Perform statistical analysis on datasets",
        tags=["statistics", "analysis", "data"]
    ),
    AgentSkill(
        id="visualize",
        name="Recommend Visualizations",
        description="Suggest appropriate charts and graphs for data",
        tags=["visualization", "charts"]
    )
]

Handle Errors Gracefully

async def safe_agent_call(agent, message):
    """Call an agent with error handling."""
    try:
        result = await agent.arun(message)
        return result.final_output
    except Exception as e:
        print(f"Agent error: {e}")
        return None

Version Your Agents

from agentor.a2a import A2AController

controller = A2AController(
    name="My Agent",
    version="2.1.0",  # Semantic versioning
    description="Agent with enhanced capabilities"
)

Monitor Task Status

Implement task tracking for long-running operations:
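import time

import requests

# Client polls for status
def wait_for_task(agent_url, task_id, timeout=60):
    start = time.time()
    
    while time.time() - start < timeout:
        response = requests.post(
            agent_url,
            json={
                "jsonrpc": "2.0",
                "id": 1,
                "method": "tasks/get",
                "params": {"task_id": task_id}
            },
            timeout=10,
        )
        response.raise_for_status()
        
        body = response.json()
        # A JSON-RPC error response has no "result" key
        if "error" in body:
            raise RuntimeError(f"tasks/get failed: {body['error']}")
        
        task = body["result"]
        # Stop on any terminal state, including a canceled task
        if task["status"]["state"] in ["completed", "failed", "canceled"]:
            return task
        
        time.sleep(1)
    
    raise TimeoutError("Task did not complete in time")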
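
For long-running tasks, polling once per second can generate a lot of requests. One common alternative is exponential backoff. The sketch below is a variant of the polling loop that doubles the delay after each attempt up to a cap. It takes the fetch step as a callable so it can be exercised without a server; `poll_with_backoff` and its parameters are hypothetical, and the set of terminal states is an assumption based on the A2A task states.

```python
import time
from typing import Any, Callable

TERMINAL_STATES = {"completed", "canceled", "failed", "rejected"}


def poll_with_backoff(
    fetch_task: Callable[[], dict[str, Any]],
    timeout: float = 60.0,
    initial_delay: float = 0.5,
    max_delay: float = 8.0,
    sleep: Callable[[float], None] = time.sleep,
) -> dict[str, Any]:
    """Call fetch_task until the task reaches a terminal state or time runs out."""
    deadline = time.monotonic() + timeout
    delay = initial_delay
    while True:
        task = fetch_task()
        if task["status"]["state"] in TERMINAL_STATES:
            return task
        # Give up if the next sleep would carry us past the deadline
        if time.monotonic() + delay > deadline:
            raise TimeoutError("Task did not complete in time")
        sleep(delay)
        delay = min(delay * 2, max_delay)


if __name__ == "__main__":
    # Simulated task that completes on the third poll
    states = iter(["working", "working", "completed"])
    result = poll_with_backoff(lambda: {"status": {"state": next(states)}}, initial_delay=0.01)
    print(result["status"]["state"])
```

In real use, `fetch_task` would wrap the `tasks/get` request from the polling example above.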

Deployment

Deploy A2A-enabled agents to Celesto:
celesto deploy
Your agent will be available at:
https://api.celesto.ai/deploy/apps/<app-name>
https://api.celesto.ai/deploy/apps/<app-name>/.well-known/agent-card.json

Last modified on March 4, 2026