Outbox Pattern: đảm bảo consistency giữa database và message broker — hoatq.dev

cat blog/.md

Outbox Pattern: đảm bảo consistency giữa database và message broker

date: tags: backend, microservices, postgresql, design-pattern, event-driven

Trong hệ thống microservices, có một đoạn code mà hầu như ai cũng đã từng viết:

async def create_order(payload: OrderCreate) -> Order:
    order = await order_repo.create(payload)
    await publisher.publish("order.created", order.to_event())
    return order

Nhìn thì hợp lý. Tạo order xong thì publish event để các service khác (notification, inventory, analytics) cùng biết. Nhưng đoạn code trên có một bug âm thầm mà thường chỉ lộ ra ở production, lúc 3 giờ sáng.

Hai transaction, một vấn đề

Bạn đang thao tác trên hai hệ thống khác nhau: PostgreSQL và message broker (RabbitMQ, SNS, Kafka…). Không có transaction nào bao trùm cả hai. Vậy nên sẽ có 4 kịch bản:

  1. DB commit thành công, publish thành công → OK
  2. DB commit fail, publish không chạy → OK (state chưa thay đổi)
  3. DB commit thành công, publish fail → DB có order, nhưng không service nào biết
  4. DB commit thành công, publish thành công, nhưng response về client lỗi → client retry → duplicate event

Kịch bản 3 là kịch bản kinh điển: order tồn tại, tiền đã trừ, nhưng notification service không gửi email, inventory không trừ kho. Mất consistency trong im lặng.

Đảo ngược thứ tự cũng không cứu được:

# Cũng sai — nhưng ngược lại
await publisher.publish("order.created", event)
await order_repo.create(payload)  # nếu fail → event đã gửi nhưng order không tồn tại

Đây là dual-write problem. Và Outbox Pattern là một giải pháp đã được kiểm chứng.

Ý tưởng Outbox

Thay vì publish trực tiếp ra broker, ghi event vào cùng database với dữ liệu nghiệp vụ, trong cùng một transaction. Sau đó có một worker riêng biệt đọc bảng này và publish ra broker.

┌─────────────────┐         ┌──────────────────┐
│  API request    │         │  Outbox Worker   │
│                 │         │  (background)    │
│  BEGIN          │         │                  │
│  INSERT order   │         │  SELECT unsent   │
│  INSERT outbox  │         │  publish         │
│  COMMIT         │         │  UPDATE sent_at  │
└────────┬────────┘         └────────┬─────────┘
         │                           │
         ▼                           ▼
    ┌─────────────────────────────────────┐
    │         PostgreSQL                  │
    │  orders │ outbox_events             │
    └─────────────────────────────────────┘


                              ┌──────────────┐
                              │ Message      │
                              │ Broker       │
                              └──────────────┘

INSERT orderINSERT outbox nằm trong cùng một transaction, hoặc cả hai cùng commit, hoặc cả hai cùng rollback. Không còn kịch bản 3.

Worker publish ra broker có thể fail, nhưng event vẫn nằm yên trong bảng outbox. Worker chỉ cần retry lần sau là xong.

Triển khai với PostgreSQL + FastAPI

Bảng outbox

CREATE TABLE outbox_events (
    id BIGSERIAL PRIMARY KEY,
    aggregate_type VARCHAR(64) NOT NULL,      -- 'order', 'payment', ...
    aggregate_id VARCHAR(64) NOT NULL,        -- order_id
    event_type VARCHAR(128) NOT NULL,         -- 'order.created'
    payload JSONB NOT NULL,
    headers JSONB,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    published_at TIMESTAMPTZ,                 -- NULL = chưa publish
    attempt_count INT NOT NULL DEFAULT 0,
    last_error TEXT
);

-- Index quan trọng: worker query unsent events
CREATE INDEX idx_outbox_unpublished
    ON outbox_events (created_at)
    WHERE published_at IS NULL;

Index partial chỉ gồm row chưa publish — khi bảng đầy hàng triệu event đã publish, query của worker vẫn nhanh.

Service layer — ghi outbox cùng business data

from sqlalchemy.ext.asyncio import AsyncSession

async def create_order(
    session: AsyncSession,
    payload: OrderCreate,
) -> Order:
    order = Order(**payload.model_dump())
    session.add(order)
    await session.flush()  # lấy order.id

    event = OutboxEvent(
        aggregate_type="order",
        aggregate_id=str(order.id),
        event_type="order.created",
        payload={
            "order_id": order.id,
            "buyer_id": order.buyer_id,
            "total": str(order.total),
            "created_at": order.created_at.isoformat(),
        },
    )
    session.add(event)

    await session.commit()  # cả hai cùng commit hoặc cùng rollback
    return order

Không gọi publisher.publish() ở đây. Router cũng không gọi. Đơn giản là ghi vào DB rồi trả về client.

Worker — publish và đánh dấu sent

Worker là một process riêng, chạy loop:

import asyncio
from sqlalchemy import select, update
from sqlalchemy.ext.asyncio import AsyncSession

BATCH_SIZE = 100
POLL_INTERVAL = 1.0  # giây

async def publish_outbox_batch(session: AsyncSession, publisher) -> int:
    # SKIP LOCKED cho phép nhiều worker chạy song song không tranh row
    stmt = (
        select(OutboxEvent)
        .where(OutboxEvent.published_at.is_(None))
        .order_by(OutboxEvent.id)
        .limit(BATCH_SIZE)
        .with_for_update(skip_locked=True)
    )
    result = await session.execute(stmt)
    events = result.scalars().all()

    if not events:
        return 0

    for event in events:
        try:
            await publisher.publish(
                topic=event.event_type,
                payload=event.payload,
                headers={
                    "event_id": str(event.id),  # idempotency key cho consumer
                    "aggregate_id": event.aggregate_id,
                },
            )
            event.published_at = func.now()
        except Exception as exc:
            event.attempt_count += 1
            event.last_error = str(exc)[:500]
            # không raise — các event khác trong batch vẫn được xử lý

    await session.commit()
    return len(events)


async def worker_loop(session_factory, publisher):
    while True:
        try:
            async with session_factory() as session:
                processed = await publish_outbox_batch(session, publisher)
            if processed == 0:
                await asyncio.sleep(POLL_INTERVAL)
        except Exception:
            logger.exception("Outbox worker error")
            await asyncio.sleep(POLL_INTERVAL * 5)

Vài điểm đáng chú ý:

  • SKIP LOCKED: nếu bạn chạy nhiều instance worker (để HA), mỗi instance lấy row khác nhau, không block nhau. Đây là tính năng bắt buộc phải có của PostgreSQL trong scenario này.
  • Không raise trong vòng lặp: một event fail không làm sập cả batch. Row đó vẫn published_at = NULL, lần poll sau sẽ retry.
  • event_id trong header: consumer dùng id này làm idempotency key để dedupe.

”At-least-once” — không phải “exactly-once”

Outbox pattern đảm bảo at-least-once delivery. Nghĩa là mỗi event chắc chắn được publish ít nhất một lần, nhưng có thể bị publish nhiều hơn một lần khi:

  • Worker publish xong, chưa kịp UPDATE published_at thì crash
  • Network flap làm broker ack không về tới worker

Lần sau worker chạy lại, nó thấy event chưa sent và publish lại. Consumer nhận cùng một event 2 lần.

Vì vậy consumer phải idempotent. Pattern thường dùng:

async def handle_order_created(event: dict, headers: dict):
    event_id = headers["event_id"]

    # Kiểm tra đã xử lý chưa
    if await processed_events_repo.exists(event_id):
        logger.info("Event already processed, skipping", event_id=event_id)
        return

    async with session.begin():
        # Xử lý business logic
        await do_something(event)
        # Ghi nhận đã xử lý
        await processed_events_repo.create(event_id)

Bảng processed_events đơn giản gồm event_id (PK) + processed_at. Nếu INSERT duplicate → skip.

Dọn dẹp bảng outbox

Bảng outbox sẽ to dần. Có vài cách xử lý:

  1. Xóa sau khi publish: đơn giản nhất, nhưng mất khả năng audit/replay.
  2. Giữ lại 7–30 ngày: cron job xóa row có published_at < NOW() - INTERVAL '7 days'.
  3. Archive sang cold storage: dump ra S3 theo ngày, xóa khỏi bảng.

Mình thường chọn option 2 — 30 ngày đủ để replay khi cần, và bảng vẫn gọn.

DELETE FROM outbox_events
WHERE published_at IS NOT NULL
  AND published_at < NOW() - INTERVAL '30 days';

Khi nào không nên dùng Outbox

  • Latency cực thấp: outbox thêm độ trễ vài trăm ms đến vài giây tùy polling interval. Nếu event cần realtime (ví dụ trading), cân nhắc transactional outbox với CDC (Debezium đọc WAL).
  • Throughput cực cao: ghi mỗi event ra DB có thể là bottleneck. Lúc đó cần shard bảng outbox hoặc dùng Kafka Transactions.
  • Hệ thống đơn giản, chỉ 1 service: nếu không có cross-service event, outbox là overkill.

Với phần lớn B2B, SaaS, e-commerce bình thường (hàng trăm đến vài nghìn event/giây), outbox là lựa chọn mặc định tốt.

Tổng kết

Dual-write giữa DB và message broker là cái bẫy mà hầu hết dev đều bước vào ít nhất một lần. Outbox pattern giải quyết nó bằng một ý tưởng đơn giản: chỉ ghi vào một nơi — database — và để một worker biến state đó thành event.

Checklist triển khai:

  • Bảng outbox_events với index partial trên published_at IS NULL
  • Service layer ghi event vào outbox trong cùng transaction với business data
  • Worker dùng SELECT ... FOR UPDATE SKIP LOCKED để publish
  • Consumer idempotent bằng event_id
  • Cron job cleanup outbox đã publish cũ hơn N ngày
  • Monitor: số event published_at IS NULL lâu hơn X giây → alert

Không có “exactly-once” trong distributed systems, chỉ có “at-least-once + idempotent”. Outbox cho bạn đúng điều đó, với chi phí một bảng DB và một worker nhỏ.

// reactions


cat comments.log


hoatq@dev : ~/blog $