cat blog/.md
Outbox Pattern: đảm bảo consistency giữa database và message broker
Trong hệ thống microservices, có một đoạn code mà hầu như ai cũng đã từng viết:
async def create_order(payload: OrderCreate) -> Order:
order = await order_repo.create(payload)
await publisher.publish("order.created", order.to_event())
return order
Nhìn thì hợp lý. Tạo order xong thì publish event để các service khác (notification, inventory, analytics) cùng biết. Nhưng đoạn code trên có một bug âm thầm mà thường chỉ lộ ra ở production, lúc 3 giờ sáng.
Hai transaction, một vấn đề
Bạn đang thao tác trên hai hệ thống khác nhau: PostgreSQL và message broker (RabbitMQ, SNS, Kafka…). Không có transaction nào bao trùm cả hai. Vậy nên sẽ có 4 kịch bản:
- DB commit thành công, publish thành công → OK
- DB commit fail, publish không chạy → OK (state chưa thay đổi)
- DB commit thành công, publish fail → DB có order, nhưng không service nào biết
- DB commit thành công, publish thành công, nhưng response về client lỗi → client retry → duplicate event
Kịch bản 3 là kịch bản kinh điển: order tồn tại, tiền đã trừ, nhưng notification service không gửi email, inventory không trừ kho. Mất consistency trong im lặng.
Đảo ngược thứ tự cũng không cứu được:
# Cũng sai — nhưng ngược lại
await publisher.publish("order.created", event)
await order_repo.create(payload) # nếu fail → event đã gửi nhưng order không tồn tại
Đây là dual-write problem. Và Outbox Pattern là một giải pháp đã được kiểm chứng.
Ý tưởng Outbox
Thay vì publish trực tiếp ra broker, ghi event vào cùng database với dữ liệu nghiệp vụ, trong cùng một transaction. Sau đó có một worker riêng biệt đọc bảng này và publish ra broker.
┌─────────────────┐ ┌──────────────────┐
│ API request │ │ Outbox Worker │
│ │ │ (background) │
│ BEGIN │ │ │
│ INSERT order │ │ SELECT unsent │
│ INSERT outbox │ │ publish │
│ COMMIT │ │ UPDATE sent_at │
└────────┬────────┘ └────────┬─────────┘
│ │
▼ ▼
┌─────────────────────────────────────┐
│ PostgreSQL │
│ orders │ outbox_events │
└─────────────────────────────────────┘
│
▼
┌──────────────┐
│ Message │
│ Broker │
└──────────────┘
Vì INSERT order và INSERT outbox nằm trong cùng một transaction, hoặc cả hai cùng commit, hoặc cả hai cùng rollback. Không còn kịch bản 3.
Worker publish ra broker có thể fail, nhưng event vẫn nằm yên trong bảng outbox. Worker chỉ cần retry lần sau là xong.
Triển khai với PostgreSQL + FastAPI
Bảng outbox
CREATE TABLE outbox_events (
id BIGSERIAL PRIMARY KEY,
aggregate_type VARCHAR(64) NOT NULL, -- 'order', 'payment', ...
aggregate_id VARCHAR(64) NOT NULL, -- order_id
event_type VARCHAR(128) NOT NULL, -- 'order.created'
payload JSONB NOT NULL,
headers JSONB,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
published_at TIMESTAMPTZ, -- NULL = chưa publish
attempt_count INT NOT NULL DEFAULT 0,
last_error TEXT
);
-- Index quan trọng: worker query unsent events
CREATE INDEX idx_outbox_unpublished
ON outbox_events (created_at)
WHERE published_at IS NULL;
Index partial chỉ gồm row chưa publish — khi bảng đầy hàng triệu event đã publish, query của worker vẫn nhanh.
Service layer — ghi outbox cùng business data
from sqlalchemy.ext.asyncio import AsyncSession
async def create_order(
session: AsyncSession,
payload: OrderCreate,
) -> Order:
order = Order(**payload.model_dump())
session.add(order)
await session.flush() # lấy order.id
event = OutboxEvent(
aggregate_type="order",
aggregate_id=str(order.id),
event_type="order.created",
payload={
"order_id": order.id,
"buyer_id": order.buyer_id,
"total": str(order.total),
"created_at": order.created_at.isoformat(),
},
)
session.add(event)
await session.commit() # cả hai cùng commit hoặc cùng rollback
return order
Không gọi publisher.publish() ở đây. Router cũng không gọi. Đơn giản là ghi vào DB rồi trả về client.
Worker — publish và đánh dấu sent
Worker là một process riêng, chạy loop:
import asyncio
from sqlalchemy import select, update
from sqlalchemy.ext.asyncio import AsyncSession
BATCH_SIZE = 100
POLL_INTERVAL = 1.0 # giây
async def publish_outbox_batch(session: AsyncSession, publisher) -> int:
# SKIP LOCKED cho phép nhiều worker chạy song song không tranh row
stmt = (
select(OutboxEvent)
.where(OutboxEvent.published_at.is_(None))
.order_by(OutboxEvent.id)
.limit(BATCH_SIZE)
.with_for_update(skip_locked=True)
)
result = await session.execute(stmt)
events = result.scalars().all()
if not events:
return 0
for event in events:
try:
await publisher.publish(
topic=event.event_type,
payload=event.payload,
headers={
"event_id": str(event.id), # idempotency key cho consumer
"aggregate_id": event.aggregate_id,
},
)
event.published_at = func.now()
except Exception as exc:
event.attempt_count += 1
event.last_error = str(exc)[:500]
# không raise — các event khác trong batch vẫn được xử lý
await session.commit()
return len(events)
async def worker_loop(session_factory, publisher):
while True:
try:
async with session_factory() as session:
processed = await publish_outbox_batch(session, publisher)
if processed == 0:
await asyncio.sleep(POLL_INTERVAL)
except Exception:
logger.exception("Outbox worker error")
await asyncio.sleep(POLL_INTERVAL * 5)
Vài điểm đáng chú ý:
SKIP LOCKED: nếu bạn chạy nhiều instance worker (để HA), mỗi instance lấy row khác nhau, không block nhau. Đây là tính năng bắt buộc phải có của PostgreSQL trong scenario này.- Không raise trong vòng lặp: một event fail không làm sập cả batch. Row đó vẫn
published_at = NULL, lần poll sau sẽ retry. event_idtrong header: consumer dùng id này làm idempotency key để dedupe.
”At-least-once” — không phải “exactly-once”
Outbox pattern đảm bảo at-least-once delivery. Nghĩa là mỗi event chắc chắn được publish ít nhất một lần, nhưng có thể bị publish nhiều hơn một lần khi:
- Worker publish xong, chưa kịp
UPDATE published_atthì crash - Network flap làm broker ack không về tới worker
Lần sau worker chạy lại, nó thấy event chưa sent và publish lại. Consumer nhận cùng một event 2 lần.
Vì vậy consumer phải idempotent. Pattern thường dùng:
async def handle_order_created(event: dict, headers: dict):
event_id = headers["event_id"]
# Kiểm tra đã xử lý chưa
if await processed_events_repo.exists(event_id):
logger.info("Event already processed, skipping", event_id=event_id)
return
async with session.begin():
# Xử lý business logic
await do_something(event)
# Ghi nhận đã xử lý
await processed_events_repo.create(event_id)
Bảng processed_events đơn giản gồm event_id (PK) + processed_at. Nếu INSERT duplicate → skip.
Dọn dẹp bảng outbox
Bảng outbox sẽ to dần. Có vài cách xử lý:
- Xóa sau khi publish: đơn giản nhất, nhưng mất khả năng audit/replay.
- Giữ lại 7–30 ngày: cron job xóa row có
published_at < NOW() - INTERVAL '7 days'. - Archive sang cold storage: dump ra S3 theo ngày, xóa khỏi bảng.
Mình thường chọn option 2 — 30 ngày đủ để replay khi cần, và bảng vẫn gọn.
DELETE FROM outbox_events
WHERE published_at IS NOT NULL
AND published_at < NOW() - INTERVAL '30 days';
Khi nào không nên dùng Outbox
- Latency cực thấp: outbox thêm độ trễ vài trăm ms đến vài giây tùy polling interval. Nếu event cần realtime (ví dụ trading), cân nhắc transactional outbox với CDC (Debezium đọc WAL).
- Throughput cực cao: ghi mỗi event ra DB có thể là bottleneck. Lúc đó cần shard bảng outbox hoặc dùng Kafka Transactions.
- Hệ thống đơn giản, chỉ 1 service: nếu không có cross-service event, outbox là overkill.
Với phần lớn B2B, SaaS, e-commerce bình thường (hàng trăm đến vài nghìn event/giây), outbox là lựa chọn mặc định tốt.
Tổng kết
Dual-write giữa DB và message broker là cái bẫy mà hầu hết dev đều bước vào ít nhất một lần. Outbox pattern giải quyết nó bằng một ý tưởng đơn giản: chỉ ghi vào một nơi — database — và để một worker biến state đó thành event.
Checklist triển khai:
- Bảng
outbox_eventsvới index partial trênpublished_at IS NULL - Service layer ghi event vào outbox trong cùng transaction với business data
- Worker dùng
SELECT ... FOR UPDATE SKIP LOCKEDđể publish - Consumer idempotent bằng
event_id - Cron job cleanup outbox đã publish cũ hơn N ngày
- Monitor: số event
published_at IS NULLlâu hơn X giây → alert
Không có “exactly-once” trong distributed systems, chỉ có “at-least-once + idempotent”. Outbox cho bạn đúng điều đó, với chi phí một bảng DB và một worker nhỏ.
cat comments.log