Day 136 — Redis Job Queue¶
Month 5 · Week 4 · ⬅ Day 135 · Day 137 ➡ · Journal index
🎯 Learning Objective¶
Implement the Queue port with a Redis list (LPUSH/BRPOP) and drain it with a context-aware worker pool, understanding the producer/consumer contract and backpressure.
📚 Topics¶
- Redis as a broker:
LPUSH+BRPOP(reliable variantBRPOPLPUSH) - Worker pool over a buffered channel; producer owns/closes the channel
- Backpressure, graceful drain,
contextcancellation
📖 Reading / Sources¶
- Redis lists as queues —
BRPOP - Reliable queue pattern —
BRPOPLPUSH/LMOVE -
go-redisclient (API reference) - Go blog — Pipelines & cancellation
📝 Notes¶
- A Redis list is a queue: producers
LPUSH jobonto the head, workersBRPOP(blocking pop) from the tail.BRPOPblocks up to a timeout, so an idle worker doesn't busy-loop → [[job-queue]] [[redis]]. - Plain
BRPOPis at-most-once: if a worker crashes after popping but before finishing, the job is lost. The reliable pattern isBRPOPLPUSH/LMOVEinto a processing list, thenLREMon success — so a crash leaves the job recoverable → [[reliable-queue]] [[at-least-once]]. - In Go, the in-process side is a worker pool: N goroutines pull from a
chan Job. A buffered channel bounds how many jobs wait in memory — once full, the producer blocks. That blocking IS backpressure: it stops the producer from outrunning the workers → [[backpressure]] [[worker-pool]]. - Channel ownership: the producer owns the channel and is the only one that closes it — exactly once. Workers
rangeover it and exit when it's closed and drained. Closing from a receiver, or twice, panics → [[channel-axioms]]. - Graceful drain = stop accepting new jobs, close the jobs channel,
WaitGroup.Wait()until workers finish in-flight items. Pair withctx.Done()in each worker'sselectso a hard shutdown can abort promptly → [[graceful-shutdown]]. - Serialize the job payload at the boundary (JSON or protobuf bytes) so Redis stores opaque blobs; the worker deserializes back into a domain
Job→ [[serialization]].
💻 Code Examples¶
// internal/adapter/redisqueue — the real Queue port (go-redis is third-party,
// so this is a day-note snippet; the channel mechanics are runnable in stdlib).
type RedisQueue struct {
rdb *redis.Client
key string
}
func (q *RedisQueue) Push(ctx context.Context, j core.Job) error {
b, err := json.Marshal(j)
if err != nil {
return err
}
return q.rdb.LPush(ctx, q.key, b).Err() // producer side
}
// Worker: reliable pop into a processing list, then ack by removing it.
func (q *RedisQueue) pop(ctx context.Context) (core.Job, error) {
// BRPOPLPUSH src dst timeout: atomically move + return; blocks up to 5s.
raw, err := q.rdb.BRPopLPush(ctx, q.key, q.key+":processing", 5*time.Second).Result()
if err != nil {
return core.Job{}, err // redis.Nil on timeout
}
var j core.Job
return j, json.Unmarshal([]byte(raw), &j)
}
var _ core.Queue = (*RedisQueue)(nil) // compile-time port check
The in-process side is fully runnable in the stdlib: a buffered-channel queue with backpressure (
examples/month-05/jobqueue) and a context-aware worker pool with graceful drain (examples/month-05/workerpool). Run:go run ./examples/month-05/jobqueue·go run ./examples/month-05/workerpool
🏋️ Exercises / Practice¶
| Exercise | Status | Link |
|---|---|---|
| Buffered-channel queue with backpressure + drain | ✅ | examples/month-05/jobqueue |
| Worker pool with context-driven shutdown | ✅ | examples/month-05/workerpool |
🐛 Mistakes Made¶
- Closed the jobs channel from a worker to "signal done" → panic (
close of closed channel) when a second worker also closed it. Only the producer closes, once. - Used plain
RPOPand lost a job when a worker panicked mid-process. Switched toBRPOPLPUSHinto a processing list so the job is recoverable.
❓ Open Questions¶
- How do I reclaim stuck jobs left in the
:processinglist after a crash — a reaper that re-queues entries older than a TTL?
🧠 Active Recall (answer without looking)¶
- Q: Why is
BRPOPalone "at-most-once," and what makesBRPOPLPUSHmore reliable?
A
`BRPOP` removes the job from the list the instant it's popped; if the worker dies before finishing, the job is gone. `BRPOPLPUSH` (or `LMOVE`) atomically moves the job to a processing list, so a crash leaves it there to be reclaimed; the worker deletes it only after success (an ack).- Q: In the Go worker pool, what provides backpressure and who is allowed to close the jobs channel?
A
A *buffered* channel provides backpressure: when it's full, the producer's send blocks until a worker frees a slot. Only the producer (channel owner) closes it, exactly once; workers `range` over it and exit on close. Closing from a receiver or twice panics.🪶 Feynman Reflection¶
The queue is a conveyor belt. Producers drop boxes on one end (LPUSH); workers grab boxes off the other end (BRPOP). If the belt is full, the producer has to wait — that waiting is backpressure, and it keeps the system from drowning. To avoid losing a box if a worker trips, we slide it onto a side table (:processing) while it's handled and only throw away the record once the work is done.
🕳️ Knowledge Gaps¶
- Tuning pool size vs Redis round-trip latency, and whether to pipeline pops.
✅ Summary¶
I can back the Queue port with a Redis list, choose the reliable BRPOPLPUSH variant for at-least-once delivery, and drain it with a bounded, context-aware worker pool that applies backpressure and shuts down gracefully.
⏭️ Next Steps / Prep for Tomorrow¶
- Day 137: make processing resilient — exponential backoff, retry budgets, and a dead-letter queue.
| Time spent | Difficulty | Confidence |
|---|---|---|
| 90 min | 🟦🟦⬜⬜⬜ | 🟦🟦🟦⬜⬜ |
Suggested commit: feat(queue): redis-backed job queue with worker pool (day 136)