Table of contents

  1. Overview
  2. Quick Start
  3. Why Workers (vs Plain Goroutines)
  4. Creating Workers
  5. Handler Return Values
    1. ErrDoNotRestart
  6. Builder Methods
  7. Jitter
    1. Per-worker jitter
    2. Run-level default
    3. Formula
    4. Initial delay
  8. Middleware
    1. Types
    2. Worker-level middleware
    3. Run-level middleware
    4. Writing custom middleware
  9. Built-in Middleware
    1. Recover
    2. Tracing
    3. Duration
    4. DistributedLock
    5. Timeout
    6. Slog
    7. LogContext
    8. DefaultInterceptors
  10. WorkerInfo
  11. Helpers
    1. EveryInterval
    2. ChannelWorker
    3. BatchChannelWorker
  12. Dynamic Workers
    1. Example: Fixed children on startup
    2. Example: Per-tenant workers
    3. Example: Nested hierarchy
  13. Running Workers
    1. Multiple workers
    2. Single worker
  14. Graceful Shutdown
  15. Logging
  16. Metrics
    1. Built-in Prometheus metrics
    2. No metrics (default)
    3. Custom metrics
    4. Per-worker override
  17. Testing
    1. Testing middleware
    2. Testing with dynamic children
    3. Integration testing
  18. Best Practices
  19. ColdBrew Integration

Overview

workers is a worker lifecycle library that manages background goroutines with automatic panic recovery, configurable restart with backoff, and structured shutdown. It is built on top of suture, an Erlang-inspired supervisor tree library for Go.

Every worker runs inside its own supervisor subtree:

Root Supervisor
├── Worker-A supervisor
│   ├── Worker-A service (middleware → handler)
│   ├── Child-A1 (added dynamically)
│   └── Child-A2
└── Worker-B supervisor
    └── Worker-B service (middleware → handler)

Key properties:

  • Scoped lifecycle — when a parent stops, all its children stop automatically. No manual cleanup or sync.WaitGroup needed.
  • Restart by default — workers restart with exponential backoff on failure. One-shot workers opt out with WithRestart(false) or return ErrDoNotRestart.
  • Two-layer panic recovery — suture catches panics at the supervisor level (restarts the worker). middleware.Recover catches panics per-cycle (converts to error without a full restart). Use both for defense in depth.
  • Composable middleware — tracing, structured logging, distributed locking, per-cycle timeout, and duration metrics as gRPC-style interceptors. Write your own with a single function.
  • Jitter — desynchronize periodic workers to prevent thundering herd. Per-worker or run-level default.
  • Dynamic children — workers can spawn and remove child workers at runtime via Add/Remove. Children inherit middleware, metrics, and scoped lifecycle.
  • Pluggable metrics — Prometheus out of the box, or implement the Metrics interface for Datadog, StatsD, etc. Per-attempt lifetime and per-cycle duration tracked separately.
  • Handler cleanup — CycleHandler.Close() is called exactly once when the worker permanently stops, for resource cleanup (DB connections, leases).

Quick Start

package main

import (
    "context"
    "log/slog"
    "os"
    "os/signal"
    "time"

    "github.com/go-coldbrew/workers"
    "github.com/go-coldbrew/workers/middleware"
)

func main() {
    ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
    defer cancel()

    err := workers.Run(ctx, []*workers.Worker{
        // Long-running worker — blocks until ctx is cancelled
        workers.NewWorker("kafka").HandlerFunc(consume),

        // Periodic worker — runs cleanup every 5 minutes with jitter
        workers.NewWorker("cleanup").HandlerFunc(cleanup).
            Every(5 * time.Minute).WithJitter(10),
    },
        // Standard observability: panic recovery, log context, tracing, structured logging
        workers.WithInterceptors(middleware.DefaultInterceptors()...),
    )
    if err != nil {
        slog.Error("workers failed", "error", err)
    }
}

// consume and cleanup have signature:
//   func(ctx context.Context, info *workers.WorkerInfo) error

Run blocks until ctx is cancelled and all workers have exited.

Why Workers (vs Plain Goroutines)

With plain goroutines, you manage lifecycle manually:

// Before: manual goroutine management
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
defer cancel()

var wg sync.WaitGroup
wg.Add(1)
go func() {
    defer wg.Done()
    // no panic recovery — crashes the process
    // no restart — dies permanently on error
    // no structured shutdown — must coordinate ctx + wg manually
    // no distributed locking — runs on every pod
    consume(ctx)
}()
wg.Wait()

With workers, the framework handles all of that:

// After: workers handle lifecycle
workers.Run(ctx, []*workers.Worker{
    workers.NewWorker("kafka").HandlerFunc(consume).
        Interceptors(
            middleware.Recover(onPanic),
            middleware.Tracing(),
            middleware.DistributedLock(redisLocker),
        ),
})

What you get for free: panic recovery, configurable restart with exponential backoff, scoped lifecycle (children stop when parents stop), composable middleware (tracing, logging, distributed locking, per-cycle timeout), jitter for periodic workers, and pluggable metrics. Distributed locking ensures only one instance runs a job across pods — no manual coordination.

Creating Workers

Use NewWorker with a name, then set a handler via HandlerFunc (for plain functions) or Handler (for structs with cleanup):

// Uses github.com/go-coldbrew/log for structured logging
w := workers.NewWorker("my-worker").HandlerFunc(func(ctx context.Context, info *workers.WorkerInfo) error {
    log.Info(ctx, "msg", "started", "worker", info.GetName(), "attempt", info.GetAttempt())
    <-ctx.Done()
    return ctx.Err()
})

For handlers that need resource cleanup, implement the CycleHandler interface. Close() is called exactly once when the worker permanently stops:

type batchProcessor struct {
    db   *sql.DB
    conn *sql.Conn // dedicated connection for this worker
}

func NewBatchProcessor(db *sql.DB) (*batchProcessor, error) {
    conn, err := db.Conn(context.Background())
    if err != nil {
        return nil, err
    }
    return &batchProcessor{db: db, conn: conn}, nil
}

func (b *batchProcessor) RunCycle(ctx context.Context, info *workers.WorkerInfo) error {
    rows, err := b.conn.QueryContext(ctx,
        "SELECT id, payload FROM jobs WHERE status = 'pending' LIMIT 100")
    if err != nil {
        return err
    }
    defer rows.Close()
    return processBatch(ctx, rows)
}

func (b *batchProcessor) Close() error {
    // Called once on permanent stop — release the dedicated connection
    return b.conn.Close()
}

The handler receives a context.Context for cancellation and a *WorkerInfo for worker metadata.

Handler Return Values

Return value                   | Long-running worker (no Every)             | Periodic worker (with Every)
-------------------------------|--------------------------------------------|-------------------------------------------
return nil                     | Worker stops permanently                   | Cycle succeeded — next tick fires
return error                   | Restarts with backoff (if restart enabled) | Restarts with backoff (if restart enabled)
return ctx.Err()               | Clean shutdown                             | Clean shutdown
return workers.ErrDoNotRestart | Permanent stop                             | Permanent stop

Long-running workers should block on <-ctx.Done(), then return ctx.Err(). Returning nil without waiting for ctx cancellation stops the worker permanently.

Periodic workers run the handler once per tick. Return nil for success (next tick fires normally). Return an error to trigger restart. The Every wrapper manages the tick loop — your handler just processes one cycle.

ErrDoNotRestart

Return workers.ErrDoNotRestart from a handler to signal permanent completion — the supervisor will not restart the worker even though restart is enabled by default. ChannelWorker and BatchChannelWorker return this automatically when their channel is closed.

func processQueue(ctx context.Context, info *workers.WorkerInfo) error {
    item, ok := queue.Dequeue(ctx)
    if !ok {
        return workers.ErrDoNotRestart // queue exhausted
    }
    return process(ctx, item)
}

Builder Methods

Method                          | Description                                            | Default
--------------------------------|--------------------------------------------------------|---------------------------
HandlerFunc(fn)                 | Set handler from a plain function                      | —
Handler(h)                      | Set handler from a CycleHandler struct                 | —
WithRestart(false)              | Disable restart (one-shot worker)                      | true (restart with backoff)
Every(duration)                 | Run periodically on a fixed interval                   | —
WithJitter(percent)             | Randomize tick interval by ±percent (requires Every)   | inherit run-level
WithInitialDelay(d)             | Delay first tick (requires Every)                      | —
Interceptors(mw...)             | Replace worker-level middleware                        | —
AddInterceptors(mw...)          | Append to worker-level middleware                      | —
WithFailureBackoff(d)           | Duration between restarts                              | 15s (suture default)
WithFailureThreshold(n float64) | Max failures before supervisor gives up                | 5.0 (suture default)
WithFailureDecay(rate float64)  | Rate at which failure count decays (per second)        | 1.0 (suture default)
WithBackoffJitter(j)            | Random jitter on restart backoff                       | none
WithTimeout(d)                  | Max time to wait for graceful stop                     | 10s (suture default)
WithMetrics(m Metrics)          | Per-worker metrics override                            | inherit from parent/run

Example with full configuration:

workers.NewWorker("resilient-consumer").HandlerFunc(consume).
    Every(15 * time.Second).
    WithJitter(10).
    WithInitialDelay(5 * time.Second).
    Interceptors(
        middleware.Recover(onPanic),
        middleware.Tracing(),
    ).
    WithFailureBackoff(5 * time.Second).
    WithFailureThreshold(10).
    WithTimeout(30 * time.Second)

Jitter

When many workers share the same base interval (e.g. 15s), they synchronize and spike downstream services — the thundering herd problem. Jitter desynchronizes ticks by randomizing each interval within a configurable range.

Per-worker jitter

workers.NewWorker("poller").HandlerFunc(poll).
    Every(15 * time.Second).
    WithJitter(10) // each tick is within [13.5s, 16.5s)

Run-level default

Apply jitter to all periodic workers with WithDefaultJitter:

workers.Run(ctx, myWorkers, workers.WithDefaultJitter(10))

Worker-level WithJitter takes precedence over the run-level default. Setting WithJitter(0) explicitly disables jitter for a specific worker even when a run-level default is set.

Formula

On each tick:

spread   = base × percent ÷ 100
jittered = base − spread + rand(2 × spread)

The effective interval is clamped to a minimum of 1ms (never zero or negative). Each tick recomputes independently — successive intervals differ.

Initial delay

WithInitialDelay delays the first tick, preventing N workers from all firing at t=0 on process start:

workers.NewWorker("poller").HandlerFunc(poll).
    Every(15 * time.Second).
    WithJitter(10).
    WithInitialDelay(5 * time.Second)

Middleware

Middleware wraps each worker execution cycle with cross-cutting concerns like panic recovery, tracing, distributed locking, and timing. For periodic workers (Every), middleware runs on every tick, not once for the worker lifetime.

Types

// CycleHandler handles worker execution cycles.
// Implement as a struct for handlers that need cleanup.
type CycleHandler interface {
    RunCycle(ctx context.Context, info *WorkerInfo) error
    Close() error  // called once when the worker stops
}

// CycleFunc adapts a plain function into a CycleHandler.
// Close is a no-op — use this for simple, stateless handlers.
type CycleFunc func(ctx context.Context, info *WorkerInfo) error

// Middleware intercepts each execution cycle.
// Call next to continue the chain. Matches gRPC interceptor convention.
type Middleware func(ctx context.Context, info *WorkerInfo, next CycleFunc) error

Worker-level middleware

w := workers.NewWorker("solver").HandlerFunc(solve).
    Every(15 * time.Second).
    Interceptors(
        middleware.Recover(onPanic),
        middleware.Tracing(),
        middleware.Duration(observeDuration),
    )

The first middleware in the list is the outermost wrapper (runs first on entry, last on exit), matching the gRPC interceptor convention.

Run-level middleware

WithInterceptors replaces and AddInterceptors appends to the run-level middleware list. These are run options that apply to all workers in the Run call — distinct from the worker-level (*Worker).Interceptors and (*Worker).AddInterceptors which only affect a single worker. Run-level middleware wraps outside worker-level middleware, so shared concerns like tracing are always outermost:

workers.Run(ctx, myWorkers,
    workers.WithInterceptors(middleware.DefaultInterceptors()...),
    workers.AddInterceptors(middleware.Duration(observe)),
)

Effective chain: run-level middleware → worker-level middleware → handler

Writing custom middleware

Middleware is a flat function that calls next to continue the chain. The *WorkerInfo parameter gives you the worker name and attempt explicitly — no hidden context lookups:

// Uses github.com/go-coldbrew/log for structured logging
func myLogging(ctx context.Context, info *workers.WorkerInfo, next workers.CycleFunc) error {
    log.Info(ctx, "msg", "cycle start", "worker", info.GetName())
    err := next(ctx, info)
    log.Info(ctx, "msg", "cycle end", "worker", info.GetName(), "error", err)
    return err
}

// Attach it
w.Interceptors(myLogging)

Same shape as gRPC interceptors — familiar to the target audience:

// gRPC:   func(ctx, req, info, handler) (resp, error)
// Workers: func(ctx, info, next) error

Built-in Middleware

The middleware sub-package ships optional middleware. None are applied by default.

import "github.com/go-coldbrew/workers/middleware"

Middleware                       | Description
---------------------------------|------------------------------------------------------------
Recover(onPanic)                 | Catches panics, calls callback, returns error
Tracing()                        | Creates an OTEL span per cycle via go-coldbrew/tracing
Duration(observe)                | Measures wall-clock time of each cycle
DistributedLock(locker, opts...) | Acquires a distributed lock before each cycle
Timeout(d)                       | Enforces a per-cycle deadline
Slog()                           | Structured log lines per cycle (start + end/error) via go-coldbrew/log
LogContext()                     | Injects worker name + attempt into log context
DefaultInterceptors()            | Returns [Recover, LogContext, Tracing, Slog]

Recover

Catches panics in the worker cycle and converts them to errors. The panic does not propagate:

middleware.Recover(func(name string, v any) {
    alerting.Send(fmt.Sprintf("worker %s panicked: %v", name, v))
})

Tracing

Creates an OTEL span named worker:<name>:cycle for each tick. Records errors on the span:

middleware.Tracing()

Duration

Measures wall-clock time of each cycle and calls a callback. This is per-cycle timing — distinct from the per-attempt lifetime captured by Metrics.ObserveRunDuration (the worker_run_duration_seconds Prometheus histogram).

middleware.Duration(func(name string, d time.Duration) {
    metrics.RecordCycleDuration(name, d)
})

DistributedLock

Acquires a distributed lock before each cycle. If the lock is held by another instance, the cycle is skipped:

middleware.DistributedLock(redisLocker,
    middleware.WithKeyFunc(func(name string) string {
        return "myapp:lock:" + name
    }),
    middleware.WithTTLFunc(func(_ string) time.Duration {
        return time.Minute
    }),
    middleware.WithOnNotAcquired(func(ctx context.Context, name string) error {
        log.Info(ctx, "msg", "lock held, skipping", "worker", name) // go-coldbrew/log
        return nil
    }),
)

The Locker interface:

type Locker interface {
    Acquire(ctx context.Context, key string, ttl time.Duration) (bool, error)
    Release(ctx context.Context, key string) error
}

Release uses context.WithoutCancel so that context cancellation does not prevent lock cleanup.

Timeout

Enforces a per-cycle deadline. Distinct from WithTimeout (which controls graceful shutdown):

middleware.Timeout(30 * time.Second)

Slog

Structured log lines per cycle (start + end/error) via go-coldbrew/log. Pair with LogContext() to include worker name and attempt automatically:

middleware.Slog()

LogContext

Injects worker name and attempt into the log context so all log calls inside the worker automatically include them:

middleware.LogContext()

DefaultInterceptors

Convenience bundle for the standard observability stack:

// Zero-config observability — one line
workers.Run(ctx, myWorkers,
    workers.WithInterceptors(middleware.DefaultInterceptors()...),
)

// Defaults + extras
workers.Run(ctx, myWorkers,
    workers.WithInterceptors(middleware.DefaultInterceptors()...),
    workers.AddInterceptors(middleware.Duration(observe)),
)

WorkerInfo

Every handler receives a *WorkerInfo that carries worker metadata and child management:

Method                               | Description
-------------------------------------|------------------------------------------------------------
GetName() string                     | Worker name
GetAttempt() int                     | Restart attempt (0 on first run)
Add(w *Worker) bool                  | Add child worker — returns false if name already exists (no-op)
Remove(name string)                  | Stop child worker by name
GetChildren() []string               | Names of running child workers
GetChild(name string) (Worker, bool) | Look up a child by name (returns a value copy)

Use Worker.GetName() and Worker.GetHandler() to inspect a child.

To replace a running worker, call Remove then Add. This is not atomic — there is a brief window where the worker is not running.

context.Context handles cancellation/deadlines/values. *WorkerInfo handles everything worker-specific.

Helpers

EveryInterval

Use the Every builder method to run a handler periodically:

workers.NewWorker("metrics-reporter").HandlerFunc(reportMetrics).
    Every(30 * time.Second)

For manual control, EveryInterval wraps a handler in a timer loop directly:

workers.NewWorker("metrics-reporter").HandlerFunc(workers.EveryInterval(
    30*time.Second, reportMetrics,
))

Both are equivalent. The builder form is preferred — it also supports WithJitter and WithInitialDelay.

ChannelWorker

Consumes items from a channel one at a time:

refreshChan := make(chan string, 100)

workers.NewWorker("refresher").HandlerFunc(workers.ChannelWorker(refreshChan,
    func(ctx context.Context, info *workers.WorkerInfo, driverID string) error {
        return refreshDriverProfile(ctx, driverID)
    },
))

BatchChannelWorker

Collects items into batches, flushing when the batch reaches maxSize or maxDelay elapses since the first item:

eventChan := make(chan Event, 1000)

workers.NewWorker("event-batcher").HandlerFunc(workers.BatchChannelWorker(eventChan,
    100,                    // max batch size
    500*time.Millisecond,   // max delay
    func(ctx context.Context, info *workers.WorkerInfo, batch []Event) error {
        return store.BulkInsert(ctx, batch)
    },
))

Partial batches are flushed on context cancellation (graceful shutdown). Both ChannelWorker and BatchChannelWorker return ErrDoNotRestart when the channel is closed, preventing restart loops on exhausted channels.

Dynamic Workers

Workers can dynamically spawn and remove child workers using WorkerInfo.Add, Remove, and GetChildren. This is the pattern for config-driven worker pools (like database-driven solver workers):

workers.NewWorker("pool-manager").HandlerFunc(func(ctx context.Context, info *workers.WorkerInfo) error {
    ticker := time.NewTicker(60 * time.Second)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return ctx.Err() // children stop automatically
        case <-ticker.C:
            desired := loadConfigsFromDB(ctx)

            running := map[string]bool{}
            for _, name := range info.GetChildren() {
                running[name] = true
            }

            // Remove workers no longer in config
            for name := range running {
                if _, ok := desired[name]; !ok {
                    info.Remove(name)
                }
            }

            // Add only workers that aren't already running
            for name, cfg := range desired {
                if !running[name] {
                    info.Add(workers.NewWorker(name).HandlerFunc(makeSolver(cfg)))
                }
            }
        }
    }
})

Add is a no-op if the name exists — it returns false without restarting the running worker. To replace a worker (e.g., on config change), call Remove then Add:

info.Remove("solver")
info.Add(workers.NewWorker("solver").HandlerFunc(makeSolver(newCfg)))

Note: Remove + Add is not atomic — there is a brief window where the worker is not running.

Example: Fixed children on startup

A worker that spawns N consumer goroutines when it starts:

workers.NewWorker("consumer-pool").HandlerFunc(func(ctx context.Context, info *workers.WorkerInfo) error {
    for i := range 5 {
        name := fmt.Sprintf("consumer-%d", i)
        info.Add(workers.NewWorker(name).HandlerFunc(workers.ChannelWorker(eventChan, processEvent)))
    }
    <-ctx.Done()
    return ctx.Err() // all 5 consumers stop with parent
})

Example: Per-tenant workers

Spawn a dedicated worker when a new tenant appears, remove it when the tenant is deactivated:

workers.NewWorker("tenant-manager").HandlerFunc(func(ctx context.Context, info *workers.WorkerInfo) error {
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case event := <-tenantEvents:
            switch event.Type {
            case "activated":
                info.Add(workers.NewWorker("tenant:"+event.ID).
                    HandlerFunc(makeTenantWorker(event.ID)))
            case "deactivated":
                info.Remove("tenant:" + event.ID)
            }
        }
    }
})

Example: Nested hierarchy

Children can spawn their own children — the supervisor tree goes as deep as needed:

workers.NewWorker("region-manager").HandlerFunc(func(ctx context.Context, info *workers.WorkerInfo) error {
    for _, region := range []string{"us-east", "eu-west"} {
        info.Add(workers.NewWorker("region:"+region).HandlerFunc(
            func(ctx context.Context, info *workers.WorkerInfo) error {
                zones := fetchZones(ctx, region)
                for _, zone := range zones {
                    info.Add(workers.NewWorker("zone:"+zone).HandlerFunc(makeZoneWorker(zone)))
                }
                <-ctx.Done()
                return ctx.Err()
            },
        ))
    }
    <-ctx.Done()
    return ctx.Err()
})
// Tree: region-manager → region:us-east → zone:us-east-1a, zone:us-east-1b
//                       → region:eu-west → zone:eu-west-1a

When region:us-east stops, all its zone workers stop automatically (scoped lifecycle).

Running Workers

Multiple workers

err := workers.Run(ctx, []*workers.Worker{w1, w2, w3})

Run blocks until ctx is cancelled and all workers have exited. A worker exiting early (without restart) does not stop other workers.

Single worker

workers.RunWorker(ctx, w)

RunWorker is a convenience for workers.Run(ctx, []*workers.Worker{w}). Unlike Run, it discards the error. Use Run if you need error handling.

Graceful Shutdown

When the context passed to Run is cancelled:

  1. All worker contexts are cancelled — handlers should return ctx.Err()
  2. BatchChannelWorker flushes any partial batch before returning
  3. handler.Close() is called exactly once (for CycleHandler implementations)
  4. Children stop when their parent stops (scoped lifecycle)
  5. Run returns nil

WithTimeout(d) controls how long suture waits for a worker to return after context cancellation. If a worker ignores cancellation and doesn’t return within the timeout, suture logs a stop-timeout event and abandons the goroutine.

Logging

Supervisor-level lifecycle events (panics, restarts, backoff, timeouts) are logged via stdlib log/slog. If your application configures slog.SetDefault, these events flow through your handler:

{"level":"ERROR","msg":"worker panicked","worker":"my-worker","event":"..."}
{"level":"WARN","msg":"worker terminated","worker":"my-worker","event":"..."}
{"level":"WARN","msg":"worker backoff","event":"..."}
{"level":"INFO","msg":"worker resumed","event":"..."}

Per-cycle logging is available via the middleware.Slog() and middleware.LogContext() interceptors, which use go-coldbrew/log (a wrapper around slog). Since go-coldbrew/log calls slog under the hood, slog.SetDefault affects both layers.

Metrics

Workers support pluggable metrics via the Metrics interface. Pass metrics at the root level — all workers and their children inherit them automatically.

Built-in Prometheus metrics

if err := workers.Run(ctx, myWorkers, workers.WithMetrics(workers.NewPrometheusMetrics("myapp"))); err != nil {
    slog.Error("workers failed", "error", err)
}

This registers the following metrics (auto-registered via promauto):

Metric                                    | Type      | Description
------------------------------------------|-----------|----------------------------------------------
myapp_worker_started_total{worker}        | Counter   | Total worker starts
myapp_worker_stopped_total{worker}        | Counter   | Total worker stops
myapp_worker_panicked_total{worker}       | Counter   | Total worker panics
myapp_worker_failed_total{worker}         | Counter   | Total worker failures
myapp_worker_restarted_total{worker}      | Counter   | Total worker restarts
myapp_worker_run_duration_seconds{worker} | Histogram | Worker attempt lifetime (start to stop/failure)
myapp_worker_active_count                 | Gauge     | Currently active workers

NewPrometheusMetrics is safe to call multiple times with the same namespace — it returns the cached instance.

No metrics (default)

_ = workers.Run(ctx, myWorkers) // uses BaseMetrics{} (no-op) — zero overhead

Custom metrics

Implement the Metrics interface for your own backend (Datadog, StatsD, etc.). Embed BaseMetrics for forward compatibility — new methods added to the interface get safe no-op defaults instead of breaking your build:

type myDatadogMetrics struct {
    workers.BaseMetrics // forward-compatible — new methods get no-op defaults
    client *datadog.Client
}

func (m *myDatadogMetrics) WorkerStarted(name string) {
    m.client.Incr("worker.started", []string{"worker:" + name}, 1)
}

func (m *myDatadogMetrics) WorkerFailed(name string, err error) {
    m.client.Incr("worker.failed", []string{"worker:" + name}, 1)
}

// All other Metrics methods (Stopped, Panicked, Restarted, etc.)
// default to no-op via BaseMetrics.

Per-worker override

Children inherit metrics from the root by default. Override for specific workers via the builder:

workers.NewWorker("manager").HandlerFunc(func(ctx context.Context, info *workers.WorkerInfo) error {
    // This child uses custom metrics instead of the inherited root metrics.
    info.Add(workers.NewWorker("special").HandlerFunc(processSpecial).WithMetrics(datadogMetrics))
    <-ctx.Done()
    return ctx.Err()
})

Testing

Testing middleware

Use NewWorkerInfo to create a *WorkerInfo for unit-testing middleware without running the full supervisor:

info := workers.NewWorkerInfo("test-worker", 0)
err := myMiddleware(ctx, info, func(ctx context.Context, info *workers.WorkerInfo) error {
    // assert middleware behavior
    return nil
})

Testing with dynamic children

Use WithTestChildren to create a WorkerInfo that supports Add/Remove/GetChildren:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

info := workers.NewWorkerInfo("manager", 0, workers.WithTestChildren(ctx))
info.Add(workers.NewWorker("child").HandlerFunc(childFn))
assert.Equal(t, []string{"child"}, info.GetChildren())

Integration testing

Use RunWorker with a short-lived context:

ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
workers.RunWorker(ctx, myWorker)
// assert side effects

Best Practices

  • Handler contract: Long-running workers should block on <-ctx.Done(). Periodic workers should return quickly from each tick.
  • WithRestart(false) vs ErrDoNotRestart: Use WithRestart(false) when a worker is unconditionally one-shot (known at build time). Use ErrDoNotRestart when the decision is made at runtime (e.g., channel closed, work exhausted).
  • Naming: Use descriptive names. For hierarchical workers, use colons: "region:us-east", "tenant:abc123".
  • Middleware ordering: The first middleware in the list is the outermost. Put Recover first (so it catches panics from all inner middleware), Tracing next, then domain-specific middleware.
  • Metrics inheritance: Set metrics once at the Run level. Override per-worker only when you need separate dashboards.
  • Distributed locking: Use DistributedLock for periodic workers that should run on only one pod. The lock is acquired per cycle, not per worker lifetime.

ColdBrew Integration

Planned — not yet available

The workers package is standalone — any Go service can use it. ColdBrew integration via CBServiceV2 is planned for a future core release, where workers will be started/stopped as part of the ColdBrew service lifecycle.