Skip to main content

Overview

The Agent-to-Agent (A2A) Protocol defines standard specifications for agent communication and message formatting, enabling seamless interoperability between different AI agents. Every agent served with agent.serve() automatically becomes A2A-compatible with standardized endpoints and agent card discovery.

Key Features

  • Standard Communication - JSON-RPC 2.0 based messaging
  • Agent Discovery - Automatic agent card generation at /.well-known/agent-card.json
  • Rich Interactions - Support for tasks, status updates, and artifact streaming
  • Protocol Version - Implements A2A protocol v0.3.0

Quick Start

Serving an agent automatically enables A2A protocol (src/agentor/core/agent.py:513):
from agentor import Agentor

agent = Agentor(
    name="Weather Agent",
    model="gpt-5-mini",
    tools=["get_weather"],
)

# Serve with A2A protocol enabled
agent.serve(port=8000)
The agent card is now available at:
http://localhost:8000/.well-known/agent-card.json

Agent Card

The agent card describes agent capabilities, endpoints, and skills (src/agentor/a2a.py:53):
{
  "name": "Weather Agent",
  "description": "Agent instructions and description",
  "url": "http://localhost:8000",
  "version": "0.0.1",
  "skills": [
    {
      "id": "tool_get_weather",
      "name": "get_weather",
      "description": "Returns the weather in the given city",
      "tags": []
    }
  ],
  "capabilities": {
    "streaming": true,
    "statefulness": true,
    "asyncProcessing": true
  },
  "defaultInputModes": ["application/json"],
  "securitySchemes": {},
  "security": []
}

Agent Capabilities

The capabilities object (src/agentor/a2a.py:40) indicates:
  • streaming - Supports Server-Sent Events for real-time responses
  • statefulness - Maintains conversation context across requests
  • asyncProcessing - Can handle long-running tasks

A2A Controller

The A2AController (src/agentor/a2a.py:20) implements the A2A protocol on top of FastAPI:
from agentor.a2a import A2AController, AgentSkill

controller = A2AController(
    name="My Agent",
    description="Agent description",
    url="http://localhost:8000",
    version="1.0.0",
    skills=[
        AgentSkill(
            id="skill_1",
            name="Skill Name",
            description="What the skill does",
            tags=["category"]
        )
    ]
)

Custom Endpoints

Add custom routes to the controller (src/agentor/core/agent.py:554):
controller.add_api_route("/chat", chat_handler, methods=["POST"])
controller.add_api_route("/health", health_handler, methods=["GET"])

JSON-RPC Methods

A2A protocol implements these JSON-RPC 2.0 methods (src/agentor/a2a.py:90):

message/send

Send a non-streaming message:
{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "message/send",
  "params": {
    "message": {
      "parts": [
        {
          "kind": "text",
          "text": "What is the weather in London?"
        }
      ]
    }
  }
}

message/stream

Send a streaming message with Server-Sent Events (src/agentor/a2a.py:114):
{
  "jsonrpc": "2.0",
  "id": 2,
  "method": "message/stream",
  "params": {
    "message": {
      "parts": [
        {
          "kind": "text",
          "text": "Tell me a story"
        }
      ]
    }
  }
}
The response is an event stream with:
  1. Task - Initial task object
  2. TaskArtifactUpdateEvent - Streaming content updates
  3. TaskStatusUpdateEvent - Final completion status

tasks/get

Retrieve task status:
{
  "jsonrpc": "2.0",
  "id": 3,
  "method": "tasks/get",
  "params": {"taskId": "task_uuid"}
}

tasks/cancel

Cancel a running task:
{
  "jsonrpc": "2.0",
  "id": 4,
  "method": "tasks/cancel",
  "params": {"taskId": "task_uuid"}
}

Streaming Implementation

The streaming handler (src/agentor/core/agent.py:579) sends events in Server-Sent Events format:
async def _message_stream_handler(
    self, request: SendStreamingMessageRequest
) -> StreamingResponse:
    async def event_generator():
        task_id = f"task_{uuid.uuid4()}"
        context_id = f"ctx_{uuid.uuid4()}"
        artifact_id = f"artifact_{uuid.uuid4()}"
        
        # Send initial task
        task = Task(
            id=task_id,
            context_id=context_id,
            status=TaskStatus(state=TaskState.working)
        )
        yield f"data: {json.dumps(task)}

"
        
        # Stream artifact updates
        async for event in agent.stream_chat(input_text):
            artifact = Artifact(
                artifact_id=artifact_id,
                name="response",
                parts=[Part(root=TextPart(text=event.message))]
            )
            artifact_update = TaskArtifactUpdateEvent(
                kind="artifact-update",
                task_id=task_id,
                artifact=artifact,
                append=True
            )
            yield f"data: {json.dumps(artifact_update)}

"
        
        # Send completion
        final_status = TaskStatusUpdateEvent(
            task_id=task_id,
            status=TaskStatus(state=TaskState.completed),
            final=True
        )
        yield f"data: {json.dumps(final_status)}

"
    
    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream"
    )

Task States

Tasks progress through these states:
  • working - Task is processing
  • completed - Task finished successfully
  • failed - Task encountered an error

Error Handling

Errors are reported in the JSON-RPC error format (src/agentor/core/schema.py):
{
  "jsonrpc": "2.0",
  "id": 1,
  "error": {
    "code": -32601,
    "message": "Method not found"
  }
}

Error Codes

class JSONRPCReturnCodes:
    METHOD_NOT_FOUND = -32601
    INVALID_PARAMS = -32602
    INTERNAL_ERROR = -32603

Custom Handlers

Register custom A2A handlers (src/agentor/core/agent.py:576):
def _register_a2a_handlers(self, controller: A2AController):
    controller.add_handler("message/stream", self._message_stream_handler)
    controller.add_handler("message/send", self._message_send_handler)
Implement your own handler:
from agentor.a2a import A2AController
from a2a.types import JSONRPCRequest, JSONRPCResponse

async def custom_handler(request: JSONRPCRequest) -> JSONRPCResponse:
    # Process request
    return JSONRPCResponse(
        id=request.id,
        result={"status": "success"}
    )

controller.add_handler("custom/method", custom_handler)

Agent Skills in A2A

Tools are automatically exposed as agent skills (src/agentor/core/agent.py:535):
skills = [
    AgentSkill(
        id=f"tool_{tool.name.lower().replace(' ', '_')}",
        name=tool.name,
        description=tool.description,
        tags=[]
    )
    for tool in self.tools
]

controller = A2AController(
    name=self.name,
    skills=skills,
    url=f"http://{host}:{port}"
)

Complete Server Example

From examples/agent-server/main.py:
from agentor import Agentor

agent = Agentor(
    name="Weather Agent",
    model="gpt-5-mini",
    tools=["get_weather"],
)

# Automatic A2A protocol support
agent.serve(port=8000)
Test the agent card:
curl http://localhost:8000/.well-known/agent-card.json
Send a message:
curl -X POST http://localhost:8000/ \
  -H "Content-Type: application/json" \
  -d '{
    "jsonrpc": "2.0",
    "id": 1,
    "method": "message/stream",
    "params": {
      "message": {
        "parts": [{"kind": "text", "text": "Hello!"}]
      }
    }
  }'

Integration with FastAPI

The A2A controller is a FastAPI router (src/agentor/core/agent.py:559):
from fastapi import FastAPI

app = FastAPI()
app.include_router(controller)
This enables:
  • Automatic OpenAPI documentation
  • Dependency injection
  • Middleware support
  • Request validation

Protocol Versioning

The current implementation uses A2A protocol v0.3.0. The agent card includes:
{
  "version": "0.0.1",
  "signatures": []
}
Future versions may add cryptographic signatures for agent verification.

Next Steps

Last modified on March 4, 2026