Streaming enables real-time response delivery, providing immediate feedback and better user experience for long-running agent operations.
Why Stream?
- Immediate Feedback: Users see responses as they’re generated
- Better UX: Perceived performance improvement
- Tool Visibility: See tool calls and reasoning in real-time
- Lower Latency: First token appears faster
- Cancellation: Stop generation early if needed
Quick Start
Enable streaming with the stream_chat method:
# Minimal streaming example: iterate stream_chat and print each event as it arrives.
import asyncio
from agentor import Agentor

agent = Agentor(
    name="Assistant",
    model="gpt-5-mini",
    instructions="You are a helpful assistant."
)

async def main():
    # stream_chat is an async generator. With the default serialize=True each
    # yielded chunk is a JSON event string, so printing one per line is correct.
    async for chunk in agent.stream_chat("Explain quantum computing"):
        print(chunk, flush=True)  # flush so output appears immediately

asyncio.run(main())
Stream Event Types
Agentor emits structured events during streaming:
# Structured-event example: serialize=False yields AgentOutput objects whose
# fields identify what kind of event arrived.
import asyncio
from agentor import Agentor

agent = Agentor(
    name="Assistant",
    model="gpt-5-mini",
    tools=["get_weather"]
)

async def main():
    async for event in agent.stream_chat(
        "What's the weather in London?",
        serialize=False  # Get AgentOutput objects instead of JSON strings
    ):
        # Event types:
        # - message: Text content
        # - chunk: Text delta (partial message)
        # - tool_action: Tool calls and outputs
        # - reasoning: Chain-of-thought reasoning
        # Fields are checked independently (if, not elif) because a single
        # event may carry more than one of them.
        if event.message:
            print(f"Message: {event.message}")
        if event.chunk:
            print(event.chunk, end="", flush=True)
        if event.tool_action:
            print(f"\nTool: {event.tool_action.name} ({event.tool_action.type})")
        if event.reasoning:
            print(f"Thinking: {event.reasoning}")

asyncio.run(main())
JSON Serialization
Get events as JSON strings for easy transmission:
async def main():
    # serialize=True (the default) emits each event as a JSON string,
    # ready to forward over a network transport without extra encoding.
    async for json_event in agent.stream_chat(
        "Write a haiku about Python",
        serialize=True  # Returns JSON strings (default)
    ):
        print(json_event)  # Each event is a JSON string

asyncio.run(main())
Example JSON event:
{
"type": "run_item_stream_event",
"message": "Code executes fast",
"chunk": null,
"tool_action": null,
"reasoning": null
}
HTTP Streaming
Serve streaming responses over HTTP:
# Server-Sent Events (SSE) endpoint: forward serialized stream events to
# HTTP clients as they are produced.
from agentor import Agentor
import uvicorn
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()
agent = Agentor(name="Assistant", model="gpt-5-mini")

@app.post("/chat")
async def chat(message: str):
    async def event_stream():
        # Each event is framed as an SSE "data:" line followed by a blank line.
        async for chunk in agent.stream_chat(message, serialize=True):
            yield f"data: {chunk}\n\n"
    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",  # intermediaries must not buffer the stream
            "Connection": "keep-alive"
        }
    )

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
Built-in Server Streaming
Agentor’s built-in server supports streaming out of the box:
from agentor import Agentor

agent = Agentor(
    name="Streaming Agent",
    model="gpt-5-mini"
)

# Serve with automatic streaming support (built-in HTTP server).
agent.serve(port=8000)
Client request with streaming:
import requests

url = "http://localhost:8000/chat"

# stream=True keeps the connection open so lines can be consumed as the
# server emits them instead of buffering the whole response in memory.
# requests has no default timeout, so without one a stalled server would
# block this client forever.
response = requests.post(
    url,
    json={"input": "Tell me a story", "stream": True},
    stream=True,
    timeout=(5, 120),  # 5s to connect, 120s max silence between chunks
)
response.raise_for_status()  # surface HTTP errors instead of iterating an error body

for line in response.iter_lines(decode_unicode=True):
    if line:  # skip blank keep-alive lines
        print(line, flush=True)
A2A Protocol Streaming
Stream responses using the A2A protocol:
import requests
import json

url = "http://localhost:8000/"
payload = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "message/stream",
    "params": {
        "message": {
            "parts": [
                {"kind": "text", "text": "Explain machine learning"}
            ]
        }
    }
}

# requests has no default timeout; without one a stalled server blocks forever.
response = requests.post(url, json=payload, stream=True, timeout=(5, 120))

for line in response.iter_lines(decode_unicode=True):
    # A2A streams SSE frames; payload lines are prefixed with "data: ".
    if line.startswith("data: "):
        event = json.loads(line[6:])
        result = event.get("result", {})
        # Task creation
        if "id" in result and "status" in result:
            print(f"Task created: {result['id']}")
        # Artifact updates (streaming content)
        elif "artifact" in result:
            artifact = result["artifact"]
            if artifact.get("parts"):
                text = artifact["parts"][0].get("text", "")
                print(text, end="", flush=True)
        # Status updates
        elif "status" in result:
            status = result["status"]
            if result.get("final"):
                print(f"\n\nCompleted with status: {status['state']}")
Advanced Streaming Patterns
Custom Event Filtering
Filter specific event types:
# Unlike the other self-contained snippets, this one used asyncio.run and
# Agentor without importing them; the imports are required for it to run.
import asyncio

from agentor import Agentor
from agentor.output_text_formatter import format_stream_events
from agents import Runner

agent = Agentor(name="Assistant", model="gpt-5-mini")

async def stream_only_messages():
    """Stream a run but surface only completed-message events."""
    result = Runner.run_streamed(agent.agent, input="Hello")
    # allowed_events drops everything except run_item_stream_event,
    # so chunk deltas and reasoning traces are filtered out upstream.
    async for event in format_stream_events(
        result.stream_events(),
        allowed_events=["run_item_stream_event"]
    ):
        if event.message:
            print(event.message)

asyncio.run(stream_only_messages())
Progress Tracking
Track completion progress:
import asyncio

async def stream_with_progress():
    # Track stream progress: count chunk deltas and tool invocations as they
    # arrive, then print the final message and a summary.
    chunks = []
    tool_calls = 0
    async for event in agent.stream_chat(
        "Research quantum computing and summarize",
        serialize=False
    ):
        if event.chunk:
            chunks.append(event.chunk)
            # \r rewrites the same terminal line for a live progress counter.
            print(f"Progress: {len(chunks)} chunks", end="\r")
        if event.tool_action and event.tool_action.type == "tool_called":
            tool_calls += 1
            print(f"\nTool call #{tool_calls}: {event.tool_action.name}")
        if event.message:
            print(f"\n{event.message}")
    print(f"\nComplete! Total chunks: {len(chunks)}, Tool calls: {tool_calls}")

asyncio.run(stream_with_progress())
Buffered Streaming
Buffer and process events in batches:
import asyncio
from collections import deque

async def buffered_stream(buffer_size=5):
    # Collect chunk deltas and emit them in batches of buffer_size.
    # NOTE(review): maxlen is redundant here -- the buffer is cleared as soon
    # as it reaches buffer_size, so deque's drop-oldest behavior never fires.
    buffer = deque(maxlen=buffer_size)
    async for event in agent.stream_chat("Write a story", serialize=False):
        if event.chunk:
            buffer.append(event.chunk)
            # Process when buffer is full
            if len(buffer) == buffer_size:
                text = "".join(buffer)
                print(text, end="", flush=True)
                buffer.clear()
    # Process remaining (partial final batch)
    if buffer:
        print("".join(buffer), flush=True)

asyncio.run(buffered_stream())
Multi-Agent Streaming
Stream from multiple agents concurrently:
import asyncio

agent1 = Agentor(name="Agent 1", model="gpt-5-mini")
agent2 = Agentor(name="Agent 2", model="gpt-5-mini")

async def stream_multiple():
    # Run both streams concurrently; output lines interleave, so each is
    # tagged with a prefix identifying its agent.
    async def stream_agent(agent, prompt, prefix):
        async for event in agent.stream_chat(prompt, serialize=False):
            if event.message:
                print(f"[{prefix}] {event.message}")
    await asyncio.gather(
        stream_agent(agent1, "What is Python?", "A1"),
        stream_agent(agent2, "What is JavaScript?", "A2")
    )

asyncio.run(stream_multiple())
WebSocket Streaming
For bidirectional streaming, use WebSockets:
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from agentor import Agentor
import uvicorn
import json

app = FastAPI()
agent = Agentor(name="Assistant", model="gpt-5-mini")

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Bidirectional chat: receive a JSON message, stream serialized events back."""
    await websocket.accept()
    try:
        while True:
            # Receive message
            data = await websocket.receive_text()
            message = json.loads(data)
            # Stream response
            async for event in agent.stream_chat(
                message["text"],
                serialize=True
            ):
                await websocket.send_text(event)
    except WebSocketDisconnect:
        # Normal client disconnect: the socket is already closed, so calling
        # close() again would raise. (The original `finally: close()` did
        # exactly that double-close.)
        pass
    except Exception as e:
        print(f"WebSocket error: {e}")
        await websocket.close()

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
WebSocket client:
import asyncio
import websockets
import json

async def chat():
    # Connect, send one message, then consume the server's event stream.
    uri = "ws://localhost:8000/ws"
    async with websockets.connect(uri) as websocket:
        # Send message
        await websocket.send(json.dumps({"text": "Hello!"}))
        # Receive stream: each frame is a JSON-serialized event; print only
        # completed messages, ignoring chunk/tool/reasoning events.
        async for message in websocket:
            event = json.loads(message)
            if event.get("message"):
                print(event["message"])

asyncio.run(chat())
Error Handling
Handle streaming errors gracefully:
import asyncio

async def safe_stream():
    # Wrap the whole iteration: a failure mid-stream surfaces here, and the
    # finally block runs whether the stream completed or aborted.
    try:
        async for event in agent.stream_chat("Your query", serialize=False):
            if event.message:
                print(event.message)
    except asyncio.TimeoutError:
        print("Streaming timeout")
    except Exception as e:
        print(f"Streaming error: {e}")
    finally:
        print("Stream closed")

asyncio.run(safe_stream())
Best Practices
Streaming requires async execution:
# Good
async def main():
    async for event in agent.stream_chat("Hello"):
        print(event)

# Won't work: stream_chat returns an async generator, which a plain
# `for` cannot iterate.
for event in agent.stream_chat("Hello"):  # Error!
    print(event)
Flush stdout for real-time display:
# Without flush, stdout buffering hides chunks until a newline or exit.
async for event in agent.stream_chat("Hello", serialize=False):
    if event.chunk:
        print(event.chunk, end="", flush=True)  # flush=True is important
Chunks may not align with word boundaries:
buffer = ""
async for event in agent.stream_chat("Hello", serialize=False):
    if event.chunk:
        buffer += event.chunk
        # Process complete words only; the tail (a possibly partial word)
        # stays in the buffer until the next chunk completes it.
        if " " in buffer:
            words = buffer.split(" ")
            for word in words[:-1]:
                process_word(word)  # placeholder for caller-defined handling
            buffer = words[-1]
import asyncio

async def stream_with_timeout():
    # asyncio.timeout requires Python 3.11+; the deadline covers the entire
    # iteration, and a breach cancels the stream from inside the task.
    try:
        async with asyncio.timeout(30):  # 30 second timeout
            async for event in agent.stream_chat("Long task"):
                print(event)
    except asyncio.TimeoutError:
        print("Stream timeout")
Monitor Connection Health
from fastapi.responses import StreamingResponse

@app.post("/chat")
async def chat(message: str):
    async def event_stream():
        try:
            async for chunk in agent.stream_chat(message):
                yield f"data: {chunk}\n\n"
        except Exception as e:
            # Emit a named SSE error event so the client learns the stream
            # failed instead of seeing a silent disconnect.
            yield f"event: error\ndata: {str(e)}\n\n"
    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream"
    )
- Use serialize=True (the default) when sending events over the network
- Use serialize=False for local processing to avoid JSON serialization overhead
- Buffer small chunks for better network efficiency
- Set appropriate timeouts based on expected response time
- Close streams properly to free resources
Next Steps
Last modified on March 4, 2026