Streaming
Streaming enables real-time response delivery, providing immediate feedback and better user experience for long-running agent operations.

Why Stream?

  • Immediate Feedback: Users see responses as they’re generated
  • Better UX: Perceived performance improvement
  • Tool Visibility: See tool calls and reasoning in real-time
  • Lower Latency: First token appears faster
  • Cancellation: Stop generation early if needed

Quick Start

Enable streaming with the stream_chat method:
import asyncio
from agentor import Agentor

# Minimal streaming setup: each yielded chunk is printed the moment it arrives.
agent = Agentor(
    name="Assistant",
    model="gpt-5-mini",
    instructions="You are a helpful assistant."
)

async def main():
    """Stream the reply and echo every chunk immediately."""
    stream = agent.stream_chat("Explain quantum computing")
    async for piece in stream:
        print(piece, flush=True)

asyncio.run(main())

Stream Event Types

Agentor emits structured events during streaming:
import asyncio
from agentor import Agentor

# Tool-enabled agent used to demonstrate the structured event stream.
agent = Agentor(
    name="Assistant",
    model="gpt-5-mini",
    tools=["get_weather"]
)

async def main():
    """Consume AgentOutput events and print each populated field.

    With serialize=False the stream yields AgentOutput objects with the
    fields: message (full text), chunk (text delta), tool_action (tool
    calls and outputs) and reasoning (chain-of-thought).
    """
    events = agent.stream_chat(
        "What's the weather in London?",
        serialize=False  # Get AgentOutput objects instead of JSON strings
    )
    async for event in events:
        if event.message:
            print(f"Message: {event.message}")
        if event.chunk:
            print(event.chunk, end="", flush=True)
        if event.tool_action:
            print(f"\nTool: {event.tool_action.name} ({event.tool_action.type})")
        if event.reasoning:
            print(f"Thinking: {event.reasoning}")

asyncio.run(main())

JSON Serialization

Get events as JSON strings for easy transmission:
import asyncio  # was missing: asyncio.run below fails without it when run standalone

# NOTE: reuses the `agent` created in the Quick Start snippet above.
async def main():
    """Print each stream event as a ready-to-transmit JSON string."""
    async for json_event in agent.stream_chat(
        "Write a haiku about Python",
        serialize=True  # Returns JSON strings (default)
    ):
        print(json_event)  # Each event is a JSON string

asyncio.run(main())
Example JSON event:
{
  "type": "run_item_stream_event",
  "message": "Code executes fast",
  "chunk": null,
  "tool_action": null,
  "reasoning": null
}

HTTP Streaming

Serve streaming responses over HTTP:
from agentor import Agentor
import uvicorn
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()
agent = Agentor(name="Assistant", model="gpt-5-mini")

@app.post("/chat")
async def chat(message: str):
    """Relay the agent's serialized stream as Server-Sent Events."""

    async def event_stream():
        # serialize=True yields JSON strings, ready for the SSE wire format.
        async for chunk in agent.stream_chat(message, serialize=True):
            yield f"data: {chunk}\n\n"

    sse_headers = {
        "Cache-Control": "no-cache",
        "Connection": "keep-alive"
    }
    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers=sse_headers
    )

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

Built-in Server Streaming

Agentor’s built-in server supports streaming out of the box:
from agentor import Agentor

# The built-in server exposes the agent over HTTP with streaming enabled
# automatically; no extra framework code is needed.
agent = Agentor(
    name="Streaming Agent",
    model="gpt-5-mini"
)

agent.serve(port=8000)  # Serve with automatic streaming support
Client request with streaming:
import requests

# Ask the built-in server for a streamed reply and print it line by line.
response = requests.post(
    "http://localhost:8000/chat",
    json={"input": "Tell me a story", "stream": True},
    stream=True  # keep the connection open and iterate the body lazily
)

for line in response.iter_lines(decode_unicode=True):
    if line:  # skip blank keep-alive lines
        print(line, flush=True)

A2A Protocol Streaming

Stream responses using the A2A protocol:
import requests
import json

# JSON-RPC 2.0 request for the A2A "message/stream" method.
payload = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "message/stream",
    "params": {
        "message": {
            "parts": [
                {"kind": "text", "text": "Explain machine learning"}
            ]
        }
    }
}

response = requests.post("http://localhost:8000/", json=payload, stream=True)

for line in response.iter_lines(decode_unicode=True):
    if not line.startswith("data: "):
        continue
    event = json.loads(line[6:])
    result = event.get("result", {})

    if "id" in result and "status" in result:
        # First event: the server acknowledges the new task.
        print(f"Task created: {result['id']}")
    elif "artifact" in result:
        # Streaming content arrives as artifact parts.
        artifact = result["artifact"]
        if artifact.get("parts"):
            text = artifact["parts"][0].get("text", "")
            print(text, end="", flush=True)
    elif "status" in result:
        # Terminal status update closes the stream.
        status = result["status"]
        if result.get("final"):
            print(f"\n\nCompleted with status: {status['state']}")
Advanced Streaming Patterns

Custom Event Filtering

Filter specific event types:
import asyncio  # was missing: asyncio.run below requires it
from agentor import Agentor  # was missing: Agentor is constructed below
from agentor.output_text_formatter import format_stream_events
from agents import Runner

agent = Agentor(name="Assistant", model="gpt-5-mini")

async def stream_only_messages():
    """Stream a run but surface only completed-message events.

    format_stream_events filters the raw Runner event stream down to the
    event names listed in allowed_events.
    """
    result = Runner.run_streamed(agent.agent, input="Hello")

    # Only emit run_item_stream_event
    async for event in format_stream_events(
        result.stream_events(),
        allowed_events=["run_item_stream_event"]
    ):
        if event.message:
            print(event.message)

asyncio.run(stream_only_messages())

Progress Tracking

Track completion progress:
import asyncio

async def stream_with_progress():
    """Stream a long task while counting chunks and tool calls."""
    collected = []
    tool_call_count = 0

    async for event in agent.stream_chat(
        "Research quantum computing and summarize",
        serialize=False
    ):
        if event.chunk:
            collected.append(event.chunk)
            # "\r" rewrites the same terminal line in place.
            print(f"Progress: {len(collected)} chunks", end="\r")

        if event.tool_action and event.tool_action.type == "tool_called":
            tool_call_count += 1
            print(f"\nTool call #{tool_call_count}: {event.tool_action.name}")

        if event.message:
            print(f"\n{event.message}")

    print(f"\nComplete! Total chunks: {len(collected)}, Tool calls: {tool_call_count}")

asyncio.run(stream_with_progress())

Buffered Streaming

Buffer and process events in batches:
import asyncio
from collections import deque

async def buffered_stream(buffer_size=5):
    """Accumulate chunks and flush them to stdout in fixed-size batches."""
    pending = deque(maxlen=buffer_size)

    async for event in agent.stream_chat("Write a story", serialize=False):
        if not event.chunk:
            continue
        pending.append(event.chunk)
        if len(pending) == buffer_size:
            # Emit a full batch as a single write.
            print("".join(pending), end="", flush=True)
            pending.clear()

    # Flush whatever is left after the stream ends.
    if pending:
        print("".join(pending), flush=True)

asyncio.run(buffered_stream())

Multi-Agent Streaming

Stream from multiple agents concurrently:
import asyncio
from agentor import Agentor  # was missing: Agentor is constructed below

agent1 = Agentor(name="Agent 1", model="gpt-5-mini")
agent2 = Agentor(name="Agent 2", model="gpt-5-mini")

async def stream_multiple():
    """Run two agents concurrently, tagging each printed line with its source."""

    async def stream_agent(agent, prompt, prefix):
        async for event in agent.stream_chat(prompt, serialize=False):
            if event.message:
                print(f"[{prefix}] {event.message}")

    # gather interleaves both streams on the same event loop.
    await asyncio.gather(
        stream_agent(agent1, "What is Python?", "A1"),
        stream_agent(agent2, "What is JavaScript?", "A2")
    )

asyncio.run(stream_multiple())

WebSocket Streaming

For bidirectional streaming, use WebSockets:
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from agentor import Agentor
import uvicorn
import json

app = FastAPI()
agent = Agentor(name="Assistant", model="gpt-5-mini")

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Bidirectional chat: each received message is answered with a stream."""
    await websocket.accept()

    try:
        while True:
            # Receive message
            data = await websocket.receive_text()
            message = json.loads(data)

            # Stream response: serialize=True yields JSON strings that can
            # be forwarded to the client verbatim.
            async for event in agent.stream_chat(
                message["text"],
                serialize=True
            ):
                await websocket.send_text(event)

    except WebSocketDisconnect:
        # Normal client hang-up: the socket is already closed, so calling
        # close() again (as the old `finally` block did) would raise.
        pass
    except Exception as e:
        print(f"WebSocket error: {e}")
        await websocket.close()

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
WebSocket client:
import asyncio
import websockets
import json

async def chat():
    """Send one message and print every streamed 'message' event."""
    async with websockets.connect("ws://localhost:8000/ws") as websocket:
        # Send message
        await websocket.send(json.dumps({"text": "Hello!"}))

        # Each frame is one serialized event; only full messages are printed.
        async for frame in websocket:
            event = json.loads(frame)
            if event.get("message"):
                print(event["message"])

asyncio.run(chat())

Error Handling

Handle streaming errors gracefully:
import asyncio

async def safe_stream():
    """Stream defensively: report timeouts and other errors, always clean up."""
    try:
        stream = agent.stream_chat("Your query", serialize=False)
        async for evt in stream:
            if evt.message:
                print(evt.message)
    except asyncio.TimeoutError:
        print("Streaming timeout")
    except Exception as e:
        print(f"Streaming error: {e}")
    finally:
        # Runs on success, error, and cancellation alike.
        print("Stream closed")

asyncio.run(safe_stream())

Best Practices

1. Use Async/Await

Streaming requires async execution:
# Good: stream_chat returns an async generator, so it must be consumed
# with `async for` inside a coroutine.
async def main():
    async for event in agent.stream_chat("Hello"):
        print(event)

# Won't work: a plain `for` cannot iterate an async generator and raises
# TypeError at runtime.
for event in agent.stream_chat("Hello"):  # Error!
    print(event)
2. Flush Output

Flush stdout for real-time display:
async for event in agent.stream_chat("Hello", serialize=False):
    if event.chunk:
        print(event.chunk, end="", flush=True)  # flush=True is important
3. Handle Partial Content

Chunks may not align with word boundaries:
buffer = ""
async for event in agent.stream_chat("Hello", serialize=False):
    if event.chunk:
        buffer += event.chunk
        # Process complete words only
        if " " in buffer:
            words = buffer.split(" ")
            for word in words[:-1]:
                process_word(word)
            buffer = words[-1]
4. Set Timeouts

Prevent hanging streams:
import asyncio

async def stream_with_timeout():
    """Abort the stream if it has not finished within 30 seconds."""
    try:
        # asyncio.timeout (3.11+) cancels everything inside after 30 seconds.
        async with asyncio.timeout(30):
            async for item in agent.stream_chat("Long task"):
                print(item)
    except asyncio.TimeoutError:
        print("Stream timeout")
5. Monitor Connection Health

For HTTP streaming:
from fastapi.responses import StreamingResponse

@app.post("/chat")
async def chat(message: str):
    """SSE endpoint that reports agent failures to the client as error events."""

    async def event_stream():
        try:
            async for chunk in agent.stream_chat(message):
                yield f"data: {chunk}\n\n"
        except Exception as e:
            # Surface the failure instead of dropping the connection silently.
            yield f"event: error\ndata: {str(e)}\n\n"

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream"
    )

Performance Tips

  • Use serialize=True (default) when sending over network
  • Use serialize=False for local processing to avoid JSON overhead
  • Buffer small chunks for better network efficiency
  • Set appropriate timeouts based on expected response time
  • Close streams properly to free resources

Next Steps

Last modified on March 4, 2026