cat blog/.md
SELECT FOR UPDATE SKIP LOCKED: biến PostgreSQL thành job queue
Có một thứ tôi thấy rất nhiều team Việt Nam quá vội thêm vào stack: message broker. Mới chỉ có vài chục job/phút mà đã kéo Redis + Bull, hoặc RabbitMQ, hoặc thậm chí cả Kafka. Thêm một component nghĩa là thêm một thứ phải monitor, backup, scale, debug khi nó down. Và 80% trường hợp, Postgres đã đủ.
Vũ khí làm điều đó là SELECT ... FOR UPDATE SKIP LOCKED — một mệnh đề có từ Postgres 9.5 (2016) nhưng ít người dùng đến. Khi hiểu nó, bạn có thể xây một job queue đơn giản, atomic, không mất job, không cần thêm gì ngoài cái DB sẵn có.
Vấn đề kinh điển: nhiều worker, cùng một bảng
Tưởng tượng bảng jobs:
CREATE TABLE jobs (
id BIGSERIAL PRIMARY KEY,
payload JSONB NOT NULL,
status TEXT NOT NULL DEFAULT 'pending', -- pending | running | done | failed
created_at TIMESTAMPTZ DEFAULT now(),
picked_at TIMESTAMPTZ
);
Bạn chạy 5 worker, mỗi worker poll mỗi giây:
-- Worker poll: NAIVE version
SELECT id, payload FROM jobs WHERE status = 'pending' LIMIT 1;
-- Sau đó:
UPDATE jobs SET status = 'running', picked_at = now() WHERE id = ?;
Vấn đề lộ ra ngay: cả 5 worker cùng SELECT về cùng một row, cùng UPDATE, và 4 worker chạy redundant. Job bị chạy 5 lần. Nếu job là gửi email, khách hàng nhận 5 email trùng.
Hướng fix bản năng: thêm FOR UPDATE:
SELECT id, payload FROM jobs WHERE status = 'pending' LIMIT 1 FOR UPDATE;
FOR UPDATE lock row đó cho đến hết transaction. Vấn đề: 4 worker còn lại chờ cho đến khi worker đầu commit. Polling 5 worker biến thành 5 worker đứng xếp hàng pick từng job. Throughput xuống còn 1/5.
SKIP LOCKED đến cứu
Cú pháp đầy đủ:
SELECT id, payload
FROM jobs
WHERE status = 'pending'
ORDER BY id
FOR UPDATE SKIP LOCKED
LIMIT 1;
SKIP LOCKED bảo Postgres: “Nếu row đang bị worker khác lock, đừng chờ — bỏ qua, lấy row kế tiếp”. Mỗi worker nhận một row riêng, không xung đột, không chờ. Đây chính là semantic của một queue đúng nghĩa.
Flow đầy đủ trong một transaction:
BEGIN;
SELECT id, payload
FROM jobs
WHERE status = 'pending'
ORDER BY id
FOR UPDATE SKIP LOCKED
LIMIT 1;
UPDATE jobs
SET status = 'running', picked_at = now()
WHERE id = $1;
COMMIT;
Hoặc gộp lại bằng CTE — cách tôi hay dùng vì nó atomic trong một query:
WITH next_job AS (
SELECT id
FROM jobs
WHERE status = 'pending'
ORDER BY id
FOR UPDATE SKIP LOCKED
LIMIT 1
)
UPDATE jobs
SET status = 'running', picked_at = now()
FROM next_job
WHERE jobs.id = next_job.id
RETURNING jobs.id, jobs.payload;
Một câu, một transaction implicit, không có race condition. Worker code Python tương ứng:
async def claim_next_job(conn) -> Job | None:
row = await conn.fetchrow("""
WITH next_job AS (
SELECT id FROM jobs
WHERE status = 'pending'
ORDER BY id
FOR UPDATE SKIP LOCKED
LIMIT 1
)
UPDATE jobs
SET status = 'running', picked_at = now()
FROM next_job
WHERE jobs.id = next_job.id
RETURNING jobs.id, jobs.payload
""")
return Job(**row) if row else None
Crash recovery — chỗ nhiều người quên
Worker pick job, status thành running, rồi worker bị OOM kill. Job mắc kẹt ở running mãi mãi. Đây là điểm yếu của naive approach — và là lý do người ta nghĩ Postgres không làm queue được.
Fix bằng visibility timeout: nếu job ở trạng thái running quá lâu mà chưa done, coi như worker đã chết, cho phép pick lại.
WITH next_job AS (
SELECT id FROM jobs
WHERE (status = 'pending')
OR (status = 'running' AND picked_at < now() - INTERVAL '5 minutes')
ORDER BY id
FOR UPDATE SKIP LOCKED
LIMIT 1
)
UPDATE jobs
SET status = 'running', picked_at = now()
FROM next_job
WHERE jobs.id = next_job.id
RETURNING jobs.id, jobs.payload;
Lưu ý: cách này cho semantic at-least-once. Job có thể chạy 2 lần nếu worker đầu lăn đùng ra ngay trước khi commit done. Handler phải idempotent — nhưng đó là rule chung cho mọi job queue, không riêng gì Postgres (xem bài Idempotency Key).
Retry và dead letter
Một bảng jobs “đủ xài” cần thêm vài cột:
ALTER TABLE jobs
ADD COLUMN attempts INT DEFAULT 0,
ADD COLUMN max_attempts INT DEFAULT 3,
ADD COLUMN run_after TIMESTAMPTZ DEFAULT now(),
ADD COLUMN last_error TEXT;
Khi handler raise exception:
async def fail_job(conn, job_id: int, err: str):
await conn.execute("""
UPDATE jobs
SET status = CASE
WHEN attempts + 1 >= max_attempts THEN 'failed'
ELSE 'pending'
END,
attempts = attempts + 1,
last_error = $2,
run_after = now() + (interval '1 minute' * power(2, attempts))
WHERE id = $1
""", job_id, err)
Backoff 2^attempts phút: 1m, 2m, 4m. Sau max_attempts lần, job thành failed — đó là dead letter queue. Query pending cũng phải tôn trọng run_after:
WHERE (status = 'pending' AND run_after <= now())
OR (status = 'running' AND picked_at < now() - INTERVAL '5 minutes')
Index — đừng quên
Query trên scan bảng theo status và run_after. Không có index thì khi jobs lên vài triệu row, mỗi lần poll mất vài trăm ms và worker đè CPU của Postgres.
CREATE INDEX idx_jobs_pending
ON jobs (run_after, id)
WHERE status = 'pending';
CREATE INDEX idx_jobs_running
ON jobs (picked_at)
WHERE status = 'running';
Partial index — chỉ chứa row của trạng thái cần, nhỏ và nhanh. Khi job xong và status thành done/failed, row biến mất khỏi index. Đây cũng là một trong những lý do tôi thường archive job cũ sang bảng riêng định kỳ (jobs_archive) để index nhỏ gọn.
Notify worker thay vì polling 1 giây
Polling mỗi giây hoạt động ổn nhưng có độ trễ cố định. Postgres có LISTEN/NOTIFY để worker được đánh thức ngay khi có job mới:
-- Khi enqueue:
INSERT INTO jobs (payload) VALUES ($1);
NOTIFY jobs_new;
# Worker:
await conn.execute("LISTEN jobs_new")
while True:
job = await claim_next_job(conn)
if job:
await handle(job)
else:
# Chờ NOTIFY tối đa 30s, hoặc poll lại
await conn.wait_for_notify(timeout=30)
Kết hợp lại: NOTIFY cho latency thấp lúc bình thường, fallback polling 30s phòng khi NOTIFY rớt (vì LISTEN/NOTIFY không persist — nếu connection rớt giữa lúc NOTIFY bay, message biến mất).
Khi nào nên — và không nên — dùng cách này
Hợp:
- Throughput < vài nghìn job/giây.
- Job và business data cùng nằm trong một DB → enqueue có thể chạy chung transaction với business write (chỉ cần INSERT, không phải lo two-phase commit giữa DB và broker — xem bài Outbox Pattern).
- Team nhỏ, không muốn thêm component mới vào stack ops.
- Cần truy vấn job dễ dàng bằng SQL: “job nào fail nhiều nhất tuần qua?” — chỉ là một query thay vì phải dựng dashboard riêng cho RabbitMQ.
Không hợp:
- Throughput cực cao (> 10K msg/s).
- Cần pub/sub fan-out cho nhiều consumer độc lập (Postgres không phải broker, dùng Kafka).
- Cần ordering guarantees phức tạp giữa nhiều partition.
- DB đang đã quá tải — đẩy thêm queue traffic vào sẽ làm vấn đề nặng hơn.
Stripe, GitLab, Sidekiq Pro đều có production system chạy queue trên Postgres ở quy mô lớn. Riêng Sidekiq vừa thêm tier “Sidekiq Iron” 2024 dùng Postgres làm backend chính. Nên đừng nghĩ “Postgres queue chỉ là đồ chơi”.
Tóm lại
Trước khi reach Redis/RabbitMQ, hãy thử Postgres + SKIP LOCKED. Một bảng, vài chục dòng code, ít thứ phải vận hành hơn. Khi traffic lớn lên thật và lý do rõ ràng, bạn vẫn migrate được — nhưng đừng over-engineer từ ngày đầu.
Một câu tôi tự nhắc khi thiết kế: “Bạn đã có một DB rồi. Dùng nó.”
cat comments.log