atopx/pulses

Pulses

English | 简体中文

A robust, high-performance background job processing library for Rust.

Pulses consumes messages from a pluggable broker (a Redis Streams backend ships behind the default redis feature), routes each message to the handlers that subscribe to its stream, and runs those handlers concurrently on the multi-threaded Tokio runtime.

Features

  • Pluggable brokers — implement the [Broker] trait for any backend; a Redis Streams implementation is included.
  • Type-safe handlers — register handlers at compile time. Their streams are unioned automatically into the broker subscription, so there is no stream list to keep in sync by hand.
  • Real delivery semantics — Outcome::Retry { after_ms } honors its delay and is bounded by handler_max_attempts; messages that exhaust their retries, or whose handler returns Outcome::DeadLetter, are written to a durable <stream>-dlq stream before being acknowledged.
  • Bounded concurrency & back-pressure — each handler processes up to a configurable number of in-flight messages; a saturated handler applies back-pressure to the reader.
  • Reliability — failed acknowledgements are retried with backoff, and pending messages abandoned by a crashed consumer are reclaimed via XAUTOCLAIM.
  • Graceful shutdown — on cancellation the runtime stops accepting new work and drains in-flight handler invocations before returning.
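
To make the back-pressure bullet concrete, here is a minimal sketch of the general technique using only the standard library. It is not Pulses' internal code: `mailbox` and `try_dispatch` are hypothetical names, and the capacity argument only plays the role that handler_queue_capacity plays in the configuration.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Hypothetical mailbox: a bounded queue between the reader and one handler.
/// (Illustration only; not a type or function from the pulses crate.)
pub fn mailbox(capacity: usize) -> (SyncSender<String>, Receiver<String>) {
    sync_channel(capacity)
}

/// Tries to enqueue a message without blocking.
/// The message is handed back when the mailbox is full (or closed), which is
/// the reader's cue to stop pulling from the broker until space frees up.
pub fn try_dispatch(tx: &SyncSender<String>, msg: String) -> Result<(), String> {
    match tx.try_send(msg) {
        Ok(()) => Ok(()),
        Err(TrySendError::Full(msg)) => Err(msg),
        Err(TrySendError::Disconnected(msg)) => Err(msg),
    }
}
```

With a capacity of 2, the third `try_dispatch` call fails until the handler side receives a message. A reader that uses the blocking `send` instead simply pauses at that point, which is the usual way a saturated consumer slows down its producer.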

Installation

[dependencies]
pulses = "0.2"
tokio = { version = "1", features = ["full"] }
tokio-util = "0.7"

The redis feature is enabled by default. Disable default features to depend on only the core runtime and supply your own broker:

pulses = { version = "0.2", default-features = false }

Usage

1. Define a handler

use pulses::{Context, Envelope, Handler, HandlerError, Outcome};
use pulses::broker::redis::RedisBroker;

struct EmailHandler;

impl Handler<RedisBroker> for EmailHandler {
    // Streams this handler consumes — wired into the subscription automatically.
    const STREAMS: &'static [&'static str] = &["email-tasks"];
    // Maximum messages processed concurrently by this handler.
    const MAX_IN_FLIGHT: usize = 8;

    async fn handle(&self, _ctx: &Context<RedisBroker>, msg: &Envelope) -> Result<Outcome, HandlerError> {
        let payload = String::from_utf8_lossy(&msg.payload);
        println!("sending email {}: {payload}", msg.id);
        Ok(Outcome::Ack)
    }
}

A handler can return:

| Outcome | Behavior |
| --- | --- |
| Ack | Acknowledge and remove the message. |
| Drop | Discard without processing (acknowledged). |
| Retry { after_ms } | Re-run after the delay, up to handler_max_attempts, then dead-letter. |
| DeadLetter { reason } | Write to <stream>-dlq, then acknowledge. |

A handler that returns Err(..) is treated as a Retry.
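
The rules above can be read as a small decision function. The sketch below is an interpretation, not the crate's implementation: `Outcome` here is a local stand-in for the crate's type, `Action` and `resolve` are hypothetical names, the delay applied to Err(..) is an assumed parameter because the README does not state one, and "attempt < max_attempts" is one plausible reading of "up to handler_max_attempts".

```rust
/// Local stand-in mirroring the outcomes listed above (not the crate's type).
#[derive(Debug, Clone, PartialEq)]
pub enum Outcome {
    Ack,
    Drop,
    Retry { after_ms: u64 },
    DeadLetter { reason: String },
}

/// What the runtime does next (hypothetical type for this sketch).
#[derive(Debug, Clone, PartialEq)]
pub enum Action {
    Acknowledge,
    RedeliverAfterMs(u64),
    DeadLetterThenAck { dlq_stream: String, reason: String },
}

pub fn resolve(
    stream: &str,
    result: Result<Outcome, String>,
    attempt: u32,        // 1-based number of the attempt that just finished
    max_attempts: u32,   // plays the role of handler_max_attempts
    error_retry_ms: u64, // ASSUMPTION: delay used when the handler returns Err(..)
) -> Action {
    let dlq_stream = format!("{stream}-dlq");
    // Err(..) is treated as a Retry.
    let outcome = result.unwrap_or(Outcome::Retry { after_ms: error_retry_ms });
    match outcome {
        // Both Ack and Drop end with an acknowledgement.
        Outcome::Ack | Outcome::Drop => Action::Acknowledge,
        // Retries are honored while attempts remain.
        Outcome::Retry { after_ms } if attempt < max_attempts => {
            Action::RedeliverAfterMs(after_ms)
        }
        // Exhausted retries fall through to the dead-letter stream.
        Outcome::Retry { .. } => Action::DeadLetterThenAck {
            dlq_stream,
            reason: format!("retries exhausted after {attempt} attempts"),
        },
        Outcome::DeadLetter { reason } => Action::DeadLetterThenAck { dlq_stream, reason },
    }
}
```

For a message on email-tasks with max_attempts set to 5, a Retry on attempt 2 is redelivered, while a Retry on attempt 5 is written to email-tasks-dlq and then acknowledged.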

2. Run the application

use pulses::App;
use pulses::broker::redis::{RedisBroker, RedisSubscription};
use tokio_util::sync::CancellationToken;

// EmailHandler is the handler defined in step 1; define or import it here.

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let broker = RedisBroker::connect("redis://127.0.0.1:6379").await?;

    let app = App::new(broker, RedisSubscription::for_group("workers", "node-1"))
        .register(EmailHandler);

    // Cancel the token on Ctrl-C (SIGINT) for a graceful shutdown.
    // To also react to SIGTERM on Unix, listen with tokio::signal::unix::signal.
    let cancellation = CancellationToken::new();
    let shutdown = cancellation.clone();
    tokio::spawn(async move {
        let _ = tokio::signal::ctrl_c().await;
        shutdown.cancel();
    });

    app.run(cancellation).await?;
    Ok(())
}

See examples/ for a runnable producer/consumer pair (redis_producer, redis_consumer) and a multi-handler worker (multi_task).

Configuration

Tune the runtime with [AppConfig] via App::with_config:

| Field | Default | Purpose |
| --- | --- | --- |
| handler_queue_capacity | 1024 | Per-handler mailbox size (back-pressure point). |
| max_in_flight_per_handler | 256 | Concurrency ceiling per handler. |
| ack_retry_queue_capacity | 1024 | Capacity of the failed-ack retry queue. |
| ack_max_attempts | 8 | Max attempts to re-acknowledge a message. |
| handler_max_attempts | 5 | Max handler attempts before dead-lettering. |
| reclaim_interval | 30s | How often to reclaim abandoned messages. |
| poll_idle_sleep | 50ms | Idle sleep guarding against non-blocking poll loops. |
| backoff | | Exponential backoff with jitter for retries. |
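
The backoff field is described only as "exponential backoff with jitter". As a rough illustration of that family of policies, the sketch below computes a "full jitter" delay. The function name, the base and cap parameters, and the choice of full jitter are assumptions for illustration; the crate's actual policy and its defaults are not documented here.

```rust
use std::time::Duration;

/// Exponential backoff with full jitter (hypothetical helper, not the crate's API).
/// The delay is `unit_sample * min(cap, base * 2^attempt)`, where `unit_sample`
/// is a random value in [0.0, 1.0] supplied by the caller, for example from a
/// random number generator. Passing it in keeps the function deterministic.
pub fn backoff_delay(base: Duration, cap: Duration, attempt: u32, unit_sample: f64) -> Duration {
    // Saturate instead of overflowing for large attempt counts.
    let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
    let ceiling = base.checked_mul(factor).unwrap_or(cap).min(cap);
    // Guard against NaN or out-of-range samples.
    let sample = if unit_sample.is_finite() {
        unit_sample.clamp(0.0, 1.0)
    } else {
        0.0
    };
    // Scale in nanoseconds; the float-to-integer cast saturates.
    let nanos = (ceiling.as_nanos() as f64 * sample) as u64;
    Duration::from_nanos(nanos)
}
```

With a 100 ms base and a 10 s cap, attempt 3 has a ceiling of 800 ms, so a sample of 0.5 yields 400 ms. Spreading delays across the whole interval keeps many failing consumers from retrying in lockstep.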

Architecture

Architecture diagram

  • App — builder and supervisor; wires everything together and owns shutdown.
  • Broker — trait abstracting the message backend (Redis Streams included).
  • Handler — user logic; declares the streams it consumes.
  • Router — forwards each message to the pools subscribing to its stream.
  • HandlerPool — runs one handler with bounded concurrency, honoring retries and dead-lettering.
  • AckRetrier / Reclaimer — reliability actors for acks and abandoned messages.
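
The Router's role, and the way handler streams are unioned into one subscription, can be sketched with a plain lookup table. This is a simplified model under stated assumptions: the `Router` struct below and its methods are hypothetical, and pools are identified by their registration index rather than by real HandlerPool values.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Hypothetical routing table: stream name -> indices of subscribing pools.
/// (A model for illustration; not the Router type from the pulses crate.)
pub struct Router {
    routes: BTreeMap<&'static str, Vec<usize>>,
}

impl Router {
    /// Builds the table from each handler's declared streams, in registration order.
    pub fn new(handler_streams: &[&'static [&'static str]]) -> Self {
        let mut routes: BTreeMap<&'static str, Vec<usize>> = BTreeMap::new();
        for (pool, streams) in handler_streams.iter().enumerate() {
            // De-duplicate per handler so a repeated name does not double-deliver.
            for stream in streams.iter().copied().collect::<BTreeSet<_>>() {
                routes.entry(stream).or_default().push(pool);
            }
        }
        Router { routes }
    }

    /// Union of all declared streams: what the broker subscription would need.
    pub fn subscribed_streams(&self) -> Vec<&'static str> {
        self.routes.keys().copied().collect()
    }

    /// Pools that should receive a message from `stream` (empty if none subscribe).
    pub fn targets(&self, stream: &str) -> &[usize] {
        self.routes.get(stream).map(Vec::as_slice).unwrap_or(&[])
    }
}
```

If one handler declares ["email-tasks"] and a second declares ["email-tasks", "billing-tasks"], the subscription covers both streams, and an email-tasks message is forwarded to both pools.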

Testing

cargo test                     # unit + doc tests (no broker required)
REDIS_URL=redis://127.0.0.1:6379 cargo test   # also runs Redis integration tests

Integration tests skip themselves when REDIS_URL is unset.
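
One common way to write such a self-skipping test is to look up the variable first and return early when it is missing. The helper below is a hypothetical sketch, not code from this repository; treating an empty value the same as an unset one is an assumption.

```rust
/// Returns the Redis URL for integration tests, or None when they should be
/// skipped. `lookup` abstracts the environment so the helper is easy to test.
/// (Hypothetical helper; the repository's tests may do this differently.)
pub fn integration_redis_url(lookup: impl Fn(&str) -> Option<String>) -> Option<String> {
    // ASSUMPTION: an empty or whitespace-only value counts as unset.
    lookup("REDIS_URL").filter(|url| !url.trim().is_empty())
}
```

Inside a test, this would be used as `let Some(url) = integration_redis_url(|key| std::env::var(key).ok()) else { return; };` so the test passes trivially when no broker is configured.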

License

MIT
