Day 136 — Redis Job Queue

Month 5 · Week 4

🎯 Learning Objective

Implement the Queue port with a Redis list (LPUSH/BRPOP) and drain it with a context-aware worker pool, understanding the producer/consumer contract and backpressure.

📚 Topics

  • Redis as a broker: LPUSH + BRPOP (reliable variant BRPOPLPUSH)
  • Worker pool over a buffered channel; producer owns/closes the channel
  • Backpressure, graceful drain, context cancellation

📖 Reading / Sources

📝 Notes

  • A Redis list is a queue: producers LPUSH a job onto the head, workers BRPOP (blocking pop) from the tail. BRPOP blocks up to a timeout, so an idle worker doesn't busy-loop → [[job-queue]] [[redis]].
  • Plain BRPOP is at-most-once: if a worker crashes after popping but before finishing, the job is lost. The reliable pattern is BRPOPLPUSH (deprecated since Redis 6.2 in favor of BLMOVE; LMOVE is the non-blocking form) into a processing list, then LREM on success, so a crash leaves the job recoverable → [[reliable-queue]] [[at-least-once]].
  • In Go, the in-process side is a worker pool: N goroutines pull from a chan Job. A buffered channel bounds how many jobs wait in memory — once full, the producer blocks. That blocking IS backpressure: it stops the producer from outrunning the workers → [[backpressure]] [[worker-pool]].
  • Channel ownership: the producer owns the channel and is the only one that closes it, exactly once. Workers range over it and exit when it's closed and drained. Closing an already-closed channel panics, and so does sending on a closed one, which is why a receiver must never close it → [[channel-axioms]].
  • Graceful drain = stop accepting new jobs, close the jobs channel, WaitGroup.Wait() until workers finish in-flight items. Pair with ctx.Done() in each worker's select so a hard shutdown can abort promptly → [[graceful-shutdown]].
  • Serialize the job payload at the boundary (JSON or protobuf bytes) so Redis stores opaque blobs; the worker deserializes back into a domain Job → [[serialization]].
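
The worker-pool notes above can be sketched with the standard library alone. This is a minimal illustration, not the code in the repository: `Job`, `runPool`, and `demo` are hypothetical names, and the workers use a `select` on `ctx.Done()` instead of a bare `range` so that a hard shutdown can interrupt them.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Job is a hypothetical payload; the real domain type is core.Job.
type Job struct{ ID int }

// runPool starts `workers` goroutines that drain a buffered channel of
// capacity buf, and returns the IDs that were processed.
func runPool(ctx context.Context, ids []int, workers, buf int) []int {
	jobs := make(chan Job, buf) // bounded: a send blocks once buf jobs are waiting
	var (
		wg   sync.WaitGroup
		mu   sync.Mutex
		done []int
	)

	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return // hard shutdown: stop without draining
				case j, ok := <-jobs:
					if !ok {
						return // channel closed and fully drained
					}
					mu.Lock()
					done = append(done, j.ID)
					mu.Unlock()
				}
			}
		}()
	}

	// Producer: the only goroutine that sends on, and closes, the channel.
produce:
	for _, id := range ids {
		select {
		case jobs <- Job{ID: id}: // blocks while the buffer is full (backpressure)
		case <-ctx.Done():
			break produce
		}
	}
	close(jobs) // exactly once, by the owner
	wg.Wait()   // graceful drain: wait for in-flight work
	return done
}

// demo runs five jobs through three workers with a buffer of two.
func demo() int {
	return len(runPool(context.Background(), []int{1, 2, 3, 4, 5}, 3, 2))
}

func main() {
	fmt.Println(demo(), "jobs processed") // 5 jobs processed
}
```

With an uncancelled context every job is processed, in no particular order, because the producer closes the channel only after the last send and `wg.Wait()` holds until the workers have drained it.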

💻 Code Examples

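// internal/adapter/redisqueue: the real Queue port (go-redis is third-party,
// so this is a day-note snippet; the channel mechanics are runnable in stdlib).
// Needs context, encoding/json, time, the go-redis client package, and the
// project's core package (import paths are not shown in these notes).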
type RedisQueue struct {
    rdb *redis.Client
    key string
}

func (q *RedisQueue) Push(ctx context.Context, j core.Job) error {
    b, err := json.Marshal(j)
    if err != nil {
        return err
    }
    return q.rdb.LPush(ctx, q.key, b).Err() // producer side
}

// Worker: reliable pop into a processing list, then ack by removing it.
func (q *RedisQueue) pop(ctx context.Context) (core.Job, error) {
    // BRPOPLPUSH src dst timeout: atomically move + return; blocks up to 5s.
    raw, err := q.rdb.BRPopLPush(ctx, q.key, q.key+":processing", 5*time.Second).Result()
    if err != nil {
        return core.Job{}, err // redis.Nil on timeout
    }
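    var j core.Job
    // Decode first: in `return j, json.Unmarshal(..., &j)` Go does not specify
    // whether j is read before or after the call runs.
    if err := json.Unmarshal([]byte(raw), &j); err != nil {
        return core.Job{}, err
    }
    return j, nil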
}

var _ core.Queue = (*RedisQueue)(nil) // compile-time port check
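
The snippet above mentions an ack step but does not show it. As a rough, stdlib-only model of the whole push / reliable-pop / ack cycle, the sketch below replaces Redis with two in-memory slices. Everything in it (`memQueue`, `Job`, `errEmpty`, `demo`) is a hypothetical stand-in: it imitates the list semantics of LPUSH, RPOPLPUSH, and LREM but has none of Redis's atomicity, blocking, or persistence.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Job is a hypothetical stand-in for core.Job.
type Job struct {
	ID   string `json:"id"`
	Kind string `json:"kind"`
}

// memQueue imitates two Redis lists in memory. Index 0 is the head.
type memQueue struct {
	pending    []string
	processing []string
}

var errEmpty = errors.New("queue empty") // plays the role of redis.Nil

// push imitates LPUSH: serialize the job and add it at the head.
func (q *memQueue) push(j Job) error {
	b, err := json.Marshal(j)
	if err != nil {
		return err
	}
	q.pending = append([]string{string(b)}, q.pending...)
	return nil
}

// pop imitates RPOPLPUSH: move the tail of pending to the head of processing.
// It returns the raw blob too, because the ack needs the exact stored value.
func (q *memQueue) pop() (Job, string, error) {
	if len(q.pending) == 0 {
		return Job{}, "", errEmpty
	}
	raw := q.pending[len(q.pending)-1]
	q.pending = q.pending[:len(q.pending)-1]
	q.processing = append([]string{raw}, q.processing...)
	var j Job
	if err := json.Unmarshal([]byte(raw), &j); err != nil {
		return Job{}, raw, err
	}
	return j, raw, nil
}

// ack imitates LREM processing 1 raw: drop one matching entry after success.
func (q *memQueue) ack(raw string) bool {
	for i, v := range q.processing {
		if v == raw {
			q.processing = append(q.processing[:i], q.processing[i+1:]...)
			return true
		}
	}
	return false
}

// demo pushes two jobs, pops and acks one, and reports what is left.
func demo() (id string, pendingLeft, processingLeft int) {
	q := &memQueue{}
	_ = q.push(Job{ID: "a", Kind: "email"})
	_ = q.push(Job{ID: "b", Kind: "email"})
	j, raw, err := q.pop()
	if err != nil {
		return "", -1, -1
	}
	q.ack(raw)
	return j.ID, len(q.pending), len(q.processing)
}

func main() {
	fmt.Println(demo()) // a 1 0
}
```

Job "a" comes out first because pushes go to the head and pops come from the tail, which gives FIFO order. If the worker crashed between `pop` and `ack`, the blob would still sit in `processing`, which is what makes the pattern at-least-once.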

The in-process side is fully runnable in the stdlib: a buffered-channel queue with backpressure (examples/month-05/jobqueue) and a context-aware worker pool with graceful drain (examples/month-05/workerpool). Run: go run ./examples/month-05/jobqueue · go run ./examples/month-05/workerpool
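
The repository examples are not reproduced here. As a small independent sketch of the point where backpressure kicks in, the code below probes a buffered channel with a non-blocking send. `tryPush` and `demo` are hypothetical names; a real producer would normally use a plain blocking send and simply wait.

```go
package main

import "fmt"

// tryPush attempts a non-blocking send and reports whether the job was accepted.
func tryPush(jobs chan<- int, id int) bool {
	select {
	case jobs <- id:
		return true
	default:
		return false // buffer full: a plain send would block here
	}
}

// demo fills a two-slot buffer, overflows it, then frees one slot.
func demo() []bool {
	jobs := make(chan int, 2)
	res := []bool{tryPush(jobs, 1), tryPush(jobs, 2), tryPush(jobs, 3)}
	<-jobs // a worker takes one job, freeing a slot
	return append(res, tryPush(jobs, 4))
}

func main() {
	fmt.Println(demo()) // [true true false true]
}
```

The third push is rejected because both slots are taken. Once a receive frees a slot, the fourth push succeeds, which is the moment a blocked producer would resume.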

🏋️ Exercises / Practice

Exercise Status Link
Buffered-channel queue with backpressure + drain examples/month-05/jobqueue
Worker pool with context-driven shutdown examples/month-05/workerpool

🐛 Mistakes Made

  • Closed the jobs channel from a worker to "signal done" → panic (close of closed channel) when a second worker also closed it. Only the producer closes, once.
  • Used plain RPOP and lost a job when a worker panicked mid-process. Switched to BRPOPLPUSH into a processing list so the job is recoverable.
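
The first mistake is easy to reproduce in isolation. This sketch (with the hypothetical helper `closeTwice`) closes one channel twice and recovers the runtime panic so the message can be inspected; it is a demonstration only, not a pattern to use in a worker.

```go
package main

import "fmt"

// closeTwice closes the same channel twice and returns the recovered panic message.
func closeTwice() (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	ch := make(chan int)
	close(ch)
	close(ch) // second close panics at runtime
	return "no panic"
}

func main() {
	fmt.Println(closeTwice()) // close of closed channel
}
```

Recovering the panic only hides the symptom. The fix recorded above is structural: one owner, one `close`.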

❓ Open Questions

  • How do I reclaim stuck jobs left in the :processing list after a crash — a reaper that re-queues entries older than a TTL?
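
One possible shape for such a reaper, sketched in memory and not against Redis: it assumes each in-flight entry carries a claim time, which a plain Redis list does not record, so a real version would need a side structure for the timestamps. All names here (`inflight`, `queue`, `reap`, `demo`) are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// inflight is a hypothetical record pairing a job blob with its claim time.
type inflight struct {
	raw       string
	claimedAt time.Time
}

// queue imitates the pending and processing lists. Index 0 is the head.
type queue struct {
	pending    []string
	processing []inflight
}

// reap moves entries claimed more than ttl before now back onto pending
// and returns how many it moved.
func (q *queue) reap(now time.Time, ttl time.Duration) int {
	kept := q.processing[:0] // filter in place
	moved := 0
	for _, e := range q.processing {
		if now.Sub(e.claimedAt) > ttl {
			// Design choice in this sketch: re-queue at the tail so the
			// job is the next one popped.
			q.pending = append(q.pending, e.raw)
			moved++
			continue
		}
		kept = append(kept, e)
	}
	q.processing = kept
	return moved
}

// demo reaps with a one-minute TTL over one stale and one fresh entry.
func demo() (moved, pending, processing int) {
	now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
	q := &queue{processing: []inflight{
		{raw: `{"id":"old"}`, claimedAt: now.Add(-10 * time.Minute)},
		{raw: `{"id":"fresh"}`, claimedAt: now.Add(-10 * time.Second)},
	}}
	moved = q.reap(now, time.Minute)
	return moved, len(q.pending), len(q.processing)
}

func main() {
	fmt.Println(demo()) // 1 1 1
}
```

A TTL cannot tell a crashed worker from a slow one, so a re-queued job may run twice. That is the at-least-once trade-off, and it means job handlers should be idempotent.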

🧠 Active Recall (answer without looking)

  1. Q: Why is BRPOP alone "at-most-once," and what makes BRPOPLPUSH more reliable?
A: `BRPOP` removes the job from the list the instant it's popped; if the worker dies before finishing, the job is gone. `BRPOPLPUSH` (or `BLMOVE` on Redis 6.2+) atomically moves the job to a processing list, so a crash leaves it there to be reclaimed; the worker deletes it only after success (an ack).
  2. Q: In the Go worker pool, what provides backpressure and who is allowed to close the jobs channel?
A: A *buffered* channel provides backpressure: when it's full, the producer's send blocks until a worker frees a slot. Only the producer (channel owner) closes it, exactly once; workers `range` over it and exit on close. A second close panics, as does a send after close, so receivers never close it.

🪶 Feynman Reflection

The queue is a conveyor belt. Producers drop boxes on one end (LPUSH); workers grab boxes off the other end (BRPOP). If the belt is full, the producer has to wait — that waiting is backpressure, and it keeps the system from drowning. To avoid losing a box if a worker trips, we slide it onto a side table (:processing) while it's handled and only throw away the record once the work is done.

🕳️ Knowledge Gaps

  • Tuning pool size vs Redis round-trip latency, and whether to pipeline pops.

✅ Summary

I can back the Queue port with a Redis list, choose the reliable BRPOPLPUSH variant for at-least-once delivery, and drain it with a bounded, context-aware worker pool that applies backpressure and shuts down gracefully.

⏭️ Next Steps / Prep for Tomorrow

  • Day 137: make processing resilient — exponential backoff, retry budgets, and a dead-letter queue.

| Time spent | Difficulty | Confidence |
| --- | --- | --- |
| 90 min | 🟦🟦⬜⬜⬜ | 🟦🟦🟦⬜⬜ |

Suggested commit: feat(queue): redis-backed job queue with worker pool (day 136)