# rust-tokio-expert

> Experienced Rust developer with expertise in building reliable network applications using the Tokio library and its associated stack
## Installation

```sh
npx claude-plugins install @geoffjay/geoffjay-claude-plugins/rust-tokio-expert
```

## Contents

Folders: `agents`, `commands`, `skills`

## Included Skills

This plugin includes 4 skill definitions:
### tokio-concurrency

> Advanced concurrency patterns for Tokio including fan-out/fan-in, pipeline processing, rate limiting, and coordinated shutdown. Use when building high-concurrency async systems.

<details>
<summary>View skill definition</summary>

# Tokio Concurrency Patterns

This skill provides advanced concurrency patterns for building scalable async applications with Tokio.

## Fan-Out/Fan-In Pattern

Distribute work across multiple workers and collect results:
```rust
use std::future::Future;
use std::pin::Pin;

use futures::stream::{self, StreamExt};

pub async fn fan_out_fan_in<T, R>(
    items: Vec<T>,
    concurrency: usize,
    process: impl Fn(T) -> Pin<Box<dyn Future<Output = R> + Send>> + Send + Sync + 'static,
) -> Vec<R>
where
    T: Send + 'static,
    R: Send + 'static,
{
    stream::iter(items)
        .map(|item| process(item))
        // Note: buffer_unordered yields results in completion order, not input order.
        .buffer_unordered(concurrency)
        .collect()
        .await
}

// Usage
let results = fan_out_fan_in(
    items,
    10,
    |item| Box::pin(async move { process_item(item).await }),
)
.await;
```
## Pipeline Processing

Chain async processing stages:
```rust
use tokio::sync::mpsc;

pub struct Pipeline<T> {
    stages: Vec<Box<dyn Stage<T>>>,
}

#[async_trait::async_trait]
pub trait Stage<T>: Send {
    async fn process(&self, item: T) -> T;
}

impl<T: Send + 'static> Pipeline<T> {
    pub fn new() -> Self {
        Self { stages: Vec::new() }
    }

    pub fn add_stage<S: Stage<T> + 'static>(mut self, stage: S) -> Self {
        self.stages.push(Box::new(stage));
        self
    }

    pub async fn run(self, mut input: mpsc::Receiver<T>) -> mpsc::Receiver<T> {
        let (tx, rx) = mpsc::channel(100);
        tokio::spawn(async move {
            while let Some(mut item) = input.recv().await {
                // ...(truncated)
```
</details>
### tokio-networking
> Network programming patterns with Hyper, Tonic, and Tower. Use when building HTTP services, gRPC applications, implementing middleware, connection pooling, or health checks.
<details>
<summary>View skill definition</summary>
# Tokio Networking Patterns
This skill provides network programming patterns for building production-grade services with the Tokio ecosystem.
## HTTP Service with Hyper and Axum
Build HTTP services with routing and middleware:
```rust
use axum::{
    Router,
    routing::{get, post},
    extract::{State, Path, Json},
    http::{Request, StatusCode},
    middleware::{self, Next},
    response::IntoResponse,
};
use std::sync::Arc;
use std::time::Instant;

#[derive(Clone)]
struct AppState {
    db: Arc<Database>,
    cache: Arc<Cache>,
}

async fn create_app() -> Router {
    let state = AppState {
        db: Arc::new(Database::new().await),
        cache: Arc::new(Cache::new()),
    };

    Router::new()
        .route("/health", get(health_check))
        .route("/api/v1/users", get(list_users).post(create_user))
        .route("/api/v1/users/:id", get(get_user).delete(delete_user))
        .layer(middleware::from_fn(logging_middleware))
        .layer(middleware::from_fn(auth_middleware))
        .with_state(state)
}

async fn health_check() -> impl IntoResponse {
    "OK"
}

async fn get_user(
    State(state): State<AppState>,
    Path(id): Path<u64>,
) -> Result<Json<User>, StatusCode> {
    state.db.get_user(id)
        .await
        .map(Json)
        .ok_or(StatusCode::NOT_FOUND)
}

async fn logging_middleware<B>(
    req: Request<B>,
    next: Next<B>,
) -> impl IntoResponse {
    let method = req.method().clone();
    let uri = req.uri().clone();
    let start = Instant::now();
    let response = next.run(req).await;
    // ...(truncated)
```
</details>
### tokio-patterns
> Common Tokio patterns and idioms for async programming. Use when implementing worker pools, request-response patterns, pub/sub, timeouts, retries, or graceful shutdown.
<details>
<summary>View skill definition</summary>
# Tokio Patterns
This skill provides common patterns and idioms for building robust async applications with Tokio.
## Worker Pool Pattern
Limit concurrent task execution using a semaphore:
```rust
use std::future::Future;
use std::sync::Arc;
use tokio::sync::Semaphore;

pub struct WorkerPool {
    semaphore: Arc<Semaphore>,
}

impl WorkerPool {
    pub fn new(size: usize) -> Self {
        Self {
            semaphore: Arc::new(Semaphore::new(size)),
        }
    }

    pub async fn execute<F, T>(&self, f: F) -> T
    where
        F: Future<Output = T>,
    {
        // The permit is held for the duration of the future, bounding concurrency.
        let _permit = self.semaphore.acquire().await.unwrap();
        f.await
    }
}

// Usage
let pool = WorkerPool::new(10);
let results = futures::future::join_all(
    (0..100).map(|i| pool.execute(process_item(i)))
).await;
```
## Request-Response Pattern

Use oneshot channels for request-response communication:

```rust
use std::collections::HashMap;

use tokio::sync::{mpsc, oneshot};

pub enum Command {
    Get { key: String, respond_to: oneshot::Sender<Option<String>> },
    Set { key: String, value: String },
}

pub async fn actor(mut rx: mpsc::Receiver<Command>) {
    let mut store = HashMap::new();
    while let Some(cmd) = rx.recv().await {
        match cmd {
            Command::Get { key, respond_to } => {
                let value = store.get(&key).cloned();
                let _ = respond_to.send(value);
            }
            Command::Set { key, value } => {
                store.insert(key, value);
            }
        }
    }
}

// Client usage ...(truncated)
```
</details>
### tokio-troubleshooting
> Debugging and troubleshooting Tokio applications using tokio-console, detecting deadlocks, memory leaks, and performance issues. Use when diagnosing async runtime problems.
<details>
<summary>View skill definition</summary>
# Tokio Troubleshooting
This skill provides techniques for debugging and troubleshooting async applications built with Tokio.
## Using tokio-console for Runtime Inspection
Monitor async runtime in real-time:
```toml
# In Cargo.toml
[dependencies]
console-subscriber = "0.2"
```

```rust
// In main.rs
fn main() {
    // Note: console-subscriber requires building with
    // RUSTFLAGS="--cfg tokio_unstable".
    console_subscriber::init();

    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap()
        .block_on(async {
            run_application().await
        });
}
```

Run the console in a separate terminal:

```sh
tokio-console
```
Key metrics to monitor:
- Task spawn rate and total tasks
- Poll duration per task
- Idle vs. busy time
- Waker operations
- Resource utilization
Identifying issues:
- Long poll durations: CPU-intensive work in async context
- Many wakers: Potential contention or inefficient polling
- Growing task count: Task leak or unbounded spawning
- High idle time: Not enough work or blocking operations
## Debugging Deadlocks and Hangs

Detect and resolve deadlock situations:

### Common Deadlock Pattern
```rust
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;

// BAD: Potential deadlock
async fn deadlock_example() {
    let mutex1 = Arc::new(Mutex::new(()));
    let mutex2 = Arc::new(Mutex::new(()));

    let m1 = mutex1.clone();
    let m2 = mutex2.clone();
    tokio::spawn(async move {
        let _g1 = m1.lock().await;
        tokio::time::sleep(Duration::from_millis(10)).await;
        let _g2 = m2.lock().await; // May deadlock
    });

    // ...(truncated)
```
</details>
## Source
[View on GitHub](https://github.com/geoffjay/claude-plugins)