Day 073 — Pipelines
Month 3 · Week 3
🎯 Learning Objective
Compose work as a pipeline of channel-connected stages, and make every stage cancellable so an early-quitting consumer never leaks upstream goroutines.
📚 Topics
- Stage shape: receive → process → send, owning and closing its output channel
- Connecting stages: `source → square → filter → sink`
- Early termination with a `done`/`context` channel; avoiding goroutine leaks
📖 Reading / Sources
- Go blog — Go Concurrency Patterns: Pipelines and cancellation
- Go blog — Advanced Go Concurrency Patterns
- `context` package docs
📝 Notes
- A pipeline is a series of stages connected by channels. Each stage is a goroutine that receives from an inbound channel, does work, and sends on an outbound channel it owns and closes → [[pipeline]].
- Stage ownership rule: a stage closes the channel it sends on (so the downstream `range` ends), and it stops sending only when its inbound channel is closed or it is told to quit → [[channel-ownership]].
- The leak: if the consumer stops reading early, a stage blocked on `out <- v` waits forever, and so does every stage upstream of it. Those goroutines (and everything they reference) are never garbage-collected → [[goroutine-leak]].
- The fix: every blocking send becomes a `select` over `out <- v` and `<-ctx.Done()` (or a `done` channel). Every stage watches the same `ctx.Done()`, so a single `cancel()` releases the whole pipeline at once → [[context-cancellation]].
- Prefer `context.Context` over a hand-rolled `done chan struct{}` in real code: it composes (timeouts, deadlines, values) and is the ecosystem convention → [[context-first-param]].
- A pipeline naturally provides backpressure: a slow stage blocks its predecessor, so the number of in-flight items is bounded by roughly one per stage plus the channel buffer capacities, not by the input size.
- Fan-out/fan-in (Day 072) slots in as a parallel stage in the middle of a pipeline when one step is the bottleneck.
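The backpressure claim can be checked directly. Below is a minimal sketch, assuming Go 1.19+ (for `atomic.Int64`) and a hypothetical `handedOff` helper: it wires `source → square → consumer` with unbuffered channels, lets the consumer read `take` values, stalls it, and counts how many items the source managed to hand off. With capacity-0 channels the source cannot get more than `take + 1` items out, however long the consumer sleeps.

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// handedOff (hypothetical helper) wires source -> square -> consumer with
// unbuffered channels, lets the consumer read `take` values, then stalls it
// and reports how many items the source handed off in total.
func handedOff(take int) int64 {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // unblocks both stage goroutines when we return

	var produced atomic.Int64 // atomic.Int64 needs Go 1.19+

	src := make(chan int) // unbuffered: capacity 0
	go func() {
		defer close(src)
		for i := 0; ; i++ { // effectively unbounded input
			select {
			case src <- i:
				produced.Add(1) // counted only after the send completes
			case <-ctx.Done():
				return
			}
		}
	}()

	sq := make(chan int) // unbuffered
	go func() {
		defer close(sq)
		for v := range src {
			select {
			case sq <- v * v:
			case <-ctx.Done():
				return
			}
		}
	}()

	for i := 0; i < take; i++ {
		<-sq
	}
	// The consumer now stalls; upstream should block instead of racing ahead.
	time.Sleep(50 * time.Millisecond)
	return produced.Load()
}

func main() {
	// At most take+1: the values consumed plus the one the square stage holds.
	fmt.Println("items handed off:", handedOff(3))
}
```

Adding a buffer of capacity k to either channel raises that ceiling by k, which is the "bounded by the buffers" half of the note.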
💻 Code Examples
```go
import "context"

// sendOrDone makes a stage abandon its send when the context is cancelled,
// so upstream never blocks forever after the consumer quits.
// (Generic, so it needs Go 1.18+.)
func sendOrDone[T any](ctx context.Context, out chan<- T, v T) bool {
	select {
	case out <- v:
		return true // downstream accepted the value
	case <-ctx.Done():
		return false // cancelled: the caller should stop and return
	}
}

// stage applies fn to every value from in and forwards the result downstream.
func stage(ctx context.Context, in <-chan int, fn func(int) int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out) // this stage owns and closes its output
		for v := range in { // ends when the upstream stage closes in
			if !sendOrDone(ctx, out, fn(v)) {
				return
			}
		}
	}()
	return out
}
```
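The `stage` helper composes into the `source → square → filter → sink` chain from the Topics list. A minimal runnable sketch follows; `source`, `filter`, and `firstOddSquares` are hypothetical helpers written for this sketch (not taken from the full example file), and the sink quits early after `k` results and relies on the deferred `cancel()` to release everything upstream.

```go
package main

import (
	"context"
	"fmt"
)

// sendOrDone and stage are repeated from the snippet above so this file
// compiles on its own.
func sendOrDone[T any](ctx context.Context, out chan<- T, v T) bool {
	select {
	case out <- v:
		return true
	case <-ctx.Done():
		return false
	}
}

func stage(ctx context.Context, in <-chan int, fn func(int) int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			if !sendOrDone(ctx, out, fn(v)) {
				return
			}
		}
	}()
	return out
}

// source emits 1..n (hypothetical helper for this sketch).
func source(ctx context.Context, n int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 1; i <= n; i++ {
			if !sendOrDone(ctx, out, i) {
				return
			}
		}
	}()
	return out
}

// filter forwards only the values keep accepts (hypothetical helper).
func filter(ctx context.Context, in <-chan int, keep func(int) bool) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			if !keep(v) {
				continue
			}
			if !sendOrDone(ctx, out, v) {
				return
			}
		}
	}()
	return out
}

// firstOddSquares is the sink: it runs source -> square -> filter and keeps
// at most k results, quitting early once it has them.
func firstOddSquares(n, k int) []int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // releases every upstream stage after an early break

	squares := stage(ctx, source(ctx, n), func(v int) int { return v * v })
	odd := filter(ctx, squares, func(v int) bool { return v%2 == 1 })

	var got []int
	for v := range odd {
		got = append(got, v)
		if len(got) == k {
			break // early termination
		}
	}
	return got
}

func main() {
	fmt.Println(firstOddSquares(100, 3)) // [1 9 25]
}
```

If the input runs out first (for example `firstOddSquares(4, 10)`), the closes cascade down the chain and the sink's `range` simply ends with the shorter result `[1 9]`.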
Full code: `examples/month-03/pipeline-cancel/main.go` · Run: `go run ./examples/month-03/pipeline-cancel`
🏋️ Exercises / Practice
| Exercise | Status | Link |
|---|---|---|
| Order-preserving bounded Map with context cancel (a 1-stage pipeline) | ✅ | `exercises/month-03/week-3/boundedmap/` |
🐛 Mistakes Made
- Built a 3-stage pipeline, took only the first few results, and returned: `pprof`/`runtime.NumGoroutine()` showed leaked stages blocked on send. Adding the `ctx.Done()` arm to every send fixed it; that arm only fires once the consumer calls `cancel()` on its way out.
- Closed an inbound channel from a downstream stage. A stage may only close the channel it owns (the one it sends on).
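The leak and its fix can be measured the same way the mistake was spotted. A minimal sketch, assuming hypothetical `counter`, `double`, `settle`, and `leftover` helpers: the "leaky" run keeps the `ctx.Done()` arm but never cancels before counting, which leaves the stages blocked exactly as a bare `out <- v` would.

```go
package main

import (
	"context"
	"fmt"
	"runtime"
	"time"
)

func sendOrDone[T any](ctx context.Context, out chan<- T, v T) bool {
	select {
	case out <- v:
		return true
	case <-ctx.Done():
		return false
	}
}

// counter emits 0, 1, 2, ... until cancelled (hypothetical source stage).
func counter(ctx context.Context) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 0; ; i++ {
			if !sendOrDone(ctx, out, i) {
				return
			}
		}
	}()
	return out
}

// double is a hypothetical middle stage.
func double(ctx context.Context, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			if !sendOrDone(ctx, out, 2*v) {
				return
			}
		}
	}()
	return out
}

// settle gives exiting goroutines a moment to finish (an arbitrary 50 ms).
func settle() { time.Sleep(50 * time.Millisecond) }

// leftover builds counter -> double -> double, reads two values, and reports
// how many extra goroutines are still alive afterwards.
func leftover(cancelEarly bool) int {
	before := runtime.NumGoroutine()

	ctx, cancel := context.WithCancel(context.Background())
	out := double(ctx, double(ctx, counter(ctx)))
	<-out
	<-out
	if cancelEarly {
		cancel() // the fix: the consumer cancels on its way out
	}
	settle()
	n := runtime.NumGoroutine() - before

	cancel() // clean up the leaky run too, so later measurements start clean
	settle()
	return n
}

func main() {
	fmt.Println("goroutines left, no cancel:  ", leftover(false))
	fmt.Println("goroutines left, with cancel:", leftover(true))
}
```

Run as written, the first line should report the three blocked stages and the second none; the `settle` pause is a practical allowance for goroutines to exit, not a guarantee.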
❓ Open Questions
- How do I bound a pipeline's parallel stage to N concurrent workers cleanly? (Semaphore — Day 074.)
🧠 Active Recall (answer without looking)
- Q: Why can an early-exiting consumer leak goroutines, and how do you prevent it?
  A: Upstream stages block forever on `out <- v` because nobody reads, so their goroutines never return and never get GC'd. Prevent it by `select`ing each send against `<-ctx.Done()` and returning when the context is cancelled.
- Q: Which channel is a stage allowed to close?
  A: Only the output channel it owns (the one it sends on). Closing an inbound channel would race the upstream sender and can cause send-on-closed panics.
🪶 Feynman Reflection
A pipeline is an assembly line: each station takes a part off the belt in front of it, works on it, and puts it on the belt behind it, closing that belt when it has nothing more to add. If the last station walks off, you need a whistle (`ctx.Done()`) that every station listens for, or the upstream ones stand frozen holding a part forever.
🕳️ Knowledge Gaps
- Deadlines/timeouts on a whole pipeline run: `context.WithTimeout` (Day 075).
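As a preview of that gap, here is a minimal sketch of bounding a whole run with `context.WithTimeout`: when the deadline passes, `ctx.Done()` closes, the stage returns and closes its output, and the consumer's `range` ends on its own. `slowSource` and `runBounded` are hypothetical helpers, and the 100 ms limit and 10 ms pause are arbitrary.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// slowSource emits 0, 1, 2, ... with a pause before each item until ctx ends
// (hypothetical slow stage).
func slowSource(ctx context.Context, pause time.Duration) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 0; ; i++ {
			select {
			case <-time.After(pause): // simulate slow work
			case <-ctx.Done():
				return
			}
			select {
			case out <- i:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// runBounded drains the pipeline until the deadline closes it, returning the
// number of items received and whether the run ended because time ran out.
func runBounded(limit, pause time.Duration) (int, bool) {
	ctx, cancel := context.WithTimeout(context.Background(), limit)
	defer cancel() // always release the context's resources

	n := 0
	for range slowSource(ctx, pause) {
		n++
	}
	return n, errors.Is(ctx.Err(), context.DeadlineExceeded)
}

func main() {
	n, timedOut := runBounded(100*time.Millisecond, 10*time.Millisecond)
	fmt.Println("items:", n, "timed out:", timedOut)
}
```

The exact item count depends on scheduling; the point is that the consumer never has to know about the deadline, since the close cascade ends its loop.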
✅ Summary
I can build cancellable pipelines: each stage owns/closes its output, ranges its input, and selects every send against `ctx.Done()` so early termination cleans up the whole chain without leaks.
⏭️ Next Steps / Prep for Tomorrow
- Day 074: cap concurrency precisely with semaphores and bounded parallelism.
| Time spent | Difficulty | Confidence |
|---|---|---|
| 90 min | 🟦🟦⬜⬜⬜ | 🟦🟦🟦⬜⬜ |
Suggested commit: feat(examples): cancellable pipeline stages (day 073)