The Nudge Pattern: Brokerless Background Jobs in Python

Code at: https://github.com/CynicDog/outbox-worker-in-python

Most background job services eventually reach the same crossroads: do you reach for a message broker (Kafka, RabbitMQ, Redis Streams), or do you try to make the database you already have work as a queue? The broker route brings powerful guarantees but also real operational weight — another system to deploy, monitor, upgrade, and reason about under failure. The database route looks tempting but often gets dismissed as naive.

This post is about a third path. One that uses the database as the authoritative job queue, keeps operational complexity at zero, and still delivers millisecond pickup latency for interactive workloads — without a broker in sight. The key ingredient is a pattern I call the Nudge.

The Two Concurrency Problems

Before getting into the pattern, it’s worth naming the two distinct concurrency problems a background job service must solve simultaneously.

Problem 1 — The coordination problem. How does the service know when new work has arrived? It can poll the DB on a timer, but polling introduces latency proportional to the poll interval. It can wait to be told, but then correctness depends on the notification always arriving. Neither alone is good enough.

Problem 2 — The execution problem. How does the service process many jobs in parallel without starving the HTTP server? If jobs block the event loop, no new requests can land. If you spawn a process per job, startup cost kills latency. The right model depends on what the jobs actually do.

These two problems call for different concurrency primitives, and it turns out Python gives us exactly the right tools for each.

Two Layers, Two Models

The engine runs two distinct concurrency models side by side:

Diagram

The asyncio loop handles everything that is fast and non-blocking: incoming HTTP, timers, outbound HTTP to claim jobs from the DB. The thread pool handles everything that blocks: reading files, running CPU-bound transforms, writing results back.

The bridge between them is a single call — executor.submit(fn, job) — and that is where the async world hands work off to the thread world without either blocking the other.

Why Threads Instead of Coroutines for the Heavy Work?

This is the question that surprises people. If we’re already using asyncio, why not asyncio.create_task() for jobs too?

The answer is the GIL — and more specifically, how Polars and PyArrow interact with it.

Most Python code holds the GIL continuously. But C-extension libraries that do heavy numeric or I/O work can release it. PyArrow releases the GIL for all Parquet I/O. Polars releases it for most DataFrame operations because its core is written in Rust. This means that when a worker thread is deep inside a Polars aggregation or a PyArrow column scan, other Python threads — including other worker threads — can run simultaneously. True parallelism, not just concurrency.

Contrast this with asyncio.create_task(): coroutines run on the single event loop thread. Even if a coroutine awaits, the CPU-bound work inside Polars cannot run on another core because there is no other thread. You’d need loop.run_in_executor() anyway, which is exactly what executor.submit() gives you, minus the async boilerplate.

Threads here are not a workaround for not knowing asyncio. They are the correct tool for GIL-releasing CPU-bound work. The event loop and the thread pool are collaborators, not competitors.

The Database as a Job Queue

The job queue is not Redis. It’s not Kafka. It’s the relational database the service already uses for everything else — a plain table with a status column.

CREATE TABLE job (
    id          TEXT PRIMARY KEY,
    status      TEXT NOT NULL DEFAULT 'PENDING',
    version     INTEGER NOT NULL DEFAULT 0,
    claimed_by  TEXT,
    claimed_at  TIMESTAMP,
    payload     TEXT,
    result      TEXT,
    retry_count INTEGER NOT NULL DEFAULT 0
);

When the caller wants work done, it writes a row (status='PENDING') and returns. The engine’s job is to find PENDING rows, claim them atomically, and execute them. The DB is the source of truth. If the engine crashes mid-job, the row outlives the crash. Nothing is lost.

The critical piece is claiming atomically. Two engine instances must not claim the same job. The version column enables optimistic locking — any ORM or query builder that supports conditional updates can express this, and in raw SQL it looks like this:

UPDATE job
   SET status = 'RUNNING',
       version = version + 1,
       claimed_by = :worker_id,
       claimed_at = :now
 WHERE id = :id
   AND status = 'PENDING'
   AND version = :expected_version

If rowcount == 0, another worker got there first — skip it silently. No explicit locks, no queue middleware, no coordination protocol beyond what the database already guarantees.

The Poll Loop: Correctness Without a Broker

With the DB as queue, correctness is achieved by polling:

# worker/poller.py
async def poll_loop() -> None:
    while True:
        await asyncio.sleep(cfg.POLL_INTERVAL)
        await claim_and_dispatch()

async def claim_and_dispatch() -> None:
    available = pool.max_workers() - pool.active_count()
    if available <= 0:
        return
    jobs = await store.claim_pending(cfg.WORKER_ID, available)
    for job in jobs:
        runner.submit(job)

poll_loop() runs as a background asyncio task started at application startup:

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Keep a reference: a task with no reference can be garbage-collected mid-run.
    poll_task = asyncio.create_task(poll_loop())
    yield
    poll_task.cancel()
    pool.shutdown(wait=False)

asyncio.create_task() registers the coroutine with the event loop and returns immediately. The loop runs poll_loop() cooperatively — every await asyncio.sleep(...) yields control back, so HTTP handlers are never starved.

claim_and_dispatch() checks available slots before calling the DB, so it never over-claims. It only asks for as many jobs as there are free threads. This prevents the engine from piling up more work than it can run.

The poll loop is the correctness guarantee. Even if every other delivery mechanism fails, a PENDING job will be picked up within POLL_INTERVAL seconds.

The Nudge: Latency Without Sacrificing Correctness

A poll interval of 10 seconds is fine for recovery, but terrible for interactive flows. When a user clicks “Run,” they do not want to wait 10 seconds before anything starts.

This is where the Nudge comes in.

The caller — after writing the PENDING row to the DB — sends a lightweight HTTP signal:

POST /nudge

The nudge handler does exactly one thing:

# api/routes.py
_nudge_tasks: set[asyncio.Task] = set()

@router.post("/nudge")
async def nudge():
    task = asyncio.create_task(claim_and_dispatch())
    _nudge_tasks.add(task)                        # keep a reference so the task
    task.add_done_callback(_nudge_tasks.discard)  # is not garbage-collected mid-run
    return {"ok": True}

It fires claim_and_dispatch() as a fire-and-forget asyncio task and returns 200 immediately — without waiting for the claim round-trip. The caller does not block. The claim happens asynchronously in the background, typically within milliseconds.

The key insight is that claim_and_dispatch() is shared by both the timer and the nudge. The nudge does not add a separate code path — it just fires the same claim logic right now, rather than waiting for the next poll cycle.

The nudge is not a guarantee — it can fail, the engine could be restarting, the pool could be full. None of that matters, because the poll loop is always running. The nudge is purely a latency optimisation. Correctness comes from the poll loop. Speed comes from the nudge.

The Full Flow: User Clicks a Button

Here is what happens end-to-end when a user triggers a job from the UI:

Diagram

The user gets an immediate 202. The job runs in a thread. The user polls for completion. The event loop is never blocked.

Slot Tracking: Never Over-Commit the Pool

ThreadPoolExecutor itself doesn’t tell you how many slots are in use before you try to submit. So the engine maintains its own counter:

# worker/pool.py
_active: dict[str, bool] = {}

def acquire(job_id: str) -> None:
    _active[job_id] = True

def release(job_id: str) -> None:
    _active.pop(job_id, None)

def active_count() -> int:
    return len(_active)

acquire() is called before executor.submit(). release() is called in the finally block of every worker thread:

# worker/runner.py
def submit(job) -> None:
    pool.acquire(job.id)
    executor.submit(_run_job, job)

def _run_job(job) -> None:
    try:
        result = transform(job.payload)
        store.complete(job.id, result)
    except Exception as exc:
        store.fail(job.id, str(exc))
    finally:
        pool.release(job.id)   # always runs, even on crash

This means _active correctly reflects in-flight jobs even during the window between submit() and the thread actually starting. claim_and_dispatch() reads active_count() before calling the DB, so it never claims more jobs than there are free threads.

Stuck Job Recovery

If a worker process is killed mid-job, the row stays RUNNING forever — nothing ever marks it otherwise. The fix is a periodic sweep that looks for jobs that have been running longer than a reasonable threshold and resets them to PENDING so the poll loop picks them up again. Jobs that keep failing past a retry limit get marked FAILED and are left alone.

Recovery deliberately does not send a nudge. It relies entirely on poll_loop — and that is fine, because recovery is not latency-sensitive. A job that was already stuck for thirty minutes can wait another ten seconds.

What This Achieves (and What It Doesn’t)

Durability is covered by the DB write itself — the PENDING row exists before the nudge is ever sent, so a crash between the two loses nothing. Low latency comes from the wake signal firing claim_and_dispatch() as a background task the moment the row is written, getting a thread started within milliseconds. No broker is the point — the RDBMS you already own is the queue, with no Kafka, Redis, or RabbitMQ to operate. Multi-instance safety comes from the optimistic claim: the conditional UPDATE on status and version guarantees that two workers racing for the same row can never both win it. Backpressure is natural — slot tracking prevents over-claiming, so excess PENDING jobs simply wait for the next poll cycle rather than overwhelming the pool. Recovery is continuous — poll_loop sweeps on every interval and picks up anything that was missed or reset from a stuck state.

What this pattern does not give you: push-based fan-out to multiple consumer types, replayable event streams, dead-letter queues with complex routing, or the observability tooling that comes with mature broker ecosystems. If you need those, a broker is the right call. But for a single-consumer background job service that already owns a relational DB and wants minimal operational surface, the pattern covers the requirements with nothing extra to operate.

