Table of contents
- Overview
- Quick Start
- Why Workers (vs Plain Goroutines)
- Creating Workers
- Handler Return Values
- Builder Methods
- Jitter
- Middleware
- Built-in Middleware
- WorkerInfo
- Helpers
- Dynamic Workers
- Running Workers
- Graceful Shutdown
- Logging
- Metrics
- Testing
- Best Practices
- ColdBrew Integration
Overview
workers is a worker lifecycle library that manages background goroutines with automatic panic recovery, configurable restart with backoff, and structured shutdown. It is built on top of suture, an Erlang-inspired supervisor tree library for Go.
Every worker runs inside its own supervisor subtree:
Root Supervisor
├── Worker-A supervisor
│ ├── Worker-A service (middleware → handler)
│ ├── Child-A1 (added dynamically)
│ └── Child-A2
└── Worker-B supervisor
└── Worker-B service (middleware → handler)
Key properties:
- Scoped lifecycle — when a parent stops, all its children stop automatically. No manual cleanup or sync.WaitGroup needed.
- Restart by default — workers restart with exponential backoff on failure. One-shot workers opt out with WithRestart(false) or return ErrDoNotRestart.
- Two-layer panic recovery — suture catches panics at the supervisor level (restarts the worker). middleware.Recover catches panics per-cycle (converts to error without a full restart). Use both for defense in depth.
- Composable middleware — tracing, structured logging, distributed locking, per-cycle timeout, and duration metrics as gRPC-style interceptors. Write your own with a single function.
- Jitter — desynchronize periodic workers to prevent thundering herd. Per-worker or run-level default.
- Dynamic children — workers can spawn and remove child workers at runtime via Add/Remove. Children inherit middleware, metrics, and scoped lifecycle.
- Pluggable metrics — Prometheus out of the box, or implement the Metrics interface for Datadog, StatsD, etc. Per-attempt lifetime and per-cycle duration tracked separately.
- Handler cleanup — CycleHandler.Close() is called exactly once when the worker permanently stops, for resource cleanup (DB connections, leases).
Quick Start
package main
import (
"context"
"log/slog"
"os"
"os/signal"
"time"
"github.com/go-coldbrew/workers"
"github.com/go-coldbrew/workers/middleware"
)
func main() {
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
defer cancel()
err := workers.Run(ctx, []*workers.Worker{
// Long-running worker — blocks until ctx is cancelled
workers.NewWorker("kafka").HandlerFunc(consume),
// Periodic worker — runs cleanup every 5 minutes with jitter
workers.NewWorker("cleanup").HandlerFunc(cleanup).
Every(5 * time.Minute).WithJitter(10),
},
// Standard observability: panic recovery, log context, tracing, structured logging
workers.WithInterceptors(middleware.DefaultInterceptors()...),
)
if err != nil {
slog.Error("workers failed", "error", err)
}
}
// consume and cleanup have signature:
// func(ctx context.Context, info *workers.WorkerInfo) error
Run blocks until ctx is cancelled and all workers have exited.
Why Workers (vs Plain Goroutines)
With plain goroutines, you manage lifecycle manually:
// Before: manual goroutine management
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
defer cancel()
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
// no panic recovery — crashes the process
// no restart — dies permanently on error
// no structured shutdown — must coordinate ctx + wg manually
// no distributed locking — runs on every pod
consume(ctx)
}()
wg.Wait()
With workers, the framework handles all of that:
// After: workers handle lifecycle
workers.Run(ctx, []*workers.Worker{
workers.NewWorker("kafka").HandlerFunc(consume).
Interceptors(
middleware.Recover(onPanic),
middleware.Tracing(),
middleware.DistributedLock(redisLocker),
),
})
What you get for free: panic recovery, configurable restart with exponential backoff, scoped lifecycle (children stop when parents stop), composable middleware (tracing, logging, distributed locking, per-cycle timeout), jitter for periodic workers, and pluggable metrics. Distributed locking ensures only one instance runs a job across pods — no manual coordination.
Creating Workers
Use NewWorker with a name, then set a handler via HandlerFunc (for plain functions) or Handler (for structs with cleanup):
// Uses github.com/go-coldbrew/log for structured logging
w := workers.NewWorker("my-worker").HandlerFunc(func(ctx context.Context, info *workers.WorkerInfo) error {
log.Info(ctx, "msg", "started", "worker", info.GetName(), "attempt", info.GetAttempt())
<-ctx.Done()
return ctx.Err()
})
For handlers that need resource cleanup, implement the CycleHandler interface. Close() is called exactly once when the worker permanently stops:
type batchProcessor struct {
db *sql.DB
conn *sql.Conn // dedicated connection for this worker
}
func NewBatchProcessor(db *sql.DB) (*batchProcessor, error) {
conn, err := db.Conn(context.Background())
if err != nil {
return nil, err
}
return &batchProcessor{db: db, conn: conn}, nil
}
func (b *batchProcessor) RunCycle(ctx context.Context, info *workers.WorkerInfo) error {
rows, err := b.conn.QueryContext(ctx,
"SELECT id, payload FROM jobs WHERE status = 'pending' LIMIT 100")
if err != nil {
return err
}
defer rows.Close()
return processBatch(ctx, rows)
}
func (b *batchProcessor) Close() error {
// Called once on permanent stop — release the dedicated connection
return b.conn.Close()
}
The handler receives a context.Context for cancellation and a *WorkerInfo for worker metadata.
Handler Return Values
| Return value | Long-running worker (no Every) | Periodic worker (with Every) |
|---|---|---|
| return nil | Worker stops permanently | Cycle succeeded — next tick fires |
| return error | Restarts with backoff (if restart enabled) | Restarts with backoff (if restart enabled) |
| return ctx.Err() | Clean shutdown | Clean shutdown |
| return workers.ErrDoNotRestart | Permanent stop | Permanent stop |
Long-running workers should block on <-ctx.Done(), then return ctx.Err(). Returning nil without waiting for ctx cancellation stops the worker permanently.
Periodic workers run the handler once per tick. Return nil for success (next tick fires normally). Return an error to trigger restart. The Every wrapper manages the tick loop — your handler just processes one cycle.
ErrDoNotRestart
Return workers.ErrDoNotRestart from a handler to signal permanent completion — the supervisor will not restart the worker even though restart is enabled by default. ChannelWorker and BatchChannelWorker return this automatically when their channel is closed.
func processQueue(ctx context.Context, info *workers.WorkerInfo) error {
item, ok := queue.Dequeue(ctx)
if !ok {
return workers.ErrDoNotRestart // queue exhausted
}
return process(ctx, item)
}
Builder Methods
| Method | Description | Default |
|---|---|---|
| HandlerFunc(fn) | Set handler from a plain function | — |
| Handler(h) | Set handler from a CycleHandler struct | — |
| WithRestart(false) | Disable restart (one-shot worker) | true (restart with backoff) |
| Every(duration) | Run periodically on a fixed interval | — |
| WithJitter(percent) | Randomize tick interval by ±percent (requires Every) | inherit run-level |
| WithInitialDelay(d) | Delay first tick (requires Every) | — |
| Interceptors(mw...) | Replace worker-level middleware | — |
| AddInterceptors(mw...) | Append to worker-level middleware | — |
| WithFailureBackoff(d) | Duration between restarts | 15s (suture default) |
| WithFailureThreshold(n float64) | Max failures before supervisor gives up | 5.0 (suture default) |
| WithFailureDecay(rate float64) | Rate at which failure count decays (per second) | 1.0 (suture default) |
| WithBackoffJitter(j) | Random jitter on restart backoff | none |
| WithTimeout(d) | Max time to wait for graceful stop | 10s (suture default) |
| WithMetrics(m Metrics) | Per-worker metrics override | inherit from parent/run |
Example with full configuration:
workers.NewWorker("resilient-consumer").HandlerFunc(consume).
Every(15 * time.Second).
WithJitter(10).
WithInitialDelay(5 * time.Second).
Interceptors(
middleware.Recover(onPanic),
middleware.Tracing(),
).
WithFailureBackoff(5 * time.Second).
WithFailureThreshold(10).
WithTimeout(30 * time.Second)
Jitter
When many workers share the same base interval (e.g. 15s), they synchronize and spike downstream services — the thundering herd problem. Jitter desynchronizes ticks by randomizing each interval within a configurable range.
Per-worker jitter
workers.NewWorker("poller").HandlerFunc(poll).
Every(15 * time.Second).
WithJitter(10) // each tick is within [13.5s, 16.5s)
Run-level default
Apply jitter to all periodic workers with WithDefaultJitter:
workers.Run(ctx, myWorkers, workers.WithDefaultJitter(10))
Worker-level WithJitter takes precedence over the run-level default. Setting WithJitter(0) explicitly disables jitter for a specific worker even when a run-level default is set.
Formula
On each tick:
spread = base × percent ÷ 100
jittered = base − spread + rand(2 × spread)
The effective interval is clamped to a minimum of 1ms (never zero or negative). Each tick recomputes independently — successive intervals differ.
Initial delay
WithInitialDelay delays the first tick, preventing N workers from all firing at t=0 on process start:
workers.NewWorker("poller").HandlerFunc(poll).
Every(15 * time.Second).
WithJitter(10).
WithInitialDelay(5 * time.Second)
Middleware
Middleware wraps each worker execution cycle with cross-cutting concerns like panic recovery, tracing, distributed locking, and timing. For periodic workers (Every), middleware runs on every tick, not once for the worker lifetime.
Types
// CycleHandler handles worker execution cycles.
// Implement as a struct for handlers that need cleanup.
type CycleHandler interface {
RunCycle(ctx context.Context, info *WorkerInfo) error
Close() error // called once when the worker stops
}
// CycleFunc adapts a plain function into a CycleHandler.
// Close is a no-op — use this for simple, stateless handlers.
type CycleFunc func(ctx context.Context, info *WorkerInfo) error
// Middleware intercepts each execution cycle.
// Call next to continue the chain. Matches gRPC interceptor convention.
type Middleware func(ctx context.Context, info *WorkerInfo, next CycleFunc) error
Worker-level middleware
w := workers.NewWorker("solver").HandlerFunc(solve).
Every(15 * time.Second).
Interceptors(
middleware.Recover(onPanic),
middleware.Tracing(),
middleware.Duration(observeDuration),
)
The first middleware in the list is the outermost wrapper (runs first on entry, last on exit), matching the gRPC interceptor convention.
Run-level middleware
WithInterceptors replaces and AddInterceptors appends to the run-level middleware list. These are run options that apply to all workers in the Run call — distinct from the worker-level (*Worker).Interceptors and (*Worker).AddInterceptors which only affect a single worker. Run-level middleware wraps outside worker-level middleware, so shared concerns like tracing are always outermost:
workers.Run(ctx, myWorkers,
workers.WithInterceptors(middleware.DefaultInterceptors()...),
workers.AddInterceptors(middleware.Duration(observe)),
)
Effective chain: run-level middleware → worker-level middleware → handler
Writing custom middleware
Middleware is a flat function that calls next to continue the chain. The *WorkerInfo parameter gives you the worker name and attempt explicitly — no hidden context lookups:
// Uses github.com/go-coldbrew/log for structured logging
func myLogging(ctx context.Context, info *workers.WorkerInfo, next workers.CycleFunc) error {
log.Info(ctx, "msg", "cycle start", "worker", info.GetName())
err := next(ctx, info)
log.Info(ctx, "msg", "cycle end", "worker", info.GetName(), "error", err)
return err
}
// Attach it
w.Interceptors(myLogging)
Same shape as gRPC interceptors — familiar to the target audience:
// gRPC: func(ctx, req, info, handler) (resp, error)
// Workers: func(ctx, info, next) error
Built-in Middleware
The middleware sub-package ships optional middleware. None are applied by default.
import "github.com/go-coldbrew/workers/middleware"
| Middleware | Description |
|---|---|
| Recover(onPanic) | Catches panics, calls callback, returns error |
| Tracing() | Creates an OTEL span per cycle via go-coldbrew/tracing |
| Duration(observe) | Measures wall-clock time of each cycle |
| DistributedLock(locker, opts...) | Acquires a distributed lock before each cycle |
| Timeout(d) | Enforces a per-cycle deadline |
| Slog() | Structured log lines per cycle (start + end/error) via go-coldbrew/log |
| LogContext() | Injects worker name + attempt into log context |
| DefaultInterceptors() | Returns [Recover, LogContext, Tracing, Slog] |
Recover
Catches panics in the worker cycle and converts them to errors. The panic does not propagate:
middleware.Recover(func(name string, v any) {
alerting.Send(fmt.Sprintf("worker %s panicked: %v", name, v))
})
Tracing
Creates an OTEL span named worker:<name>:cycle for each tick. Records errors on the span:
middleware.Tracing()
Duration
Measures wall-clock time of each cycle and calls a callback. This is per-cycle timing — distinct from the per-attempt lifetime captured by Metrics.ObserveRunDuration (the worker_run_duration_seconds Prometheus histogram).
middleware.Duration(func(name string, d time.Duration) {
metrics.RecordCycleDuration(name, d)
})
DistributedLock
Acquires a distributed lock before each cycle. If the lock is held by another instance, the cycle is skipped:
middleware.DistributedLock(redisLocker,
middleware.WithKeyFunc(func(name string) string {
return "myapp:lock:" + name
}),
middleware.WithTTLFunc(func(_ string) time.Duration {
return time.Minute
}),
middleware.WithOnNotAcquired(func(ctx context.Context, name string) error {
log.Info(ctx, "msg", "lock held, skipping", "worker", name) // go-coldbrew/log
return nil
}),
)
The Locker interface:
type Locker interface {
Acquire(ctx context.Context, key string, ttl time.Duration) (bool, error)
Release(ctx context.Context, key string) error
}
Release uses context.WithoutCancel so that context cancellation does not prevent lock cleanup.
Timeout
Enforces a per-cycle deadline. Distinct from WithTimeout (which controls graceful shutdown):
middleware.Timeout(30 * time.Second)
Slog
Structured log lines per cycle (start + end/error) via go-coldbrew/log. Pair with LogContext() to include worker name and attempt automatically:
middleware.Slog()
LogContext
Injects worker name and attempt into the log context so all log calls inside the worker automatically include them:
middleware.LogContext()
DefaultInterceptors
Convenience bundle for the standard observability stack:
// Zero-config observability — one line
workers.Run(ctx, myWorkers,
workers.WithInterceptors(middleware.DefaultInterceptors()...),
)
// Defaults + extras
workers.Run(ctx, myWorkers,
workers.WithInterceptors(middleware.DefaultInterceptors()...),
workers.AddInterceptors(middleware.Duration(observe)),
)
WorkerInfo
Every handler receives a *WorkerInfo that carries worker metadata and child management:
| Method | Description |
|---|---|
| GetName() string | Worker name |
| GetAttempt() int | Restart attempt (0 on first run) |
| Add(w *Worker) bool | Add child worker — returns false if name already exists (no-op) |
| Remove(name string) | Stop child worker by name |
| GetChildren() []string | Names of running child workers |
| GetChild(name string) (Worker, bool) | Look up a child by name (returns a value copy) |
Use Worker.GetName() and Worker.GetHandler() to inspect a child.
To replace a running worker, call Remove then Add. This is not atomic — there is a brief window where the worker is not running.
context.Context handles cancellation/deadlines/values. *WorkerInfo handles everything worker-specific.
Helpers
EveryInterval
Use the Every builder method to run a handler periodically:
workers.NewWorker("metrics-reporter").HandlerFunc(reportMetrics).
Every(30 * time.Second)
For manual control, EveryInterval wraps a handler in a timer loop directly:
workers.NewWorker("metrics-reporter").HandlerFunc(workers.EveryInterval(
30*time.Second, reportMetrics,
))
Both produce the same tick loop. The builder form is preferred — it also supports WithJitter and WithInitialDelay.
ChannelWorker
Consumes items from a channel one at a time:
refreshChan := make(chan string, 100)
workers.NewWorker("refresher").HandlerFunc(workers.ChannelWorker(refreshChan,
func(ctx context.Context, info *workers.WorkerInfo, driverID string) error {
return refreshDriverProfile(ctx, driverID)
},
))
BatchChannelWorker
Collects items into batches, flushing when the batch reaches maxSize or maxDelay elapses since the first item:
eventChan := make(chan Event, 1000)
workers.NewWorker("event-batcher").HandlerFunc(workers.BatchChannelWorker(eventChan,
100, // max batch size
500*time.Millisecond, // max delay
func(ctx context.Context, info *workers.WorkerInfo, batch []Event) error {
return store.BulkInsert(ctx, batch)
},
))
Partial batches are flushed on context cancellation (graceful shutdown). Both ChannelWorker and BatchChannelWorker return ErrDoNotRestart when the channel is closed, preventing restart loops on exhausted channels.
Dynamic Workers
Workers can dynamically spawn and remove child workers using WorkerInfo.Add, Remove, and GetChildren. This is the pattern for config-driven worker pools (like database-driven solver workers):
workers.NewWorker("pool-manager").HandlerFunc(func(ctx context.Context, info *workers.WorkerInfo) error {
ticker := time.NewTicker(60 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err() // children stop automatically
case <-ticker.C:
desired := loadConfigsFromDB(ctx)
running := map[string]bool{}
for _, name := range info.GetChildren() {
running[name] = true
}
// Remove workers no longer in config
for name := range running {
if _, ok := desired[name]; !ok {
info.Remove(name)
}
}
// Add only workers that aren't already running
for name, cfg := range desired {
if !running[name] {
info.Add(workers.NewWorker(name).HandlerFunc(makeSolver(cfg)))
}
}
}
}
})
Add is a no-op if the name exists — it returns false without restarting the running worker. To replace a worker (e.g., on config change), call Remove then Add:
info.Remove("solver")
info.Add(workers.NewWorker("solver").HandlerFunc(makeSolver(newCfg)))
Note: Remove + Add is not atomic — there is a brief window where the worker is not running.
Example: Fixed children on startup
A worker that spawns N consumer goroutines when it starts:
workers.NewWorker("consumer-pool").HandlerFunc(func(ctx context.Context, info *workers.WorkerInfo) error {
for i := range 5 {
name := fmt.Sprintf("consumer-%d", i)
info.Add(workers.NewWorker(name).HandlerFunc(workers.ChannelWorker(eventChan, processEvent)))
}
<-ctx.Done()
return ctx.Err() // all 5 consumers stop with parent
})
Example: Per-tenant workers
Spawn a dedicated worker when a new tenant appears, remove it when the tenant is deactivated:
workers.NewWorker("tenant-manager").HandlerFunc(func(ctx context.Context, info *workers.WorkerInfo) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case event := <-tenantEvents:
switch event.Type {
case "activated":
info.Add(workers.NewWorker("tenant:"+event.ID).
HandlerFunc(makeTenantWorker(event.ID)))
case "deactivated":
info.Remove("tenant:" + event.ID)
}
}
}
})
Example: Nested hierarchy
Children can spawn their own children — the supervisor tree goes as deep as needed:
workers.NewWorker("region-manager").HandlerFunc(func(ctx context.Context, info *workers.WorkerInfo) error {
for _, region := range []string{"us-east", "eu-west"} {
info.Add(workers.NewWorker("region:"+region).HandlerFunc(
func(ctx context.Context, info *workers.WorkerInfo) error {
zones := fetchZones(ctx, region)
for _, zone := range zones {
info.Add(workers.NewWorker("zone:"+zone).HandlerFunc(makeZoneWorker(zone)))
}
<-ctx.Done()
return ctx.Err()
},
))
}
<-ctx.Done()
return ctx.Err()
})
// Tree: region-manager → region:us-east → zone:us-east-1a, zone:us-east-1b
// → region:eu-west → zone:eu-west-1a
When region:us-east stops, all its zone workers stop automatically (scoped lifecycle).
Running Workers
Multiple workers
err := workers.Run(ctx, []*workers.Worker{w1, w2, w3})
Run blocks until ctx is cancelled and all workers have exited. A worker exiting early (without restart) does not stop other workers.
Single worker
workers.RunWorker(ctx, w)
RunWorker is a convenience for workers.Run(ctx, []*workers.Worker{w}). Unlike Run, it discards the error. Use Run if you need error handling.
Graceful Shutdown
When the context passed to Run is cancelled:
- All worker contexts are cancelled — handlers should return ctx.Err()
- BatchChannelWorker flushes any partial batch before returning
- handler.Close() is called exactly once (for CycleHandler implementations)
- Children stop when their parent stops (scoped lifecycle)
- Run returns nil
WithTimeout(d) controls how long suture waits for a worker to return after context cancellation. If a worker ignores cancellation and doesn’t return within the timeout, suture logs a stop-timeout event and abandons the goroutine.
Logging
Supervisor-level lifecycle events (panics, restarts, backoff, timeouts) are logged via stdlib log/slog. If your application configures a default logger with slog.SetDefault, these events flow through your slog handler:
{"level":"ERROR","msg":"worker panicked","worker":"my-worker","event":"..."}
{"level":"WARN","msg":"worker terminated","worker":"my-worker","event":"..."}
{"level":"WARN","msg":"worker backoff","event":"..."}
{"level":"INFO","msg":"worker resumed","event":"..."}
Per-cycle logging is available via the middleware.Slog() and middleware.LogContext() interceptors, which use go-coldbrew/log (a wrapper around slog). Since go-coldbrew/log calls slog under the hood, slog.SetDefault affects both layers.
Metrics
Workers support pluggable metrics via the Metrics interface. Pass metrics at the root level — all workers and their children inherit them automatically.
Built-in Prometheus metrics
if err := workers.Run(ctx, myWorkers, workers.WithMetrics(workers.NewPrometheusMetrics("myapp"))); err != nil {
slog.Error("workers failed", "error", err)
}
This registers the following metrics (auto-registered via promauto):
| Metric | Type | Description |
|---|---|---|
| myapp_worker_started_total{worker} | Counter | Total worker starts |
| myapp_worker_stopped_total{worker} | Counter | Total worker stops |
| myapp_worker_panicked_total{worker} | Counter | Total worker panics |
| myapp_worker_failed_total{worker} | Counter | Total worker failures |
| myapp_worker_restarted_total{worker} | Counter | Total worker restarts |
| myapp_worker_run_duration_seconds{worker} | Histogram | Worker attempt lifetime (start to stop/failure) |
| myapp_worker_active_count | Gauge | Currently active workers |
NewPrometheusMetrics is safe to call multiple times with the same namespace — it returns the cached instance.
No metrics (default)
_ = workers.Run(ctx, myWorkers) // uses BaseMetrics{} (no-op) — zero overhead
Custom metrics
Implement the Metrics interface for your own backend (Datadog, StatsD, etc.). Embed BaseMetrics for forward compatibility — new methods added to the interface get safe no-op defaults instead of breaking your build:
type myDatadogMetrics struct {
workers.BaseMetrics // forward-compatible — new methods get no-op defaults
client *datadog.Client
}
func (m *myDatadogMetrics) WorkerStarted(name string) {
m.client.Incr("worker.started", []string{"worker:" + name}, 1)
}
func (m *myDatadogMetrics) WorkerFailed(name string, err error) {
m.client.Incr("worker.failed", []string{"worker:" + name}, 1)
}
// All other Metrics methods (Stopped, Panicked, Restarted, etc.)
// default to no-op via BaseMetrics.
Per-worker override
Children inherit metrics from the root by default. Override for specific workers via the builder:
workers.NewWorker("manager").HandlerFunc(func(ctx context.Context, info *workers.WorkerInfo) error {
// This child uses custom metrics instead of the inherited root metrics.
info.Add(workers.NewWorker("special").HandlerFunc(processSpecial).WithMetrics(datadogMetrics))
<-ctx.Done()
return ctx.Err()
})
Testing
Testing middleware
Use NewWorkerInfo to create a *WorkerInfo for unit-testing middleware without running the full supervisor:
info := workers.NewWorkerInfo("test-worker", 0)
err := myMiddleware(ctx, info, func(ctx context.Context, info *workers.WorkerInfo) error {
// assert middleware behavior
return nil
})
Testing with dynamic children
Use WithTestChildren to create a WorkerInfo that supports Add/Remove/GetChildren:
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
info := workers.NewWorkerInfo("manager", 0, workers.WithTestChildren(ctx))
info.Add(workers.NewWorker("child").HandlerFunc(childFn))
assert.Equal(t, []string{"child"}, info.GetChildren())
Integration testing
Use RunWorker with a short-lived context:
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
workers.RunWorker(ctx, myWorker)
// assert side effects
Best Practices
- Handler contract: Long-running workers should block on <-ctx.Done(). Periodic workers should return quickly from each tick.
- WithRestart(false) vs ErrDoNotRestart: Use WithRestart(false) when a worker is unconditionally one-shot (known at build time). Use ErrDoNotRestart when the decision is made at runtime (e.g., channel closed, work exhausted).
- Naming: Use descriptive names. For hierarchical workers, use colons: "region:us-east", "tenant:abc123".
- Middleware ordering: The first middleware in the list is the outermost. Put Recover first (so it catches panics from all inner middleware), Tracing next, then domain-specific middleware.
- Metrics inheritance: Set metrics once at the Run level. Override per-worker only when you need separate dashboards.
- Distributed locking: Use DistributedLock for periodic workers that should run on only one pod. The lock is acquired per cycle, not per worker lifetime.
ColdBrew Integration
Planned — not yet available
The workers package is standalone — any Go service can use it. ColdBrew integration via CBServiceV2 is planned for a future core release, where workers will be started/stopped as part of the ColdBrew service lifecycle.