Parallel AI Sessions: Run Multiple Agents
One agent is slow. Two agents finish in half the time. Ten agents can process a batch while you grab coffee.
Parallel sessions multiply your throughput without multiplying your wait time. But they require different thinking than single-agent workflows.
Why Run Sessions in Parallel
- Speed: A task that takes 10 sequential LLM calls can finish in one round if the calls don't depend on each other.
- Cost efficiency: Same total tokens, but you get results faster. Your time has value.
- Diverse outputs: Run the same prompt three times, get three perspectives. Pick the best or combine them.
- Batch processing: Process 100 documents while you do other work. Return to finished results.
Two Core Patterns
Anthropic’s agent research identifies two parallelization approaches:
Sectioning: Split one task into independent subtasks. Each agent handles a piece. Combine outputs at the end.
Example: Analyze a codebase. One agent reviews security. Another checks performance. A third examines test coverage. Merge their reports.
Voting: Run identical prompts multiple times. Compare outputs. Use consensus or select the best.
Example: Generate three marketing headlines. Pick the one that resonates or A/B test all three.
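Here's a minimal voting sketch. The `generate_candidate` helper and the judge prompt are illustrative choices, not a fixed recipe; a human picker or an embedding-based similarity check could replace the judge call:

```python
import asyncio
from anthropic import AsyncAnthropic

client = AsyncAnthropic()

async def generate_candidate(prompt: str) -> str:
    response = await client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=256,
        messages=[{"role": "user", "content": prompt}],
    )
    return response.content[0].text

async def vote(prompt: str, n: int = 3) -> str:
    # Fan out: same prompt, n independent completions
    candidates = await asyncio.gather(
        *[generate_candidate(prompt) for _ in range(n)]
    )
    # Fan in: one judge call picks the strongest candidate
    numbered = "\n\n".join(f"[{i}] {c}" for i, c in enumerate(candidates))
    judgment = await generate_candidate(
        f"Pick the best option below. Reply with only its number.\n\n{numbered}"
    )
    try:
        return candidates[int(judgment.strip())]
    except (ValueError, IndexError):
        return candidates[0]  # fall back if the judge reply doesn't parse
```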
Basic Implementation
The simplest parallel execution uses `asyncio.gather`:
```python
import asyncio
from anthropic import AsyncAnthropic

client = AsyncAnthropic()

async def analyze(document: str, aspect: str) -> dict:
    response = await client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=[{
            "role": "user",
            "content": f"Analyze this document for {aspect}:\n\n{document}"
        }]
    )
    return {"aspect": aspect, "analysis": response.content[0].text}

async def parallel_analysis(document: str):
    aspects = ["clarity", "accuracy", "completeness"]
    tasks = [analyze(document, aspect) for aspect in aspects]
    results = await asyncio.gather(*tasks)
    return {r["aspect"]: r["analysis"] for r in results}

# Run it
results = asyncio.run(parallel_analysis(doc))
```
Three API calls. One round-trip worth of latency.
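To see the gain for yourself, time the sequential and parallel versions side by side; a rough sketch reusing the `analyze` helper above (the printed numbers will vary with document size and load):

```python
import time

async def compare(document: str):
    aspects = ["clarity", "accuracy", "completeness"]

    # Sequential baseline: each call waits for the previous one
    start = time.perf_counter()
    for aspect in aspects:
        await analyze(document, aspect)
    sequential = time.perf_counter() - start

    # Parallel: all three calls in flight at once
    start = time.perf_counter()
    await asyncio.gather(*[analyze(document, a) for a in aspects])
    parallel = time.perf_counter() - start

    print(f"sequential: {sequential:.1f}s, parallel: {parallel:.1f}s")
```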
Session Management
For complex workflows, track session state:
```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional
import uuid

class SessionStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETE = "complete"
    FAILED = "failed"

@dataclass
class AgentSession:
    id: str
    task: str
    status: SessionStatus
    result: Optional[str] = None
    error: Optional[str] = None

class SessionManager:
    def __init__(self):
        self.sessions: dict[str, AgentSession] = {}

    def create(self, task: str) -> str:
        session_id = str(uuid.uuid4())[:8]
        self.sessions[session_id] = AgentSession(
            id=session_id,
            task=task,
            status=SessionStatus.PENDING
        )
        return session_id

    def update(self, session_id: str, status: SessionStatus,
               result: Optional[str] = None, error: Optional[str] = None):
        session = self.sessions[session_id]
        session.status = status
        session.result = result
        session.error = error

    def all_complete(self) -> bool:
        return all(
            s.status in (SessionStatus.COMPLETE, SessionStatus.FAILED)
            for s in self.sessions.values()
        )

    def get_results(self) -> dict:
        return {
            sid: s.result
            for sid, s in self.sessions.items()
            if s.status == SessionStatus.COMPLETE
        }
```
Worker Pool Pattern
For batch processing, use a bounded worker pool to avoid rate limits:
```python
import asyncio
from anthropic import AsyncAnthropic

async def process_batch(items: list[str], max_concurrent: int = 5):
    client = AsyncAnthropic()
    semaphore = asyncio.Semaphore(max_concurrent)

    async def worker(item: str):
        async with semaphore:
            response = await client.messages.create(
                model="claude-sonnet-4-20250514",
                max_tokens=512,
                messages=[{"role": "user", "content": f"Summarize: {item}"}]
            )
            return response.content[0].text

    tasks = [worker(item) for item in items]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Separate successes from failures
    successes = [r for r in results if not isinstance(r, Exception)]
    failures = [r for r in results if isinstance(r, Exception)]
    return successes, failures

# Process 100 documents, 5 at a time
summaries, errors = asyncio.run(process_batch(documents, max_concurrent=5))
```
The semaphore prevents overwhelming the API. Adjust max_concurrent based on your rate limits.
Fan-Out/Fan-In Orchestration
Complex tasks need structured orchestration:
```python
import asyncio
from anthropic import AsyncAnthropic

client = AsyncAnthropic()

async def research_topic(topic: str):
    # Fan out: gather information from multiple angles
    research_tasks = [
        research_aspect(topic, "historical background"),
        research_aspect(topic, "current state"),
        research_aspect(topic, "future trends"),
        research_aspect(topic, "key players"),
    ]
    research_results = await asyncio.gather(*research_tasks)

    # Fan in: synthesize all research into final report
    combined = "\n\n".join([
        f"## {r['aspect']}\n{r['content']}"
        for r in research_results
    ])
    final_report = await synthesize(combined)
    return final_report

async def research_aspect(topic: str, aspect: str) -> dict:
    # Each aspect is researched independently
    response = await client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=[{
            "role": "user",
            "content": f"Research {aspect} of {topic}. Be thorough but concise."
        }]
    )
    return {"aspect": aspect, "content": response.content[0].text}

async def synthesize(research: str) -> str:
    response = await client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=2048,
        messages=[{
            "role": "user",
            "content": f"Synthesize this research into a cohesive report:\n\n{research}"
        }]
    )
    return response.content[0].text
```
Four research calls run in parallel. One synthesis call waits for all four to complete.
Error Handling
Parallel execution means parallel failures. Handle them:
```python
async def robust_parallel(tasks: list):
    results = await asyncio.gather(*tasks, return_exceptions=True)

    output = []
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
            output.append({"success": False, "error": str(result)})
        else:
            output.append({"success": True, "data": result})

    success_rate = sum(1 for r in output if r["success"]) / len(output)
    print(f"Success rate: {success_rate:.0%}")
    return output
```
Decide your failure tolerance. Some workflows can proceed with partial results. Others need all tasks to succeed.
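For transient failures such as rate-limit errors and timeouts, pair this with a retry wrapper. A sketch with exponential backoff and jitter; the attempt count and base delay are illustrative defaults:

```python
import asyncio
import random

async def with_retry(coro_factory, max_attempts: int = 3, base_delay: float = 1.0):
    """Retry an async call with exponential backoff and jitter."""
    for attempt in range(max_attempts):
        try:
            return await coro_factory()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the error
            # Back off 1s, 2s, 4s... plus jitter to avoid thundering herds
            delay = base_delay * (2 ** attempt) + random.uniform(0, 0.5)
            await asyncio.sleep(delay)

# Usage: pass a factory so each retry issues a fresh call
# result = await with_retry(lambda: analyze(doc, "clarity"))
```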
Rate Limit Strategies
APIs have rate limits. Respect them:
```python
import asyncio
from datetime import datetime, timedelta

class RateLimiter:
    def __init__(self, requests_per_minute: int):
        self.rpm = requests_per_minute
        self.window = timedelta(minutes=1)
        self.requests: list[datetime] = []

    async def acquire(self):
        now = datetime.now()
        # Remove old requests outside window
        self.requests = [r for r in self.requests if now - r < self.window]
        if len(self.requests) >= self.rpm:
            # Wait until oldest request exits window
            wait_time = (self.requests[0] + self.window - now).total_seconds()
            await asyncio.sleep(wait_time)
        self.requests.append(datetime.now())

# Usage
limiter = RateLimiter(requests_per_minute=60)

async def rate_limited_call(prompt: str):
    await limiter.acquire()
    return await client.messages.create(...)
```
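The limiter composes with the semaphore from the worker pool section: the semaphore caps how many requests are in flight, the limiter caps how many start per minute. A sketch combining both, reusing the client defined earlier (the specific limits are illustrative):

```python
import asyncio

semaphore = asyncio.Semaphore(5)               # cap on in-flight requests
limiter = RateLimiter(requests_per_minute=60)  # cap on request rate

async def guarded_call(prompt: str) -> str:
    async with semaphore:        # wait for a free concurrency slot
        await limiter.acquire()  # then wait for rate budget
        response = await client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=512,
            messages=[{"role": "user", "content": prompt}],
        )
        return response.content[0].text
```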
When Not to Parallelize
Parallel isn’t always better:
- Dependent tasks: If step 2 needs step 1’s output, you can’t parallelize them.
- Context sharing: Multiple agents don’t share conversation history. Each starts fresh.
- Cost concerns: Parallel doesn't reduce tokens. If you're optimizing cost over speed, sequential prompt chaining may work better (see the sketch after this list).
- Debugging: Sequential execution is easier to trace. Parallelize after the workflow works.
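For contrast, here's a minimal prompt chain reusing the `analyze` helper from the Basic Implementation section; step two consumes step one's output, so the calls must run one after the other:

```python
async def chained_review(document: str) -> str:
    # Step 1: analyze the document's structure
    outline = await analyze(document, "structure")
    # Step 2: depends on step 1's output, so it cannot run in parallel with it
    critique = await analyze(outline["analysis"], "weaknesses")
    return critique["analysis"]
```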
What You Can Steal
Quick win (10 minutes):
- Identify three independent subtasks in your current workflow
- Wrap them in `asyncio.gather`
- Measure the speedup
Production setup (2 hours):
- Implement SessionManager for state tracking
- Add semaphore-bounded worker pool
- Build error handling and retry logic
- Add rate limiting
Advanced orchestration:
- Design fan-out/fan-in pipelines
- Implement voting for quality-critical outputs
- Add progress tracking and logging
- Build dashboards for session monitoring
Start with the quick win. Three truly independent tasks run in parallel should land you close to a 3x speedup on the first try.
Related
- Small Bets covers the philosophy of parallel experimentation
- LLM Logging explains tracking what your parallel sessions actually do
Next: Token Efficiency