Your single cron job worked fine for two years. Then you added a second app server for redundancy, and now every job runs twice. Or you added auto-scaling and now the same billing email goes out to every customer five times on the first of the month. You know what I’m talking about.
The naive fix — "just pick one server to run crons" — trades correctness for a single point of failure. The smart fix involves three things: deciding who runs the job (leader election), deciding what to do when it fails (retry strategy), and guaranteeing it only counts once (deduplication). Get these three right and you have a scheduler that survives node failures, network partitions, and 3 AM on-call pages.
This article is the one I wish existed when I first ran a distributed system at scale. No hand-waving. Real code. Real gotchas.
The Problem You Actually Have
In a single-node world, cron is fine. On multiple nodes, every node runs its own crontab. Suddenly you have N workers all trying to fire the same send_invoices task at 00:00 UTC. Three things go wrong:
- Double execution: the job runs N times, producing duplicate records, duplicate emails, duplicate charges.
- Split-brain retries: node A fails halfway, node B retries from scratch but doesn’t know A already committed half the work.
- No visibility: which node ran what, and did it succeed? Nobody knows.
The root cause is always the same: no shared coordination layer. Fix the coordination layer and the rest becomes manageable.
The Coordination Stack
You need a place where all nodes agree on truth. Three solid options:
| Store | Sweet spot | Caveat |
|---|---|---|
| Redis (+ Redlock or SETNX) | Low latency, simple TTL-based locks | Redis is CP only with proper cluster config; single-node Redis is not safe for strict leader election |
| PostgreSQL advisory locks | You already run Postgres | Locks die with the connection; needs a health loop |
| etcd / ZooKeeper | Hard guarantees, used by k8s | Operational overhead, overkill for most teams |
For 90% of backend services, PostgreSQL advisory locks + a heartbeat table is the right answer. You already have Postgres. You trust it. The ACID guarantees are built-in. I’ll use that here, with Redis as a fast distributed mutex for the job-level dedup.
Part 1: Leader Election
Leader election is the act of one node claiming the right to schedule jobs. Not run them — just decide which jobs get enqueued and when. This separates concerns cleanly.
The Heartbeat Table
-- migrations/001_scheduler_leader.sql
CREATE TABLE scheduler_leader (
id INTEGER PRIMARY KEY DEFAULT 1 CHECK (id = 1), -- enforces single row
node_id TEXT NOT NULL,
acquired_at TIMESTAMPTZ NOT NULL DEFAULT now(),
expires_at TIMESTAMPTZ NOT NULL
);
The CHECK (id = 1) constraint is an old trick: it guarantees this table has exactly one row, making atomic upsert safe.
The Election Loop (Python)
import os
import time
import socket
import psycopg2
from datetime import datetime, timedelta, timezone
NODE_ID = socket.gethostname()
LEASE_SECONDS = 30 # how long the lease lasts
RENEW_INTERVAL = 10 # how often the leader renews
def try_acquire_or_renew(conn) -> bool:
"""
Returns True if this node is (or became) the leader after this call.
Uses a single upsert — no separate SELECT, no TOCTOU window.
"""
now = datetime.now(timezone.utc)
expires = now + timedelta(seconds=LEASE_SECONDS)
with conn.cursor() as cur:
cur.execute("""
INSERT INTO scheduler_leader (id, node_id, acquired_at, expires_at)
VALUES (1, %s, %s, %s)
ON CONFLICT (id) DO UPDATE
SET node_id = EXCLUDED.node_id,
acquired_at = EXCLUDED.acquired_at,
expires_at = EXCLUDED.expires_at
-- Only steal or renew if: expired, OR we already own it
WHERE scheduler_leader.expires_at < now()
OR scheduler_leader.node_id = EXCLUDED.node_id
RETURNING node_id
""", (NODE_ID, now, expires))
row = cur.fetchone()
conn.commit()
return row is not None # None means another node owns a valid lease
def leader_loop(schedule_fn):
conn = psycopg2.connect(os.environ["DATABASE_URL"])
while True:
if try_acquire_or_renew(conn):
schedule_fn() # enqueue whatever jobs are due right now
time.sleep(RENEW_INTERVAL)
Gotcha #1: The WHERE clause in the ON CONFLICT DO UPDATE is critical. Without it, every node overwrites the leader on every tick. With it, a node can only claim leadership if the current lease is expired — or it’s renewing its own lease.
Gotcha #2: conn.commit() after every election attempt, not just on success. A failed transaction leaves locks and prevents the next attempt from seeing fresh data.
Gotcha #3: If the leader’s database connection drops, the lease expires after LEASE_SECONDS and a new node takes over. Set LEASE_SECONDS high enough to absorb transient blips (≥30s), but low enough that failover doesn’t take forever. I use 30s lease, 10s renewal in production.
Part 2: The Job Queue
The leader doesn’t run jobs — it enqueues them. Workers pull from the queue. This decouples scheduling latency from execution latency.
-- migrations/002_job_queue.sql
CREATE TYPE job_status AS ENUM ('pending', 'running', 'done', 'failed', 'dead');
CREATE TABLE job_queue (
id BIGSERIAL PRIMARY KEY,
job_type TEXT NOT NULL,
payload JSONB NOT NULL DEFAULT '{}',
idempotency_key TEXT NOT NULL,
status job_status NOT NULL DEFAULT 'pending',
attempts INT NOT NULL DEFAULT 0,
max_attempts INT NOT NULL DEFAULT 5,
run_at TIMESTAMPTZ NOT NULL DEFAULT now(),
locked_until TIMESTAMPTZ,
locked_by TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
error TEXT
);
-- This index is the backbone of the worker's polling query
CREATE INDEX idx_job_queue_claimable
ON job_queue (run_at, status, locked_until)
WHERE status IN ('pending', 'failed');
CREATE UNIQUE INDEX idx_job_queue_idempotency
ON job_queue (idempotency_key)
WHERE status NOT IN ('done', 'dead');
The idempotency_key unique index does your dedup at the database layer — no application code needed to prevent duplicates for in-flight or pending jobs. Once a job reaches done or dead, the key is released and can be re-used for the next scheduled run.
Part 3: Worker — Claiming Jobs Safely
import time
import socket
from datetime import datetime, timedelta, timezone
WORKER_ID = f"{socket.gethostname()}-{os.getpid()}"
LOCK_TIMEOUT = 60 # seconds; must exceed your longest job
def claim_job(conn):
"""
Atomically claim one job. Returns the row dict or None.
FOR UPDATE SKIP LOCKED is the key — no thundering herd.
"""
now = datetime.now(timezone.utc)
lock_until = now + timedelta(seconds=LOCK_TIMEOUT)
with conn.cursor() as cur:
cur.execute("""
UPDATE job_queue
SET status = 'running',
locked_until = %s,
locked_by = %s,
attempts = attempts + 1,
updated_at = now()
WHERE id = (
SELECT id FROM job_queue
WHERE status IN ('pending', 'failed')
AND run_at <= now()
AND (locked_until IS NULL OR locked_until < now())
ORDER BY run_at ASC
LIMIT 1
FOR UPDATE SKIP LOCKED
)
RETURNING *
""", (lock_until, WORKER_ID))
row = cur.fetchone()
conn.commit()
if row is None:
return None
cols = [desc[0] for desc in cur.description]
return dict(zip(cols, row))
FOR UPDATE SKIP LOCKED is not optional. Without it, every worker SELECT blocks on the same row, causing a thundering herd the moment jobs back up. With it, each worker atomically skips rows already claimed by another. This is available in PostgreSQL 9.5+ and MySQL 8+.
Gotcha #4: LOCK_TIMEOUT must be longer than your slowest job’s expected runtime. If not, your job runs to completion but another worker reclaims it halfway through because the lock expired. Set it conservatively, and implement a heartbeat if jobs can run longer than a few minutes.
Part 4: Retry Strategy
Not all failures are equal. A 500 from a downstream API deserves a retry. A ValidationError on your own data does not — retrying it just wastes queue slots.
import random
from datetime import datetime, timedelta, timezone
class RetryPolicy:
def __init__(self, max_attempts=5, base_delay=5, max_delay=300):
self.max_attempts = max_attempts
self.base_delay = base_delay # seconds
self.max_delay = max_delay # cap at 5 minutes
def next_run_at(self, attempts: int) -> datetime:
# Exponential backoff with full jitter
# Full jitter avoids synchronized retry storms across workers
cap = min(self.max_delay, self.base_delay * (2 ** attempts))
delay = random.uniform(0, cap)
return datetime.now(timezone.utc) + timedelta(seconds=delay)
def is_exhausted(self, attempts: int) -> bool:
return attempts >= self.max_attempts
DEFAULT_POLICY = RetryPolicy()
# Non-retriable exception marker
class PermanentFailure(Exception):
pass
def handle_job_result(conn, job: dict, error: Exception | None):
policy = RetryPolicy(max_attempts=job["max_attempts"])
if error is None:
_set_status(conn, job["id"], "done")
return
if isinstance(error, PermanentFailure) or policy.is_exhausted(job["attempts"]):
_set_status(conn, job["id"], "dead", str(error))
# optionally: send to dead-letter table, alert, etc.
return
next_run = policy.next_run_at(job["attempts"])
_set_status(conn, job["id"], "failed", str(error), next_run)
def _set_status(conn, job_id, status, error=None, run_at=None):
with conn.cursor() as cur:
cur.execute("""
UPDATE job_queue
SET status = %s,
error = %s,
run_at = COALESCE(%s, run_at),
locked_until = NULL,
locked_by = NULL,
updated_at = now()
WHERE id = %s
""", (status, error, run_at, job_id))
conn.commit()
Full jitter (not exponential backoff alone) is the right default. AWS published research on this in 2015 and it holds: without jitter, retries synchronize across workers and create a second wave of load exactly when your downstream is already struggling. random.uniform(0, cap) spreads the retry wave across the entire backoff window.
Gotcha #5: The dead status is your dead-letter queue. Don’t delete dead jobs — they’re forensic evidence. Ship them to a separate table or alert on them. Many production incidents are diagnosed entirely from dead-letter job payloads.
Part 5: Deduplication
Dedup has two layers in this architecture:
Layer 1 — Database-level (free): the UNIQUE INDEX on idempotency_key prevents the same job from being enqueued twice while it’s pending or running.
def enqueue_job(conn, job_type: str, payload: dict, idempotency_key: str,
run_at=None, max_attempts=5):
run_at = run_at or datetime.now(timezone.utc)
with conn.cursor() as cur:
cur.execute("""
INSERT INTO job_queue
(job_type, payload, idempotency_key, run_at, max_attempts)
VALUES (%s, %s, %s, %s, %s)
ON CONFLICT (idempotency_key)
WHERE status NOT IN ('done', 'dead')
DO NOTHING
RETURNING id
""", (job_type, psycopg2.extras.Json(payload),
idempotency_key, run_at, max_attempts))
row = cur.fetchone()
conn.commit()
return row[0] if row else None # None = already enqueued, not an error
Layer 2 — Job handler idempotency: your job handler must be safe to run twice anyway. Network partitions happen. The lock can expire mid-job. Design for at-least-once and build at-most-once behavior inside the handler.
def send_monthly_invoice(payload: dict):
invoice_id = payload["invoice_id"]
user_id = payload["user_id"]
# Check if we already sent this — idempotency at the domain level
with conn.cursor() as cur:
cur.execute("""
SELECT sent_at FROM invoices
WHERE id = %s AND sent_at IS NOT NULL
""", (invoice_id,))
if cur.fetchone():
return # already sent, skip silently
# ... send the email ...
# ... mark as sent atomically with the send ...
cur.execute("UPDATE invoices SET sent_at = now() WHERE id = %s", (invoice_id,))
conn.commit()
The pattern: check, then act, then mark — all in one transaction if possible. If your email provider doesn’t support transactional sends, use an outbox table and flush it in a separate step.
Docker Compose: The Full Stack
# docker-compose.yml
version: "3.9"
services:
postgres:
image: postgres:16-alpine
environment:
POSTGRES_DB: scheduler
POSTGRES_USER: scheduler
POSTGRES_PASSWORD: secret
volumes:
- pgdata:/var/lib/postgresql/data
- ./migrations:/docker-entrypoint-initdb.d
healthcheck:
test: ["CMD-SHELL", "pg_isready -U scheduler"]
interval: 5s
timeout: 3s
retries: 10
redis:
image: redis:7-alpine
command: redis-server --save 60 1 --loglevel warning
volumes:
- redisdata:/data
scheduler:
build: .
command: python -m app.scheduler # leader election loop
environment:
DATABASE_URL: postgresql://scheduler:secret@postgres/scheduler
REDIS_URL: redis://redis:6379/0
NODE_ID: "scheduler-1"
depends_on:
postgres:
condition: service_healthy
restart: unless-stopped
# Scale this to 3 replicas — only one becomes leader
deploy:
replicas: 3
worker:
build: .
command: python -m app.worker # job execution loop
environment:
DATABASE_URL: postgresql://scheduler:secret@postgres/scheduler
REDIS_URL: redis://redis:6379/0
depends_on:
postgres:
condition: service_healthy
restart: unless-stopped
deploy:
replicas: 5 # scale workers independently of schedulers
volumes:
pgdata:
redisdata:
Notice scheduler scales to 3 replicas but only one holds the leader lease at any moment. The other two are hot standbys — they win the election within LEASE_SECONDS seconds if the current leader dies. Workers are completely stateless and scale freely.
Production Gotchas Roundup
Clock drift kills you. Leader election based on expires_at < now() compares timestamps across different nodes. If your clocks drift by more than a few seconds, you get split-brain: two nodes both think they’re leader simultaneously. Run chrony or systemd-timesyncd, and keep your lease window wide enough to absorb 2-3 seconds of drift.
The "stale worker" problem. A worker claims a job, then gets paused by a GC pause or a container freeze for 90 seconds. The lock expires. Another worker claims the same job and runs it. The first worker wakes up and also runs it. Your domain-level idempotency check is the last line of defense here, not the lock. Never trust the lock alone.
Poison pills. Some jobs crash workers reliably and loop forever through your retry queue. Cap max_attempts per job type, not globally. A send_email job that fails 5 times should go dead. A data pipeline job that’s been running for 30 years might deserve 20 attempts.
Observability. Add job_type, status, attempts, and locked_by as labels to your metrics. The query you’ll run at 3 AM is: "how many jobs are currently running, per job_type, owned by workers that haven’t updated locked_until in the last 2 minutes?" That’s your stuck-job detector. Build it before the incident, not during.
Don’t poll at 100ms. Workers polling the job table at high frequency hammers Postgres with empty queries during quiet periods. Use pg_notify / LISTEN / NOTIFY to wake workers when a new job is inserted, and fall back to a 5s poll as a safety net. Your DBA will thank you.
Putting a Number on It
This architecture handles ~10k job enqueues/sec on a Postgres instance with 16 vCPUs, assuming your jobs themselves aren’t CPU-bound on the DB. Bottlenecks appear in the index on job_queue first — partition that table by status or created_at once you hit tens of millions of rows. For higher throughput, drop the job queue into a proper message broker (NATS JetStream, RabbitMQ, Kafka) and keep Postgres only for leader election and dedup tracking. The coordination patterns stay identical.
The full working example — migrations, scheduler, worker, Docker Compose, tests — lives at github.com/example/distributed-job-scheduler. Copy it, break it, build on it.