NHT

Phân Tích & Tính Phí API Realtime Với ClickHouse Và Redis Streams

Cách GoGoDuk đo lường lượng dùng API ở quy mô lớn bằng ClickHouse — ingestion qua Redis Streams, schema ReplacingMergeTree và materialized view AggregatingMergeTree cho dashboard theo từng khách hàng và tính phí theo mức dùng, realtime.

Nguyen Hoang TuanNguyen Hoang Tuan19 thg 6, 202611 phút đọc

Mỗi request tới GoGoDuk map API — một lần autocomplete địa chỉ, một lần reverse geocode, một lần gọi directions — đều phải được đo lường. Chúng tôi cần dashboard usage theo từng khách hàng, hạch toán rate limit, và tính phí theo mức dùng chính xác. Cách ngây thơ là ghi một dòng vào Postgres cho mỗi request rồi GROUP BY sau. Ở bất kỳ mức lưu lượng nghiêm túc nào, cách đó lặng lẽ phá hỏng database chính của bạn.

Bài này là lớp analytics nằm sau rate limiter. Nếu rate limiting nói về thực thi — quyết định một request có được phép ngay lúc này hay không — thì analytics nói về đo lường: đếm, tổng hợp và tính phí sau đó. Phần thực thi đã được nói tới trong Xây dựng Rate Limiter với Redis Lua Script. Ở đây chúng ta dựng phần đo lường bằng ClickHouseRedis Streams.


Vì sao Postgres và Redis không đủ cho usage analytics

Hai store chúng tôi đang chạy đều rất giỏi việc của chúng, và đều sai cho việc này.

Redis giữ counter realtime. Nó trả lời "API key này có đang vượt giới hạn ngay bây giờ không?" trong vài microsecond. Nhưng nó không lưu lịch sử — bạn không thể hỏi "khách hàng này đã gọi bao nhiêu request theo từng endpoint trong 30 ngày qua?"

Postgres là transactional (OLTP). Nó tuyệt vời cho đọc/ghi trên các dòng có index, đó chính là lý do chúng tôi dựa vào nó cho hot path geocoding — xem Tối ưu hiệu năng PostGIS. Nhưng các truy vấn phân tích — GROUP BY organization, day trên hàng trăm triệu dòng — là một dạng bài toán khác. Chúng làm phình DB chính, tranh buffer cache với traffic production, và mỗi tuần lại chậm hơn.

Với chúng tôi, "giọt nước tràn ly" rất cụ thể: một truy vấn dashboard "usage 30 ngày qua, tách theo endpoint" cho từng khách hàng bắt đầu timeout. Đó là lúc bạn ngừng chắp vá analytics lên database OLTP và chuyển nó sang một store cột (OLAP). ClickHouse sinh ra đúng cho việc này: lưu trữ dạng cột, nén mạnh, và truy vấn tổng hợp quét hàng tỉ dòng trong chưa tới một giây.


Đừng ghi analytics trên hot path

Phản xạ đầu tiên — INSERT INTO usage_events đồng bộ ngay trong request handler — là một cái bẫy, vì hai lý do.

Thứ nhất, nó thêm một round-trip mạng và một lần ghi vào độ trễ của mọi lời gọi API. Analytics không bao giờ được làm sản phẩm chậm đi.

Thứ hai, ClickHouse ghét nhiều insert nhỏ. Mỗi INSERT tạo ra một "part" dữ liệu mới trên đĩa mà engine phải merge nền sau đó. Hàng nghìn insert một dòng mỗi giây sinh ra một cơn bão part nhỏ và áp lực merge khủng khiếp. ClickHouse ưu tiên mạnh các insert theo lô, ít lần hơn nhưng lớn hơn.

Cả hai vấn đề có chung một lời giải: đẩy event ra khỏi request path ngay lập tức, và gom ghi theo lô ở nơi khác. Request handler phát một usage event vào một Redis Stream (usage_logs_stream) rồi trả về. Một worker nền riêng biệt rút stream đó và ghi vào ClickHouse theo lô.

API request  ──phát──▶  Redis Stream (usage_logs_stream)
                              │
                              ▼
                  Go consumer-group worker
                      (gom tối đa 1000)
                              │
                              ▼
                   ClickHouse  usage_events

Redis Streams là lựa chọn có chủ đích thay vì list thường hay pub/sub: nó là một log append-only bền vững với consumer group, cho ta at-least-once delivery, acknowledgement, và khả năng phục hồi các message mà một worker bị crash chưa xử lý xong.


Worker ingestion: Redis Streams consumer group

Worker là một service Go nhỏ. Nó đọc message mới bằng XReadGroup, xử lý theo lô, rồi ack. Chi tiết quan trọng là nó cũng đòi lại các message đã được giao cho một consumer chết trước khi ack, bằng XAutoClaim — nên một cú crash giữa chừng lô không bao giờ làm mất event một cách âm thầm.

const (
    StreamName   = "usage_logs_stream"
    GroupName    = "clickhouse_consumer"
    BatchSize    = 1000
    IdleTimeout  = 2 * time.Second
)

func (w *ClickhouseWorker) consume(repo *clickhouse.Repository) error {
    for {
        // 1) Đòi lại message mà consumer chết chưa ACK.
        pending, _, err := w.rdb.XAutoClaim(w.ctx, &redis.XAutoClaimArgs{
            Stream: StreamName, Group: GroupName, Consumer: "worker-1",
            MinIdle: 5 * time.Second, Start: "0-0", Count: BatchSize,
        }).Result()
        if err != nil && !errors.Is(err, redis.Nil) {
            return err
        }
        if len(pending) > 0 {
            if err := w.processMessages(repo, pending); err != nil {
                return err
            }
            continue
        }

        // 2) Đọc một lô mới, block tối đa IdleTimeout.
        streams, err := w.rdb.XReadGroup(w.ctx, &redis.XReadGroupArgs{
            Group: GroupName, Consumer: "worker-1",
            Streams: []string{StreamName, ">"},
            Count:   BatchSize, Block: IdleTimeout,
        }).Result()
        if errors.Is(err, redis.Nil) {
            continue
        }
        if err != nil {
            return err
        }
        for _, s := range streams {
            if err := w.processMessages(repo, s.Messages); err != nil {
                return err
            }
        }
    }
}

processMessages parse mỗi entry thành một UsageEvent có kiểu, đẩy bất kỳ entry hỏng nào vào một dead-letter stream (usage_logs_dead_letter) thay vì làm sập pipeline, batch-insert các dòng hợp lệ, và chỉ sau đó mới XAck cả lô. Lần ghi ClickHouse dùng prepared batch — một INSERT cho tối đa một nghìn dòng:

batch, _ := conn.PrepareBatch(ctx, `INSERT INTO usage_events (...)`)
for _, e := range events {
    batch.Append(e.RequestID, e.OrganizationID, e.Endpoint, /* ... */)
}
if err := batch.Send(); err != nil {
    return err
}

Một đặc tính đáng nhấn mạnh: nếu ClickHouse không khả dụng, worker log lại và vẫn giữ Postgres metering hoạt động trong khi event tích lũy an toàn trong Redis Stream. Worker reconnect theo một khoảng cố định rồi rút phần tồn. ClickHouse chết chỉ làm analytics kém tươi mới — không bao giờ làm mất event và không bao giờ động vào request path.

At-least-once delivery, tính phí effectively-once

Consumer group đảm bảo giao at-least-once: sau một lần crash-rồi-đòi-lại, cùng một event có thể bị xử lý hai lần. Với tính phí, đếm trùng là không chấp nhận được. Chúng tôi xử lý ở hai lớp:

  1. Worker khử trùng trong một lô theo requestId trước khi insert.
  2. Bảng là ReplacingMergeTree(ingestedAt) được khóa sao cho các requestId trùng gộp về một dòng trong lúc merge nền.

Hai lớp này biến giao at-least-once thành hạch toán effectively-once — nền tảng cho một hệ thống tính phí đáng tin.


Thiết kế schema usage_events

Đây là bảng event thô, rút gọn còn các cột cốt lõi:

CREATE TABLE usage_events
(
    requestId      String,
    organizationId String,
    apiKeyId       Nullable(String),
    operation      LowCardinality(String),
    endpoint       LowCardinality(String),
    statusCode     UInt16,
    statusClass    UInt8,
    elapsedMs      UInt32,
    providerUsed   LowCardinality(String) DEFAULT '',
    country        LowCardinality(String) DEFAULT '',
    createdAt      DateTime64(3),
    ingestedAt     DateTime64(3) DEFAULT now64(3)
)
ENGINE = ReplacingMergeTree(ingestedAt)
PARTITION BY toYYYYMM(createdAt)
ORDER BY (organizationId, toDate(createdAt), endpoint, createdAt, requestId)
TTL createdAt + INTERVAL 90 DAY DELETE;

Vài quyết định quan trọng:

  • LowCardinality(String) cho endpoint, operation, providerUsed, country, và các cột tương tự. Chúng chỉ có một nhúm giá trị phân biệt trên toàn bảng; LowCardinality dictionary-encode chúng, nén tốt và nhanh hơn nhiều so với String thường.
  • ORDER BY (organizationId, toDate(createdAt), endpoint, ...) — sort key dẫn đầu bằng organizationId vì access pattern chủ đạo là theo từng khách hàng: mọi dashboard và mọi hóa đơn đều lọc theo organization trước. Đặt cột bạn lọc lên đầu primary key giúp ClickHouse bỏ qua gần hết dữ liệu.
  • PARTITION BY toYYYYMM(createdAt) cộng TTL ... 90 DAY DELETE khiến việc giữ dữ liệu gần như miễn phí: tháng cũ là cả partition bị drop, không phải xóa từng dòng.
  • ReplacingMergeTree(ingestedAt) cho ta tính idempotency nói trên, dùng ingestedAt mới nhất làm version.

Pre-aggregate bằng materialized view

Đọc event thô cho mỗi lần tải dashboard là lãng phí: dashboard muốn "số request theo endpoint theo giờ cho khách này", không phải một tỉ dòng riêng lẻ. Vậy nên chúng tôi roll-up stream thô ngay khi nó tới, bằng materialized view ghi vào bảng AggregatingMergeTree.

CREATE TABLE usage_hourly_identity
(
    hour           DateTime,
    organizationId String,
    endpoint       LowCardinality(String),
    requests   AggregateFunction(uniqCombined64, String),
    errors     AggregateFunction(uniqCombined64, String),
    latencyMs  AggregateFunction(quantilesTiming(0.5, 0.95, 0.99), UInt32)
)
ENGINE = AggregatingMergeTree
PARTITION BY toYYYYMM(hour)
ORDER BY (organizationId, hour, endpoint);

CREATE MATERIALIZED VIEW usage_hourly_identity_mv TO usage_hourly_identity AS
SELECT
    toStartOfHour(createdAt) AS hour, organizationId, endpoint,
    uniqCombined64State(requestId)                         AS requests,
    uniqCombined64IfState(requestId, statusCode >= 400)    AS errors,
    quantilesTimingState(0.5, 0.95, 0.99)(elapsedMs)       AS latencyMs
FROM usage_events
GROUP BY hour, organizationId, endpoint;

Các combinator *State (uniqCombined64State, quantilesTimingState) lưu trạng thái tổng hợp một phần, không phải giá trị cuối — nên percentile và đếm distinct vẫn đúng khi ClickHouse merge các bucket theo giờ theo thời gian. Dashboard đọc từ rollup gọn này và phản hồi nhanh, bất kể bảng thô lớn tới đâu. Chính rollup này cấp dữ liệu cho ngưỡng quota 80% và 100% cùng tổng hóa đơn hàng tháng mà khách thấy trong dashboard GoGoDuk.


Truy vấn để có insight realtime

Để đọc một rollup AggregatingMergeTree, bạn "chốt" các trạng thái đã lưu bằng combinator *Merge tương ứng. Ví dụ, tóm tắt usage theo từng khách hàng trong một khoảng:

SELECT
    uniqCombined64Merge(requests) AS requests,
    uniqCombined64Merge(errors)   AS errors,
    quantilesTimingMerge(0.5, 0.95, 0.99)(latencyMs) AS latency
FROM usage_hourly_identity
WHERE organizationId = {org:String}
  AND hour >= {from:DateTime} AND hour < {to:DateTime};

Top endpoint theo p95 latency trong 7 ngày qua:

SELECT
    endpoint,
    quantilesTimingMerge(0.95)(latencyMs)[1] AS p95_ms,
    uniqCombined64Merge(requests)            AS requests
FROM usage_hourly_identity
WHERE hour >= now() - INTERVAL 7 DAY
GROUP BY endpoint
ORDER BY p95_ms DESC;

Vì ta đọc các bucket theo giờ đã tổng hợp sẵn thay vì event thô, các truy vấn này vẫn nhanh khi lịch sử lớn dần — chi phí tỉ lệ với số giờ, không phải số request.


Bài học vận hành và những cái bẫy

  • Kích thước lô là một đánh đổi. Quá nhỏ thì tái lập vấn đề insert nhỏ. Quá lớn thì một cú crash mất một cửa sổ in-flight lớn hơn (và tốn RAM hơn). Một lô khoảng một nghìn dòng kèm flush theo idle ngắn là điểm cân bằng tốt với chúng tôi.
  • Async nghĩa là eventual. Dashboard trễ so với thực tế đúng bằng khoảng flush. Điều đó hoàn toàn ổn cho analytics và tính phí, nhưng đó chính là lý do Redis — không phải ClickHouse — vẫn nắm việc thực thi rate limit realtime. Việc tách hai hệ là có chủ đích: Redis cho quyết định ngay lúc này, ClickHouse cho sự thật theo thời gian.
  • Theo dõi dead-letter stream. Một đợt tăng vọt usage_logs_dead_letter nghĩa là một producer đã đổi shape của event. Alert nó; đừng để event hỏng biến mất âm thầm.
  • Để partition và TTL lo việc giữ dữ liệu. Drop một partition cũ một tháng gần như tức thì; xóa từng dòng thì không.

Kết luận

Kiến trúc rút lại thành một sự phân vai gọn gàng: Redis thực thi giới hạn realtime, ClickHouse đo lường usage ở quy mô lớn, và materialized view phục vụ dashboard cùng tính phí tức thì. Ba điều đáng mang theo cho bất kỳ API lưu lượng lớn nào:

  1. Đừng bao giờ đo lường đồng bộ trên hot path — phát vào một log bền vững và gom ghi theo lô.
  2. Khớp storage engine với access pattern: OLTP (Postgres) cho transaction, OLAP (ClickHouse) cho tổng hợp.
  3. Pre-aggregate bằng materialized view để chi phí dashboard tỉ lệ với thời gian, không phải traffic.

Về phần thực thi của cùng hệ thống này, xem Xây dựng Rate Limiter với Redis Lua Script.

Hệ thống của bạn đang gặp vấn đề hiệu năng hay mở rộng tải?

Tôi chuyên xây dựng hạ tầng bản đồ độ trễ thấp, streaming pipeline thời gian thực (Kafka, ClickHouse) và các hệ thống backend tối ưu. Hãy cùng hợp tác để nâng cấp sản phẩm của bạn.

Hợp tác ngay

Tác giả

Nguyen Hoang Tuan

Nguyen Hoang Tuan

Full-stack developer focused on practical backend architecture, web performance, and production delivery.

Bài viết liên quan