Streaming RPCs

Table of contents

  1. Streaming RPCs
    1. When to use streaming
    2. Defining streaming methods
    3. Handler patterns
      1. Server-streaming
      2. Client-streaming
      3. Bidirectional
    4. Deadline propagation
    5. Backpressure
    6. grpc-gateway HTTP support
    7. Related

ColdBrew supports all four gRPC method shapes: unary, server-streaming, client-streaming, and bidirectional streaming. The default stream interceptor chain (response-time logging, protovalidate, metrics, panic recovery) is applied automatically — you don’t need to opt in.

This page covers when to use each shape, the handler patterns, and the practical limits when serving the same methods through grpc-gateway over HTTP.

When to use streaming

  • Server-streaming — pushing a sequence of items the server already knows about: paginated results without extra round-trips, log tailing, change feeds, real-time updates. Avoid for one-shot responses — use unary.
  • Client-streaming — uploading a large payload in chunks, accumulating samples, batch ingest. Avoid when the server needs to acknowledge each item — you’d want bidi.
  • Bidirectional — long-lived sessions where both sides send messages independently: chat, collaborative editing, multiplexed control channels. Avoid when ordering between directions doesn’t matter — server-streaming plus a separate unary call is simpler.

Defining streaming methods

Use the stream keyword on the request, response, or both in your .proto file:

service Events {
  // Unary
  rpc GetEvent(GetEventRequest) returns (Event) {}

  // Server-streaming — one request, many responses
  rpc StreamEvents(StreamEventsRequest) returns (stream Event) {}

  // Client-streaming — many requests, one response
  rpc UploadSamples(stream Sample) returns (UploadSummary) {}

  // Bidirectional — many requests, many responses, independent timing
  rpc Chat(stream ChatMessage) returns (stream ChatMessage) {}
}

Run buf generate (or make proto) the same way you would for unary methods.

Handler patterns

Server-streaming

The handler receives the request and a stream it can Send on as many times as it wants:

func (s *EventsService) StreamEvents(req *pb.StreamEventsRequest, stream pb.Events_StreamEventsServer) error {
    ctx := stream.Context()
    for ev := range s.subscribe(ctx, req.GetTopic()) {
        if err := stream.Send(ev); err != nil {
            // Client disconnected or context cancelled — stop producing.
            return err
        }
    }
    return nil
}

Two things to notice:

  • Always honour stream.Context(). If the client disconnects or hits its deadline, the context is cancelled — your producer goroutine should exit.
  • Send returning an error means the stream is dead. Don’t keep producing.

Client-streaming

The handler reads from the stream until io.EOF, then sends a single response:

import "io"

func (s *EventsService) UploadSamples(stream pb.Events_UploadSamplesServer) error {
    var count int64
    for {
        sample, err := stream.Recv()
        if err == io.EOF {
            return stream.SendAndClose(&pb.UploadSummary{Count: count})
        }
        if err != nil {
            return err
        }
        count++
        // process sample
    }
}

io.EOF is the expected end-of-stream signal — it is not an error. Any other error means the client disconnected or the context was cancelled.
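The client side is the mirror image: send every chunk, close the send direction, then wait for the single summary. A minimal sketch with the generated stream’s methods abstracted as parameters — uploadAll, send, and closeAndRecv are hypothetical names standing in for the generated client stream’s Send and CloseAndRecv:

```go
// uploadAll pushes each sample, then closes the send side and waits
// for the one response.
func uploadAll(samples []int64, send func(int64) error, closeAndRecv func() (int64, error)) (int64, error) {
    for _, s := range samples {
        if err := send(s); err != nil {
            // A mid-stream send error usually means the server already
            // ended the RPC; closeAndRecv surfaces the real status.
            break
        }
    }
    return closeAndRecv()
}
```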

Bidirectional

The simplest pattern is to read in a goroutine and write from the main goroutine (or vice versa). Use the stream’s context to coordinate cancellation:

import "io"

func (s *ChatService) Chat(stream pb.Chat_ChatServer) error {
    ctx := stream.Context()

    // Reader goroutine
    incoming := make(chan *pb.ChatMessage)
    errCh := make(chan error, 1)
    go func() {
        defer close(incoming)
        for {
            msg, err := stream.Recv()
            if err != nil {
                errCh <- err
                return
            }
            select {
            case incoming <- msg:
            case <-ctx.Done():
                return
            }
        }
    }()

    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case err := <-errCh:
            if err == io.EOF {
                return nil
            }
            return err
        case msg, ok := <-incoming:
            if !ok {
                // Reader closed the channel — wait for the error from errCh
                // on the next iteration so EOF and read errors propagate.
                incoming = nil
                continue
            }
            reply := s.handle(msg)
            if err := stream.Send(reply); err != nil {
                return err
            }
        }
    }
}

This shape generalizes to any independent-timing protocol. For half-duplex flows (every Recv followed by a Send), a simple for { Recv; ... ; Send } loop is fine — but you lose the ability to push unsolicited messages from the server.

Deadline propagation

Stream contexts inherit the client’s deadline the same way unary calls do. When the client sets a per-call deadline:

ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
stream, err := client.StreamEvents(ctx, req)

…then stream.Context() on the server side fires Done() at the same instant, and the next Send or Recv returns the cancellation error. Always check stream.Context().Err() (or rely on Send/Recv returning) inside long-running loops — that is your only signal to stop producing.

For long-lived streams (chat, change feeds), a single deadline rarely makes sense. Instead, set keepalive on the gRPC client/server (the cookiecutter sets sensible defaults) and let TCP failures surface as context cancellation.
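One way such keepalive settings look, assuming grpc-go’s keepalive package — the values here are illustrative, not necessarily the cookiecutter’s defaults:

```go
import (
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/keepalive"
)

srv := grpc.NewServer(
    grpc.KeepaliveParams(keepalive.ServerParameters{
        Time:    30 * time.Second, // ping idle connections every 30s
        Timeout: 10 * time.Second, // drop them if the ping isn't acked in 10s
    }),
    grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
        MinTime:             10 * time.Second, // reject clients that ping more often
        PermitWithoutStream: true,             // allow pings on idle connections
    }),
)
```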

Backpressure

gRPC-Go does not expose explicit flow-control hooks — Send blocks once the HTTP/2 send window fills, and that is the backpressure signal. The patterns that stay healthy:

  • Producer-loop pattern. Generate one item per loop iteration, call stream.Send, check the error, and only then move on. If the consumer is slow, your loop slows naturally.
  • Bounded buffer between source and stream. If the data source can’t be paced (e.g. it’s a Kafka subscription), put a bounded channel between the source and the gRPC Send loop. When the channel fills, drop or block at the source — explicitly, not by accident.
  • Don’t fan out into goroutines per message. Each stream.Send must complete before the next one starts; multiple goroutines calling Send on the same stream race.

If you need true high-throughput multiplexing on a single stream, consider chunking many logical messages into one large protobuf message instead of relying on per-message backpressure.

grpc-gateway HTTP support

grpc-gateway translates streaming RPCs to HTTP, but the four shapes don’t map cleanly to HTTP/1.1:

  • Unary — standard request/response.
  • Server-streaming — newline-delimited JSON (NDJSON) over a chunked HTTP response; clients read line by line.
  • Client-streaming — limited: the gateway buffers the request body before making the upstream call. Don’t rely on this for large or long-lived uploads.
  • Bidirectional — limited: HTTP/1.1 has no true concurrent interleaving. Avoid for HTTP clients.

Practical things to know:

  • Reverse proxies buffer streamed responses. Nginx, Cloudflare, and similar will hold chunks until they have “enough.” Set X-Accel-Buffering: no on the response (or the upstream config) to disable buffering when you actually need server-streaming over HTTP.
  • Stream errors need a handler. Use runtime.WithStreamErrorHandler when registering the gateway to control how mid-stream errors render in the HTTP response. The default trailers-only behaviour is awkward to consume from a JSON client.
  • Native HTTP alternatives. If your HTTP clients need true bidirectional or high-frequency push, consider a separate WebSocket or Server-Sent Events endpoint registered via HTTP Gateway Extensions. Keep the gRPC stream as the canonical implementation and have the WebSocket handler delegate to it.
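A sketch of the error-handler registration, assuming grpc-gateway v2’s runtime package — the handler body here is illustrative:

```go
import (
    "context"

    "github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

mux := runtime.NewServeMux(
    runtime.WithStreamErrorHandler(func(ctx context.Context, err error) *status.Status {
        // Render mid-stream failures as a status object the JSON
        // client can parse, instead of bare HTTP/2 trailers.
        if s, ok := status.FromError(err); ok {
            return s
        }
        return status.New(codes.Internal, "stream aborted")
    }),
)
```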

See the grpc-gateway streaming examples for the full HTTP semantics.

Related

  • APIs how-to — Defining gRPC and HTTP endpoints from proto.
  • Interceptors — The stream interceptor chain and how to add your own.
  • HTTP Gateway Extensions — Custom marshalers and routes when the gateway alone isn’t enough.
  • Tracing — Trace IDs propagate through stream contexts the same way they do for unary.