Parallel AI Sessions: Run Multiple Agents

One agent is slow. Two agents working on independent tasks can finish in roughly half the time. Ten agents can process a batch while you grab coffee.

Parallel sessions multiply your throughput without multiplying your wait time. But they require different thinking than single-agent workflows.

Why Run Sessions in Parallel

Speed: A task that takes 10 sequential LLM calls can finish in one round if the calls don’t depend on each other.

Cost efficiency: Same total tokens, but you get results faster. Your time has value.

Diverse outputs: Run the same prompt three times, get three perspectives. Pick the best or combine them.

Batch processing: Process 100 documents while you do other work. Return to finished results.

Two Core Patterns

Anthropic’s agent research identifies two parallelization approaches:

Sectioning: Split one task into independent subtasks. Each agent handles a piece. Combine outputs at the end.

Example: Analyze a codebase. One agent reviews security. Another checks performance. A third examines test coverage. Merge their reports.

Voting: Run identical prompts multiple times. Compare outputs. Use consensus or select the best.

Example: Generate three marketing headlines. Pick the one that resonates or A/B test all three.
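
Voting is easiest to automate when the outputs are short, discrete answers such as labels, rather than free-form text like headlines. A minimal sketch of consensus selection, assuming each run returns a plain string; the `majority_vote` helper and the sample outputs are illustrative, not part of any library:

```python
from collections import Counter

def majority_vote(outputs: list[str]) -> tuple[str, float]:
    """Return the most common answer and the share of runs that agreed with it."""
    if not outputs:
        raise ValueError("need at least one output to vote on")
    # Normalize lightly so "Spam" and "spam " count as the same vote
    normalized = [o.strip().lower() for o in outputs]
    # On a tie, Counter returns the answer that appeared first
    winner, count = Counter(normalized).most_common(1)[0]
    return winner, count / len(normalized)

# Hypothetical outputs from three runs of the same classification prompt
votes = ["Spam", "spam ", "not spam"]
label, agreement = majority_vote(votes)
print(label, f"{agreement:.0%}")  # prints: spam 67%
```

A low agreement share is a useful signal in itself: it can flag inputs worth a human look.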

Basic Implementation

The simplest parallel execution uses asyncio.gather:

import asyncio
from anthropic import AsyncAnthropic

client = AsyncAnthropic()

async def analyze(document: str, aspect: str) -> dict:
    response = await client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=[{
            "role": "user",
            "content": f"Analyze this document for {aspect}:\n\n{document}"
        }]
    )
    return {"aspect": aspect, "analysis": response.content[0].text}

async def parallel_analysis(document: str):
    aspects = ["clarity", "accuracy", "completeness"]
    
    tasks = [analyze(document, aspect) for aspect in aspects]
    results = await asyncio.gather(*tasks)
    
    return {r["aspect"]: r["analysis"] for r in results}

# Run it (placeholder text; substitute your own document)
doc = "Your document text here."
results = asyncio.run(parallel_analysis(doc))

Three API calls. Total latency is roughly that of the slowest call, not the sum of all three.
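
To see the latency effect without spending tokens, the sketch below swaps the API call for `asyncio.sleep`. The `fake_call` function and the delay values are stand-ins chosen for illustration, so treat the timings as a demonstration, not a benchmark:

```python
import asyncio
import time

async def fake_call(seconds: float) -> float:
    # Stand-in for an API call; asyncio.sleep simulates network latency
    await asyncio.sleep(seconds)
    return seconds

async def run_sequential(delays: list[float]) -> float:
    start = time.perf_counter()
    for d in delays:
        await fake_call(d)  # each call waits for the previous one
    return time.perf_counter() - start

async def run_parallel(delays: list[float]) -> float:
    start = time.perf_counter()
    await asyncio.gather(*(fake_call(d) for d in delays))  # all calls in flight at once
    return time.perf_counter() - start

delays = [0.05, 0.1, 0.15]
sequential = asyncio.run(run_sequential(delays))
parallel = asyncio.run(run_parallel(delays))
print(f"sequential: {sequential:.2f}s, parallel: {parallel:.2f}s")
```

The sequential run takes about the sum of the delays. The parallel run takes about as long as the slowest one.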

Session Management

For complex workflows, track session state:

from dataclasses import dataclass
from enum import Enum
from typing import Optional
import uuid

class SessionStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETE = "complete"
    FAILED = "failed"

@dataclass
class AgentSession:
    id: str
    task: str
    status: SessionStatus
    result: Optional[str] = None
    error: Optional[str] = None

class SessionManager:
    def __init__(self):
        self.sessions: dict[str, AgentSession] = {}
    
    def create(self, task: str) -> str:
        session_id = str(uuid.uuid4())[:8]
        self.sessions[session_id] = AgentSession(
            id=session_id,
            task=task,
            status=SessionStatus.PENDING
        )
        return session_id
    
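    def update(self, session_id: str, status: SessionStatus,
               result: Optional[str] = None, error: Optional[str] = None):
        session = self.sessions[session_id]
        session.status = status
        # Only overwrite fields that were passed, so a status-only
        # update does not erase an earlier result or error
        if result is not None:
            session.result = result
        if error is not None:
            session.error = error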
    
    def all_complete(self) -> bool:
        return all(
            s.status in (SessionStatus.COMPLETE, SessionStatus.FAILED)
            for s in self.sessions.values()
        )
    
    def get_results(self) -> dict:
        return {
            sid: s.result 
            for sid, s in self.sessions.items() 
            if s.status == SessionStatus.COMPLETE
        }

Worker Pool Pattern

For batch processing, use a bounded worker pool to avoid rate limits:

import asyncio
from anthropic import AsyncAnthropic

async def process_batch(items: list[str], max_concurrent: int = 5):
    client = AsyncAnthropic()
    semaphore = asyncio.Semaphore(max_concurrent)
    
    async def worker(item: str):
        async with semaphore:
            response = await client.messages.create(
                model="claude-sonnet-4-20250514",
                max_tokens=512,
                messages=[{"role": "user", "content": f"Summarize: {item}"}]
            )
            return response.content[0].text
    
    tasks = [worker(item) for item in items]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    # Separate successes from failures
    successes = [r for r in results if not isinstance(r, Exception)]
    failures = [r for r in results if isinstance(r, Exception)]
    
    return successes, failures

# Process 100 documents, 5 at a time
summaries, errors = asyncio.run(process_batch(documents, max_concurrent=5))

The semaphore prevents overwhelming the API. Adjust max_concurrent based on your rate limits.
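
Splitting results into two lists loses track of which input failed. Because `asyncio.gather` returns results in the same order as its arguments, each outcome can be paired with its input. A sketch under that assumption; `bounded_map` and `fake_summarize` are hypothetical names, and the fake worker stands in for the API call:

```python
import asyncio

async def bounded_map(worker, items, max_concurrent: int = 5):
    """Run worker(item) for every item, at most max_concurrent at a time.

    Returns (item, outcome) pairs, where outcome is either the result
    or the exception that worker raised for that item.
    """
    semaphore = asyncio.Semaphore(max_concurrent)

    async def guarded(item):
        async with semaphore:
            return await worker(item)

    outcomes = await asyncio.gather(
        *(guarded(item) for item in items), return_exceptions=True
    )
    # gather preserves argument order, so zip lines inputs up with outcomes
    return list(zip(items, outcomes))

async def fake_summarize(text: str) -> str:
    # Hypothetical stand-in for the API call
    await asyncio.sleep(0.01)
    if not text:
        raise ValueError("empty document")
    return text[:10]

pairs = asyncio.run(
    bounded_map(fake_summarize, ["alpha doc", "", "gamma doc"], max_concurrent=2)
)
failed_items = [item for item, out in pairs if isinstance(out, Exception)]
succeeded = {item: out for item, out in pairs if not isinstance(out, Exception)}
print(f"{len(succeeded)} succeeded, {len(failed_items)} failed")
```

With the failed inputs in hand, a second pass can retry only those items.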

Fan-Out/Fan-In Orchestration

Complex tasks need structured orchestration:

async def research_topic(topic: str):
    # Fan out: gather information from multiple angles
    research_tasks = [
        research_aspect(topic, "historical background"),
        research_aspect(topic, "current state"),
        research_aspect(topic, "future trends"),
        research_aspect(topic, "key players"),
    ]
    research_results = await asyncio.gather(*research_tasks)
    
    # Fan in: synthesize all research into final report
    combined = "\n\n".join([
        f"## {r['aspect']}\n{r['content']}" 
        for r in research_results
    ])
    
    final_report = await synthesize(combined)
    return final_report

async def research_aspect(topic: str, aspect: str) -> dict:
    # Each aspect is researched independently
    response = await client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=[{
            "role": "user", 
            "content": f"Research {aspect} of {topic}. Be thorough but concise."
        }]
    )
    return {"aspect": aspect, "content": response.content[0].text}

async def synthesize(research: str) -> str:
    response = await client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=2048,
        messages=[{
            "role": "user",
            "content": f"Synthesize this research into a cohesive report:\n\n{research}"
        }]
    )
    return response.content[0].text

Four research calls run in parallel. One synthesis call waits for all four to complete.

Error Handling

Parallel execution means parallel failures. Handle them:

async def robust_parallel(tasks: list):
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    output = []
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
            output.append({"success": False, "error": str(result)})
        else:
            output.append({"success": True, "data": result})
    
    # Guard against an empty task list to avoid dividing by zero
    if output:
        success_rate = sum(1 for r in output if r["success"]) / len(output)
        print(f"Success rate: {success_rate:.0%}")
    
    return output

Decide your failure tolerance. Some workflows can proceed with partial results. Others need all tasks to succeed.
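
For workflows that need every task to succeed, one option is to retry transient failures before giving up. The sketch below uses exponential backoff with jitter. `with_retries` and `flaky` are hypothetical names. It retries on any `Exception` for brevity, where real code would usually retry only on errors known to be transient, such as timeouts or rate-limit responses. Your client library may also retry some errors on its own, so check its documentation before layering this on top.

```python
import asyncio
import random

async def with_retries(make_call, max_attempts: int = 3, base_delay: float = 0.5):
    """Await make_call() until it succeeds or max_attempts is reached.

    make_call must be a zero-argument function that returns a fresh
    coroutine each time, because a coroutine object can only be awaited once.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return await make_call()
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: let the caller see the last error
            # Exponential backoff (base, 2x, 4x, ...) plus random jitter so
            # parallel workers do not all retry at the same instant
            delay = base_delay * 2 ** (attempt - 1)
            await asyncio.sleep(delay + random.uniform(0, delay / 2))

# Demo: a call that fails twice, then succeeds
attempts = {"count": 0}

async def flaky() -> str:
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("simulated transient failure")
    return "ok"

result = asyncio.run(with_retries(flaky, max_attempts=3, base_delay=0.01))
print(result, "after", attempts["count"], "attempts")  # prints: ok after 3 attempts
```

Wrapped calls still work with `asyncio.gather(..., return_exceptions=True)`, so tasks that exhaust their retries are reported as failures without stopping the rest.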

Rate Limit Strategies

APIs have rate limits. Respect them:

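import asyncio
import time
from collections import deque

class RateLimiter:
    """Sliding window: at most requests_per_minute calls per 60 seconds."""

    def __init__(self, requests_per_minute: int):
        self.rpm = requests_per_minute
        self.window = 60.0  # seconds
        # Monotonic timestamps, which are unaffected by wall-clock changes
        self.requests: deque[float] = deque()
    
    async def acquire(self):
        while True:
            now = time.monotonic()
            # Remove old requests outside window
            while self.requests and now - self.requests[0] >= self.window:
                self.requests.popleft()
            
            if len(self.requests) < self.rpm:
                # No await between the check and the append, so concurrent
                # callers cannot claim the same slot
                self.requests.append(now)
                return
            
            # Wait until oldest request exits window, then re-check,
            # because another waiter may have taken the freed slot
            await asyncio.sleep(self.requests[0] + self.window - now)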

# Usage
limiter = RateLimiter(requests_per_minute=60)

async def rate_limited_call(prompt: str):
    await limiter.acquire()
    return await client.messages.create(...)

When Not to Parallelize

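Parallel isn’t always better. Steps that depend on an earlier step’s output still have to run in sequence, and rate limits cap how much concurrency actually helps.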

What You Can Steal

Quick win (10 minutes):

  1. Identify three independent subtasks in your current workflow
  2. Wrap them in asyncio.gather
  3. Measure the speedup

Production setup (2 hours):

  1. Implement SessionManager for state tracking
  2. Add semaphore-bounded worker pool
  3. Build error handling and retry logic
  4. Add rate limiting

Advanced orchestration:

  1. Design fan-out/fan-in pipelines
  2. Implement voting for quality-critical outputs
  3. Add progress tracking and logging
  4. Build dashboards for session monitoring

Start with the quick win. With three independent calls, the speedup can approach 3x.
