Lambda CDC processing with Tinybird

This guide outlines a practical implementation of CDC processing with Tinybird using a lambda approach. It produces an API that returns the freshest deduplicated view of the data by combining a scheduled batch job with new rows since the last batch.

This approach is more complex than a simple deduplication query or Materialized View, and is recommended as an optimization where the dataset size or processing SLAs demand it.

Prerequisites

This is a read-through guide that explains an example, so you don't need an active Workspace to follow along. Take the concepts and apply them to your own scenario.

To understand the guide, you'll need familiarity with Change Data Capture concepts and Tinybird deduplication strategies.

Data characteristics

This implementation focuses on fast filtering and aggregation of slowly-changing dimensions over a long history with high cardinality.

The test dataset is Postgres Debezium CDC into Kafka, with an event history of tens of millions of updates across ~5M active records and up to 75k new events per hour. Tinybird provides low-latency, high-concurrency responses with real-time data freshness. In this example the CDC source is configured in partial mode, i.e. only new and changed records are sent as data, and deletes arrive as a null. In full CDC you would get both the old and the new data in each change, which can be very helpful in OLAP processing.

The test dataset exhibits high cardinality over many years and was originally optimized for Elasticsearch, with nested JSON arrays. Updates are sparse across data dimensions and time, which drives specific decisions in this implementation. It is also worth noting that the JSON is up to 100 kB per document, but only a small part of it is needed for the analysis.

Any given primary key of the upstream data source can be deleted, and the Kafka topic can be compacted or reloaded, resulting in many 'null' records to handle.
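
To make the null-delete behavior concrete, here is a sketch of how the raw events for a single upstream key might look once consumed into the landing Data Source (raw_kafka_ds, defined later in this guide); the key, timestamps, and payloads are hypothetical:

    -- Hypothetical event history for one upstream primary key
    SELECT __key, __timestamp, __value
    FROM raw_kafka_ds
    WHERE __key = '42'
    ORDER BY __timestamp

    -- 42 | 2024-01-01 10:00:00 | {"id": 42, "status": "open", ...}    (insert)
    -- 42 | 2024-01-02 09:30:00 | {"id": 42, "status": "closed", ...}  (update: only the new state, partial mode)
    -- 42 | 2024-01-03 08:15:00 | null                                 (delete tombstone)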

Solution features

  • Lambda processing architecture
  • Split data + deletes table processing
  • Null events as deletes by Kafka partition key
  • Batch + speed layer CDC upsert
  • Full data history table
  • Full delete history table
  • Batch table with good sorting keys
  • Latest data as reusable API

Solution technical commentary

This implementation doesn't use AggregatingMergeTree or ReplacingMergeTree due to sorting key limitations. Instead, it uses a MergeTree table with subquery deduplication. The data history and delete tables are split to avoid bloat and null processing, improving performance.

It focuses on the Kafka event timestamp and partition key for deduplication. Various ClickHouse functions are used for JSON extraction, avoiding nulls to speed up processing.
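
As a minimal sketch of that subquery deduplication pattern, ignoring deletes and batch boundaries (which the full Pipes below handle), over the data_history_mv Data Source introduced next:

    WITH latest_rows AS (
        SELECT k_key, max(k_timestamp) AS latest_ts
        FROM data_history_mv
        GROUP BY k_key
    )
    SELECT h.*
    FROM data_history_mv h
    INNER JOIN latest_rows l
        ON h.k_key = l.k_key AND h.k_timestamp = l.latest_ts

This finds the newest timestamp per key, then joins back to pick the matching full rows.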

Data pipeline lineage

  1. Raw Kafka Table <raw_kafka_ds>
  2. Initial Materialized Views
    • Data History extraction <mat_data>
    • Delete History extraction <mat_deletes>
  3. Historical Data Sources
    • All insert/update events <data_history_mv>
    • All delete events <deletes_history_mv>
  4. Batching Copy Pipe <copy_batches>
  5. Batches Data Source <batches_ds>
  6. Lambda ‘Upsert’ API <latest_values>
An overview of the Data Flow, with a Kafka Data source, two Materialized Views to keep track of changes and deletes, a Copy Pipe to deduplicate in batches, and a Pipe to combine all Data Sources
Lambda CDC overview

Landing Data Source

Where the CDC events from the Kafka topic are consumed:

raw_kafka_ds.datasource
SCHEMA >
    `__value` String
     --`__topic` LowCardinality(String),
     --`__partition` Int16,
     --`__offset` Int64,
     --`__timestamp` DateTime,
     --`__key` String

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(__timestamp)"
ENGINE_SORTING_KEY "__offset"

KAFKA_CONNECTION_NAME '<cluster_name>'
KAFKA_TOPIC '<topic_name>'
KAFKA_GROUP_ID '<unique_group_id>'
KAFKA_AUTO_OFFSET_RESET 'earliest'
KAFKA_STORE_RAW_VALUE 'True'
KAFKA_STORE_HEADERS 'True'
KAFKA_STORE_BINARY_HEADERS 'True'
KAFKA_TARGET_PARTITIONS 'auto'
KAFKA_KEY_AVRO_DESERIALIZATION ''

Helpful notes and top tips for your own implementation:

  • The other Kafka metadata fields (commented above) like __timestamp etc. are automatically added by Tinybird's Kafka connector.
  • Always increment the KAFKA_GROUP_ID if you reprocess the topic!
  • The __value may be null in the case of a DELETE for many CDC setups, so do not parse the JSON values in the raw table.
  • You can get the operation (INSERT, UPDATE, DELETE, ...) for many CDC sources via KAFKA_STORE_HEADERS and read it in the __headers field, though this implementation doesn't need it: INSERT and UPDATE are equivalent for our purposes, and DELETE is always the null record.
  • The sorting key should be __offset or __partition. CDC data often arrives in high-density bursts of activity, with a lot of changes written in a short time window, so it is usually better to partition and sort raw_kafka_ds by __partition and/or __offset to avoid the skew of sorting by __timestamp.
  • Remember that you have to pair __partition with __offset to get a unique identifier, as each partition has its own offsets. This is why __timestamp is a good boundary for multi-partition topics.
  • This implementation does not set a TTL because it only partially processes the __value schema for this use case; if you later want to create other tables out of it, you'll need this Data Source. Keeping the raw data also allows reprocessing if you decide you need something else out of the raw JSON.
  • You could optionally delete every __offset before the __offset of the first __value that isn't a null in each partition, which would effectively clear the table of all old compaction tombstones (see the sketch after this list). A full wipe can be done in the CLI with the tb datasource truncate <ds_name> command. Remember to only target partitions with no active ingest.
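
A sketch of how you might find that per-partition boundary (everything below first_live_offset in a partition is only old tombstones and is a candidate for removal):

    SELECT
        __partition,
        min(__offset) AS first_live_offset  -- first offset whose __value is real data
    FROM raw_kafka_ds
    WHERE __value != 'null'
    GROUP BY __partition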

Full data history

This part of the pipeline captures all the INSERT and UPDATE operations.

To generate the Materialized View you'd first need a Pipe that will result in the data_history_mv Data Source:

mat_data.pipe
NODE mv
SQL >

   SELECT
      toInt64(__key) as k_key,
      __offset as k_offset,
      __timestamp as k_timestamp,
      __partition as k_partition,
      __value as k_value,
      <use case specific columns>
   FROM raw_kafka_ds
   WHERE __value != 'null'

TYPE materialized
DATASOURCE data_history_mv

Notes:

  • This example treats __key and __timestamp as the primary concern, and then parses out all the various fields the customer wants.
  • You want this table to have the same extracted columns as the batch table, as the Lambda process UNIONs them.
  • Use the various ClickHouse JSON extraction functions, and avoid Nullable fields, as they slow processing and bloat tables.
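
For example, the <use case specific columns> might be extracted with non-nullable JSON functions like this (the status, amount_cents, and updated_at fields are hypothetical):

    SELECT
        toInt64(__key) as k_key,
        __offset as k_offset,
        __timestamp as k_timestamp,
        __partition as k_partition,
        __value as k_value,
        JSONExtractString(__value, 'status') as status,            -- '' instead of NULL when missing
        JSONExtractInt(__value, 'amount_cents') as amount_cents,   -- 0 instead of NULL when missing
        parseDateTimeBestEffortOrZero(JSONExtractString(__value, 'updated_at')) as updated_at
    FROM raw_kafka_ds
    WHERE __value != 'null'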

Here is the Data Source definition of the MV:

data_history_mv.datasource
SCHEMA >

   `k_key` Int64,
   `k_offset` Int64,
   `k_timestamp` DateTime,
   `k_partition` Int16,
   `k_value` String
   <use case specific columns>

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(k_timestamp)"
ENGINE_SORTING_KEY "k_key, k_timestamp"

Helpful notes:

  • __timestamp and __key are critical for lambda processing: while they are not low cardinality, they are the permanent filters in later queries.
  • Because this example mostly cares about quick access to the most recent events, it uses k_timestamp for partitioning.
  • This example keeps the raw JSON of the rest of the record in k_value in case some field is wanted for later indexing. You can ignore the column in daily processing, and use it for backfill without reprocessing the entire raw topic if you need to later. You can also partially extract from it into stable, indexed columns.
  • Additional sorting keys for customer queries are not retained because key and timestamp are needed here; other approaches could be considered if necessary.
  • All other columns are based on fields extracted from the Kafka __value JSON.
  • If the sorting key columns for customer queries were limited to columns that did not change during a CDC update, then a ReplacingMergeTree might work here. However, customer updates often touch the required columns, including date fields, making that impractical.

Deletes history

This part of the pipeline captures the DELETE operations.

As above, you need a Pipe to get the deletes and materialize them:

mat_deletes.pipe
NODE mv
SQL >

    SELECT
      toInt64(__key) as k_key,  -- key used as deduplication identity
      __timestamp as k_timestamp -- ts used for deduplication incrementor
    FROM raw_kafka_ds
    WHERE __value = 'null'

TYPE materialized
DATASOURCE deletes_history_mv

Helpful notes:

  • Nothing fancy - just parses out the null records by tracking the __key.
  • Note the conversion to Int64 instead of keeping a String, for better performance: here the key and offset are known to be auto-incrementing integers from Postgres and Kafka respectively. This may not always be true for other CDC sources (see the sketch below).
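
If your key isn't numeric, a hedged alternative is to keep __key as a String, or hash it into a fixed-width integer identity, assuming collisions are negligible for your key space (k_key would then become UInt64):

    SELECT
        cityHash64(__key) as k_key,   -- hypothetical: hash non-numeric keys into a UInt64 identity
        __timestamp as k_timestamp
    FROM raw_kafka_ds
    WHERE __value = 'null'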

This results in a table with all the deletes:

deletes_history_mv.datasource
SCHEMA >
    `k_key` Int64,
    `k_timestamp` DateTime

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(k_timestamp)"
ENGINE_SORTING_KEY "k_timestamp, k_key"

Notes:

  • Sorting key order is deliberate: for lambda processing you want to fetch only the __timestamps since the last batch ran, and then you want the __keys to be deleted from the dataset as a list for delete processing.
  • You don't need a separate deletes table for an efficient implementation; however, if you have a lot of deletes (such as with compacted and reloaded Kafka topics for CDC), you may find this split more efficient at the cost of a little more implementation complexity. A recommended approach is to start with a single history table including both data and deletes (sketched below) and optimize later if necessary.
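
A sketch of that simpler single-table starting point, flagging deletes inline instead of splitting them out (the is_deleted column is hypothetical; downstream queries would pick the latest row per key and drop it if flagged):

    SELECT
        toInt64(__key) as k_key,
        __offset as k_offset,
        __timestamp as k_timestamp,
        __value = 'null' as is_deleted,  -- 1 for delete tombstones, 0 for inserts/updates
        <use case specific columns>
    FROM raw_kafka_ds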

Consolidation batches

Using Copy Pipes, you can generate snapshots of the state of the original table at several points in time. This speeds up compute later, since you only have to account for changes that arrived after the last snapshot/batch.

copy_batches.pipe
NODE get_ts_boundary
SQL >

    WITH
      (SELECT max(k_timestamp) FROM data_history_mv) AS max_history,
      (SELECT max(k_timestamp) FROM deletes_history_mv) AS max_deletes
    SELECT least(max_history, max_deletes, now()) AS batch_ts_

NODE dedupe_and_delete_history
SQL >

   WITH 
   latest_rows AS (
      SELECT k_key, max(k_timestamp) AS latest_ts
      FROM data_history_mv
      GROUP BY k_key
   ),
   ts_boundary AS (
      SELECT batch_ts_ AS batch_ts
      FROM get_ts_boundary
   )
   SELECT f.*, assumeNotNull((SELECT batch_ts FROM ts_boundary)) AS batch_ts
   FROM data_history_mv f
   INNER JOIN latest_rows lo
   ON f.k_key = lo.k_key AND f.k_timestamp = lo.latest_ts
   WHERE f.k_key NOT IN (
      SELECT k_key 
      FROM deletes_history_mv 
      WHERE k_timestamp <= (SELECT batch_ts FROM ts_boundary)
   )
   AND f.k_timestamp <= (SELECT batch_ts FROM ts_boundary)
   <other customer filter rules>

TYPE copy
TARGET_DATASOURCE batches_ds
COPY_SCHEDULE 0,30 * * * *

Notes:

  • You use the slowest data-processing stream as the batch timestamp boundary, to guard against one of the Kafka partitions lagging and other typical stream-processing challenges.
  • This example uses a subquery and self-join to deduplicate because testing showed it performed best over this dataset. Each dataset has unique characteristics that may favor a different approach, such as LIMIT 1 BY (see the sketch after these notes).
  • The deduplication method used here in batch is the same one used later in the lambda Pipe, so results stay consistent.
  • The schedule should be adjusted to match the customer cadence requirements.
  • Note that this example uses <= in the row selection here, and > in the selection later to ensure it doesn't duplicate the boundary row.
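
For comparison, the LIMIT BY alternative mentioned in these notes would look roughly like this; benchmark it against your own data before choosing:

    SELECT *
    FROM data_history_mv
    ORDER BY k_key, k_timestamp DESC
    LIMIT 1 BY k_key   -- keep only the newest row per key
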
batches_ds.datasource
SCHEMA >
   `k_key` Int64,
   `k_offset` Int64,
   `k_timestamp` DateTime,
   `k_partition` Int16,
   `k_value` String,
   `batch_ts` DateTime
   <customer columns>

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toDate(batch_ts)"
ENGINE_SORTING_KEY "batch_ts, <customer sorting keys>"
ENGINE_TTL "batch_ts + toIntervalDay(1)"

Notes:

  • You cannot use a TTL to simply keep the last 3 versions, so you must pick a time interval and monitor that batches are running as expected (ClickHouse won't consider the whole table in a TTL expression, just the row being evaluated).
  • This table provides the bulk of the rows used in the actual query process, so it's important that the sorting keys are optimized for the customer's query patterns.
  • batch_ts remains at the head of the sorting key so you can quickly select the latest batch for use.
  • batch_ts is a simple timestamp boundary covering all rows across all partitions included in the batch, with all deletes already applied to the batch. This is important for understanding the logic of the lambda processing later.
  • Partition key is by day on the batch_ts so you can read as few rows as possible, but all sequentially.
  • Analysis of the customer sorting keys may yield good optimization information, such as a need for controlling index granularity if they handle a lot of multi-tenant data, for example.

Lambda Pipe

Lastly, you get the latest snapshot plus all the changes since then, and consolidate. This API Endpoint can also be used as a "view" so that other Pipes query it.

latest_values.pipe
NODE get_batch_info
SQL >
    SELECT max(batch_ts) AS batch_ts 
    FROM batches_ds

NODE new_deletes
SQL >

    SELECT k_key
    FROM deletes_history_mv
    WHERE k_timestamp > (SELECT batch_ts FROM get_batch_info)  -- only rows since last batch

NODE filter_new_rows
SQL >

    %
    SELECT {{columns(cols, 'k_key, k_offset, k_timestamp, <customer_cols>')}}
    FROM data_history_mv
    WHERE 1 
    AND k_timestamp > (SELECT batch_ts FROM get_batch_info)  -- only rows since last batch
    AND k_key NOT IN (SELECT k_key FROM new_deletes)  -- remove newly deleted rows from new rows
    AND <customer filters>

NODE dedup_new_rows_by_subquery
SQL >

    WITH latest_rows AS (
        SELECT k_key, max(k_timestamp) AS latest_ts
        FROM filter_new_rows
        GROUP BY k_key
      )
    SELECT f.*
    FROM filter_new_rows f
    INNER JOIN latest_rows lo
    ON f.k_key = lo.k_key AND f.k_timestamp = lo.latest_ts

NODE get_and_filter_batch
SQL >

    %
    SELECT {{columns(cols, 'k_key, k_offset, k_timestamp, <customer_cols>')}}
    FROM batches_ds
    PREWHERE batch_ts = (SELECT batch_ts FROM get_batch_info)  -- get latest batch
    WHERE 1
    AND k_key NOT IN (SELECT k_key FROM new_deletes)  -- filter by new deletes since last batch
    AND k_key NOT IN (SELECT k_key FROM dedup_new_rows_by_subquery)  -- omit rows already updated since batch


NODE batch_and_latest
SQL >

    SELECT * FROM get_and_filter_batch
    UNION ALL
    SELECT * FROM dedup_new_rows_by_subquery

Notes:

  • This is a longer Pipe which is published as an API.
  • It starts by determining which batch to use, which also gives you the boundary timestamp. It then fetches and processes all new rows since the selected batch, including deletes processing and deduplication. It then backfills this with all other rows from the batch, and UNIONs the results.
  • It uses the same deduplication strategy as the batch processing Pipe for consistency of results.
  • Note the use of the columns template parameter: it defaults to returning all the listed columns, but a user can specify a subset to reduce the data fetched and processed.
  • This API can then be called as a Data Source by other Pipes, which can also use the same parameter names to pass through filters like columns or other customer filters that may be required.
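
For instance, a downstream Pipe node could query the published endpoint like this, re-using the cols parameter by name (the updated_since parameter is hypothetical):

    %
    SELECT {{columns(cols, 'k_key, k_timestamp')}}
    FROM latest_values
    WHERE k_timestamp >= {{DateTime(updated_since, '2020-01-01 00:00:00')}}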

Conclusion

This diagram shows the full overview of the approach: a Kafka Data Source, two Materialized Views to keep track of changes and deletes, a Copy Pipe to deduplicate in batches, and a Pipe to combine all Data Sources.


Possible improvements

This example deliberately keeps the full history, but you could speed up processing and store less data by making the history MVs ReplacingMergeTree, or by adding a TTL long enough to be sure the changes have been incorporated into the batch Data Source.
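
For example, assuming a 7-day retention comfortably exceeds the 30-minute batch cadence (verify this against your own batch monitoring before relying on it), data_history_mv could keep its schema as-is and add a TTL to its engine definition:

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(k_timestamp)"
ENGINE_SORTING_KEY "k_key, k_timestamp"
ENGINE_TTL "k_timestamp + toIntervalDay(7)"

The same TTL could be applied to deletes_history_mv.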

Alternatives

A simpler way of achieving the latest view

This example is solving for many peculiarities of the test dataset, like not having a simple key for deduplication, and a large number of delete operations bloating the resulting tables. As a comparison, here's a solution you can use when your CDC case is very simple.

It's possibly less performant (since you need to extract fields at query time, filter deletes over the whole Data Source, and so on) but simpler and perfectly functional if volumes are not too big. Just define the Kafka Data Source as a ReplacingMergeTree (or create a Materialized View from the raw table), query with FINAL, and exclude deletes:

raw_kafka_ds_rmt.datasource
SCHEMA >
    `__value` String
     --`__topic` LowCardinality(String),
     --`__partition` Int16,
     --`__offset` Int64,
     --`__timestamp` DateTime,
     --`__key` String

ENGINE "ReplacingMergeTree"
ENGINE_PARTITION_KEY ""
ENGINE_SORTING_KEY "__key"
ENGINE_VER "__offset" --or "__timestamp"

An example query to consolidate latest updates and exclude duplicates:

SELECT
   *
   , JSONExtract(__value, 'field', '<type>') as field -- repeat for every field you need
FROM raw_kafka_ds_rmt FINAL
WHERE __value != 'null'

Next steps