rust-tokio-expert

Experienced Rust developer with expertise in building reliable network applications using the Tokio library and its associated stack

View on GitHub
Author Geoff Johnson
Namespace @geoffjay/geoffjay-claude-plugins
Category languages
Version 1.0.0
Stars 7
Downloads 12
self.md verified
Table of content

Experienced Rust developer with expertise in building reliable network applications using the Tokio library and its associated stack

Installation

npx claude-plugins install @geoffjay/geoffjay-claude-plugins/rust-tokio-expert

Contents

Folders: agents, commands, skills

Included Skills

This plugin includes 4 skill definitions:

tokio-concurrency

Advanced concurrency patterns for Tokio including fan-out/fan-in, pipeline processing, rate limiting, and coordinated shutdown. Use when building high-concurrency async systems.

View skill definition

Tokio Concurrency Patterns

This skill provides advanced concurrency patterns for building scalable async applications with Tokio.

Fan-Out/Fan-In Pattern

Distribute work across multiple workers and collect results:

use futures::stream::{self, StreamExt};

pub async fn fan_out_fan_in<T, R>(
    items: Vec<T>,
    concurrency: usize,
    process: impl Fn(T) -> Pin<Box<dyn Future<Output = R> + Send>> + Send + Sync + 'static,
) -> Vec<R>
where
    T: Send + 'static,
    R: Send + 'static,
{
    stream::iter(items)
        .map(|item| process(item))
        .buffer_unordered(concurrency)
        .collect()
        .await
}

// Usage
let results = fan_out_fan_in(
    items,
    10,
    |item| Box::pin(async move { process_item(item).await })
).await;

Pipeline Processing

Chain async processing stages:

use tokio::sync::mpsc;

pub struct Pipeline<T> {
    stages: Vec<Box<dyn Stage<T>>>,
}

#[async_trait::async_trait]
pub trait Stage<T>: Send {
    async fn process(&self, item: T) -> T;
}

impl<T: Send + 'static> Pipeline<T> {
    pub fn new() -> Self {
        Self { stages: Vec::new() }
    }

    pub fn add_stage<S: Stage<T> + 'static>(mut self, stage: S) -> Self {
        self.stages.push(Box::new(stage));
        self
    }

    pub async fn run(self, mut input: mpsc::Receiver<T>) -> mpsc::Receiver<T> {
        let (tx, rx) = mpsc::channel(100);

        tokio::spawn(async move {
            while let Some(mut item) = input.recv().await {
        

...(truncated)

</details>

### tokio-networking

> Network programming patterns with Hyper, Tonic, and Tower. Use when building HTTP services, gRPC applications, implementing middleware, connection pooling, or health checks.

<details>
<summary>View skill definition</summary>

# Tokio Networking Patterns

This skill provides network programming patterns for building production-grade services with the Tokio ecosystem.

## HTTP Service with Hyper and Axum

Build HTTP services with routing and middleware:

```rust
use axum::{
    Router,
    routing::{get, post},
    extract::{State, Path, Json},
    response::IntoResponse,
    middleware,
};
use std::sync::Arc;

#[derive(Clone)]
struct AppState {
    db: Arc<Database>,
    cache: Arc<Cache>,
}

async fn create_app() -> Router {
    let state = AppState {
        db: Arc::new(Database::new().await),
        cache: Arc::new(Cache::new()),
    };

    Router::new()
        .route("/health", get(health_check))
        .route("/api/v1/users", get(list_users).post(create_user))
        .route("/api/v1/users/:id", get(get_user).delete(delete_user))
        .layer(middleware::from_fn(logging_middleware))
        .layer(middleware::from_fn(auth_middleware))
        .with_state(state)
}

async fn health_check() -> impl IntoResponse {
    "OK"
}

async fn get_user(
    State(state): State<AppState>,
    Path(id): Path<u64>,
) -> Result<Json<User>, StatusCode> {
    state.db.get_user(id)
        .await
        .map(Json)
        .ok_or(StatusCode::NOT_FOUND)
}

async fn logging_middleware<B>(
    req: Request<B>,
    next: Next<B>,
) -> impl IntoResponse {
    let method = req.method().clone();
    let uri = req.uri().clone();

    let start = Instant::now();
    let response = next.run(req).await;
    let durat

...(truncated)

</details>

### tokio-patterns

> Common Tokio patterns and idioms for async programming. Use when implementing worker pools, request-response patterns, pub/sub, timeouts, retries, or graceful shutdown.

<details>
<summary>View skill definition</summary>

# Tokio Patterns

This skill provides common patterns and idioms for building robust async applications with Tokio.

## Worker Pool Pattern

Limit concurrent task execution using a semaphore:

```rust
use tokio::sync::Semaphore;
use std::sync::Arc;

pub struct WorkerPool {
    semaphore: Arc<Semaphore>,
}

impl WorkerPool {
    pub fn new(size: usize) -> Self {
        Self {
            semaphore: Arc::new(Semaphore::new(size)),
        }
    }

    pub async fn execute<F, T>(&self, f: F) -> T
    where
        F: Future<Output = T>,
    {
        let _permit = self.semaphore.acquire().await.unwrap();
        f.await
    }
}

// Usage
let pool = WorkerPool::new(10);
let results = futures::future::join_all(
    (0..100).map(|i| pool.execute(process_item(i)))
).await;

Request-Response Pattern

Use oneshot channels for request-response communication:

use tokio::sync::{mpsc, oneshot};

pub enum Command {
    Get { key: String, respond_to: oneshot::Sender<Option<String>> },
    Set { key: String, value: String },
}

pub async fn actor(mut rx: mpsc::Receiver<Command>) {
    let mut store = HashMap::new();

    while let Some(cmd) = rx.recv().await {
        match cmd {
            Command::Get { key, respond_to } => {
                let value = store.get(&key).cloned();
                let _ = respond_to.send(value);
            }
            Command::Set { key, value } => {
                store.insert(key, value);
            }
        }
    }
}

// Client usag

...(truncated)

</details>

### tokio-troubleshooting

> Debugging and troubleshooting Tokio applications using tokio-console, detecting deadlocks, memory leaks, and performance issues. Use when diagnosing async runtime problems.

<details>
<summary>View skill definition</summary>

# Tokio Troubleshooting

This skill provides techniques for debugging and troubleshooting async applications built with Tokio.

## Using tokio-console for Runtime Inspection

Monitor async runtime in real-time:

```rust
// In Cargo.toml
[dependencies]
console-subscriber = "0.2"

// In main.rs
fn main() {
    console_subscriber::init();

    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap()
        .block_on(async {
            run_application().await
        });
}

Run console in separate terminal:

tokio-console

Key metrics to monitor:

Identifying issues:

Debugging Deadlocks and Hangs

Detect and resolve deadlock situations:

Common Deadlock Pattern

// BAD: Potential deadlock
async fn deadlock_example() {
    let mutex1 = Arc::new(Mutex::new(()));
    let mutex2 = Arc::new(Mutex::new(()));

    let m1 = mutex1.clone();
    let m2 = mutex2.clone();
    tokio::spawn(async move {
        let _g1 = m1.lock().await;
        tokio::time::sleep(Duration::from_millis(10)).await;
        let _g2 = m2.lock().await; // May deadlock
    });

    l

...(truncated)

</details>

## Source

[View on GitHub](https://github.com/geoffjay/claude-plugins)