
Day 073 — Pipelines

Month 3 · Week 3

🎯 Learning Objective

Compose work as a pipeline of channel-connected stages, and make every stage cancellable so an early-quitting consumer never leaks upstream goroutines.

📚 Topics

  • Stage shape: receive → process → send, owning and closing its output channel
  • Connecting stages: source → square → filter → sink
  • Early termination with a done/context channel; avoiding goroutine leaks

📖 Reading / Sources

📝 Notes

  • A pipeline is a series of stages connected by channels. Each stage is a goroutine that receives from an inbound channel, does work, and sends on an outbound channel it owns and closes → [[pipeline]].
  • Stage ownership rule: a stage closes the channel it sends on (so the downstream range loop ends), and stops sending only when its inbound channel is closed or it's told to quit → [[channel-ownership]].
  • The leak: if the consumer stops reading early, a stage blocked on out <- v waits forever, and so does everyone upstream. Those goroutines (and their memory) never get collected → [[goroutine-leak]].
  • The fix: every blocking send becomes a select on out <- v and <-ctx.Done() (or a done channel). Because every stage watches the same context, one cancel() reaches the whole pipeline, and each stage returns and closes its output → [[context-cancellation]].
  • Prefer context.Context over a hand-rolled done chan struct{} in real code: it composes (timeouts, deadlines, values) and is the ecosystem convention → [[context-first-param]].
  • A pipeline naturally provides backpressure: a slow stage blocks its predecessor's send, so the number of in-flight items (and the memory they hold) is bounded by the number of stages and the channel buffer sizes, not by the input size.
  • The fan-out/fan-in pattern (Day 072) slots in as a parallel stage in the middle of a pipeline when one step is the bottleneck.

💻 Code Examples

```go
import "context"

// sendOrDone makes a stage abandon its send when the context is cancelled,
// so upstream never blocks forever after the consumer quits.
func sendOrDone[T any](ctx context.Context, out chan<- T, v T) bool {
    select {
    case out <- v:
        return true
    case <-ctx.Done():
        return false
    }
}

// stage applies fn to every value from in and forwards the result on out.
// It returns early if ctx is cancelled while a send is pending.
func stage(ctx context.Context, in <-chan int, fn func(int) int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out) // this stage owns and closes its output
        for v := range in {
            if !sendOrDone(ctx, out, fn(v)) {
                return
            }
        }
    }()
    return out
}
```

Full code: examples/month-03/pipeline-cancel/main.go · Run: go run ./examples/month-03/pipeline-cancel

🏋️ Exercises / Practice

| Exercise | Status | Link |
|---|---|---|
| Order-preserving bounded Map with context cancel (a 1-stage pipeline) | | exercises/month-03/week-3/boundedmap/ |

🐛 Mistakes Made

  • Built a 3-stage pipeline, took only the first few results, and returned — pprof/runtime.NumGoroutine() showed leaked stages blocked on send. Adding the ctx.Done() arm to every send fixed it.
  • Closed an inbound channel from a downstream stage. A stage may only close the channel it owns (the one it sends on).

❓ Open Questions

  • How do I bound a pipeline's parallel stage to N concurrent workers cleanly? (Semaphore — Day 074.)

🧠 Active Recall (answer without looking)

  1. Q: Why can an early-exiting consumer leak goroutines, and how do you prevent it?

    A: Upstream stages block forever on `out <- v` because nobody reads, so their goroutines never return and never get GC'd. Prevent it by `select`ing each send against `<-ctx.Done()` and returning when the context is cancelled.

  2. Q: Which channel is a stage allowed to close?

    A: Only the output channel it owns (the one it sends on). Closing an inbound channel would race the upstream sender and can cause send-on-closed panics.
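The panic in the second answer can be reproduced deterministically by running the two steps in sequence: the "downstream" side closes a channel it does not own, then the "upstream" owner sends on it. This is a simplified sketch (sendAfterForeignClose is a hypothetical name); in a real pipeline the two happen concurrently, so the panic shows up only when the timing lines up.

```go
package main

import "fmt"

// sendAfterForeignClose (hypothetical) simulates the ownership mistake and
// returns the recovered panic message.
func sendAfterForeignClose() (msg string) {
	ch := make(chan int)
	close(ch) // wrong owner: a receiver closing its inbound channel

	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	ch <- 1 // the real owner's next send panics because ch is closed
	return "no panic"
}

func main() {
	fmt.Println(sendAfterForeignClose()) // send on closed channel
}
```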

🪶 Feynman Reflection

A pipeline is an assembly line: each station takes a part off the belt in front of it, works on it, and puts it on the belt behind it, closing that belt when it has nothing more to add. If the last station walks off, you need a whistle (ctx.Done()) that every station listens for, or the upstream ones stand frozen holding a part forever.

🕳️ Knowledge Gaps

  • Deadlines/timeouts on a whole pipeline run — context.WithTimeout (Day 075).

✅ Summary

I can build cancellable pipelines: each stage owns/closes its output, ranges its input, and selects every send against ctx.Done() so early termination cleans up the whole chain without leaks.

⏭️ Next Steps / Prep for Tomorrow

  • Day 074: cap concurrency precisely with semaphores and bounded parallelism.

| Time spent | Difficulty | Confidence |
|---|---|---|
| 90 min | 🟦🟦⬜⬜⬜ | 🟦🟦🟦⬜⬜ |

Suggested commit: feat(examples): cancellable pipeline stages (day 073)