Deduplicate data in your Data Source

Sometimes you need to deduplicate data, for example when you receive updates from a transactional database through change data capture (CDC). You might want to retrieve only the latest data point, or keep a historical record of how the attributes of an object evolve over time.

Because Tinybird doesn't enforce uniqueness for primary keys when inserting rows, you need to follow different strategies to deduplicate data with minimal side effects.

Deduplication strategies

You can use one of the following strategies to deduplicate your data.

Method | When to use
Deduplicate at query time | Deduplicate data at query time if you are still prototyping or the Data Source is small.
Use ReplacingMergeTree | Use ReplacingMergeTree or AggregatingMergeTree for greater performance.
Snapshot based deduplication | If data freshness isn't required, generate periodic snapshots of the data and take advantage of subsequent Materialized Views for rollups.
Hybrid approach using Lambda architecture | When you need to overcome engine approach limitations while preserving freshness, combine approaches in a Lambda architecture.

For dimensional and small tables, a periodic full replace is usually the best option.

Example case

Consider a dataset from a social media analytics company that wants to track post data over time. You receive an event with the latest info for each post, identified by post_id. Three fields, views, likes, and tag, vary from event to event. For example:

post.ndjson
{ "timestamp": "2024-07-02T02:22:17", "post_id": 956, "views": 856875, "likes": 2321, "tag": "Sports" }

Deduplicate at query time

Imagine you're only interested in the latest value of views for each post. In that case, you can deduplicate data on post_id and get the latest value with one of these strategies:

  • Get the max date for each post in a subquery and then filter by its results.
  • Group data by post_id and use the argMax function.
  • Use the LIMIT BY clause.

The following query shows the subquery strategy.

Deduplicating data on post_id using Subquery
SELECT *
FROM posts_info 
WHERE (post_id, timestamp) IN 
(
    SELECT 
        post_id, 
        max(timestamp)
    FROM posts_info 
    GROUP BY post_id
)
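
The other two strategies can be sketched as follows. Both queries are illustrative only: they assume the same posts_info Data Source with the views, likes, and tag columns used elsewhere in this guide, and last_ts is an alias chosen for this example.

```sql
-- argMax: for each post_id, take the value of each column
-- from the row that has the highest timestamp.
SELECT
    post_id,
    max(timestamp) AS last_ts,
    argMax(views, timestamp) AS views,
    argMax(likes, timestamp) AS likes,
    argMax(tag, timestamp) AS tag
FROM posts_info
GROUP BY post_id;

-- LIMIT BY: sort each post's rows from newest to oldest,
-- then keep only the first row per post_id.
SELECT *
FROM posts_info
ORDER BY post_id, timestamp DESC
LIMIT 1 BY post_id;
```

The argMax query returns only the columns you list explicitly, while the LIMIT BY query returns whole rows, which can matter when the Data Source has many columns.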

Which approach is fastest depends on your data and on the sorting keys you define in your Data Sources, because the sorting key determines how the data is stored on disk.

In general, deduplicating at query time is fine if the size of your data is small. If you have lots of data, use a specific Engine that takes care of deduplication for you.

Use the ReplacingMergeTree engine

If you have lots of data and you're interested in the latest insertion for each unique key, use the ReplacingMergeTree engine with the following options: ENGINE_SORTING_KEY, ENGINE_VER, and ENGINE_IS_DELETED.

  • Rows with the same ENGINE_SORTING_KEY are deduplicated. You can select one or more columns.
  • If you specify a column for ENGINE_VER, for example a timestamp, the row with the highest ENGINE_VER value for each unique ENGINE_SORTING_KEY is kept.
  • ENGINE_IS_DELETED is only active if you use ENGINE_VER. This column determines whether the row represents the state or is to be deleted; 1 is a deleted row, 0 is a state row. The type must be UInt8.
  • You can omit ENGINE_VER, so that the last inserted row for each unique ENGINE_SORTING_KEY is kept.

Aggregations or rollups in Materialized Views built on top of a ReplacingMergeTree always contain duplicated data, because Materialized Views process rows at insert time, before any merge has deduplicated them.

Define a Data Source

Define a Data Source like the following:

posts_info_rmt.datasource
DESCRIPTION >
    Data Source to save post info. ReplacingMergeTree Engine.

SCHEMA >
    `post_id` Int32 `json:$.post_id`,
    `views` Int32 `json:$.views`,
    `likes` Int32 `json:$.likes`,
    `tag` String `json:$.tag`,
    `timestamp` DateTime `json:$.timestamp`,
    `_is_deleted` UInt8 `json:$._is_deleted`

ENGINE "ReplacingMergeTree"
ENGINE_PARTITION_KEY ""
ENGINE_SORTING_KEY "post_id"
ENGINE_VER "timestamp"
ENGINE_IS_DELETED "_is_deleted"

ReplacingMergeTree deduplicates rows during merges, which run in the background at times you can't control, so duplicates can remain until the next merge. To deduplicate at query time, add the FINAL clause or use an alternative deduplication method. Note that FINAL masks duplicate rows in the query result; it doesn't remove them from the table.

Deduplicating data on post_id using FINAL
SELECT *
FROM posts_info_rmt FINAL
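
One way to see whether unmerged rows are still present is to compare row counts with and without FINAL. This is a minimal sketch; the aliases rows_stored and rows_after_final are hypothetical names chosen for the example.

```sql
-- Compare the raw row count with the count after FINAL is applied.
-- A difference means some rows are duplicates that haven't been merged yet,
-- or rows hidden because they are marked as deleted.
SELECT
    (SELECT count() FROM posts_info_rmt) AS rows_stored,
    (SELECT count() FROM posts_info_rmt FINAL) AS rows_after_final
```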

You can define posts_info_rmt as the landing Data Source, the one you send events to, or as a Materialized View from posts_info. You can also create a Data Source with an AggregatingMergeTree Engine using maxState(ts) and argMaxState(field,ts).
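
The AggregatingMergeTree alternative could look roughly like the following pair of queries. This is a sketch under assumptions, not a definitive setup: posts_info_amt is a hypothetical target Data Source sorted by post_id, and the *_state column names are invented for the example. Its columns would need AggregateFunction types that match the state functions used.

```sql
-- Materializing query: stores intermediate aggregate states per post_id.
-- Assumed target: posts_info_amt (hypothetical), AggregatingMergeTree, sorted by post_id.
SELECT
    post_id,
    maxState(timestamp) AS last_ts_state,
    argMaxState(views, timestamp) AS views_state,
    argMaxState(likes, timestamp) AS likes_state,
    argMaxState(tag, timestamp) AS tag_state
FROM posts_info
GROUP BY post_id;

-- Reading query: finalizes the stored states with the matching -Merge functions.
SELECT
    post_id,
    maxMerge(last_ts_state) AS last_ts,
    argMaxMerge(views_state) AS views,
    argMaxMerge(likes_state) AS likes,
    argMaxMerge(tag_state) AS tag
FROM posts_info_amt
GROUP BY post_id;
```

The reading query still needs GROUP BY post_id, because states for the same key can sit in different parts until they are merged.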

Snapshot based deduplication

Use Copy Pipes to take a query result and write it to a new Data Source in the following situations:

  • You need other Sorting Keys that might change with updates.
  • You need to do rollups and want to use Materialized Views.
  • Response times are too long with a ReplacingMergeTree.

The following is an example snapshot:

post_generate_snapshot.pipe
NODE gen_snapshot
SQL >

    SELECT
        post_id, 
        argMax(views, timestamp) views,
        argMax(likes, timestamp) likes,
        argMax(tag, timestamp) tag,
        max(timestamp) as ts,
        toStartOfMinute(now()) - INTERVAL 1 MINUTE as snapshot_ts
    FROM posts_info
    WHERE timestamp <= toStartOfMinute(now()) - INTERVAL 1 MINUTE
    GROUP BY post_id

TYPE copy
TARGET_DATASOURCE post_snapshot
COPY_MODE replace
COPY_SCHEDULE 0 * * * *

Because the TARGET_DATASOURCE engine is a MergeTree, you can use fields that you expect to be updated, such as tag, as sorting keys. This isn't possible in the ReplacingMergeTree, where the sorting key is also the deduplication key.

post_snapshot.datasource
SCHEMA >
    `post_id` Int32,
    `views` Int32,
    `likes` Int32,
    `tag` String,
    `ts` DateTime,
    `snapshot_ts` DateTime

ENGINE "MergeTree"
ENGINE_PARTITION_KEY ""
ENGINE_SORTING_KEY "tag, post_id"
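
Because each snapshot holds a single row per post, rollups over it need no further deduplication. As an illustrative sketch, assuming the post_snapshot Data Source above (the output aliases are hypothetical), a rollup per tag could be written as:

```sql
-- Aggregate the deduplicated snapshot by tag.
-- Grouping by tag matches the leading column of the sorting key "tag, post_id".
SELECT
    tag,
    count() AS posts,
    sum(views) AS total_views,
    sum(likes) AS total_likes
FROM post_snapshot
GROUP BY tag
ORDER BY total_views DESC
```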

Hybrid approach using Lambda architecture

Snapshots might decrease data freshness, and running Copy Pipes too frequently might be more expensive than Materialized Views. A way to mitigate these issues is to combine batch and real-time processing, reading the latest snapshot and incorporating the changes that happened since then.

This pattern is described in the Lambda architecture guide. See a practical example in the CDC using Lambda guide.

Using the post_snapshot Data Source created before, the real-time Pipe would look like the following:

latest_values.pipe
NODE get_latest_changes
SQL >

    SELECT 
        max(timestamp) last_ts,
        post_id, 
        argMax(views, timestamp) views,
        argMax(likes, timestamp) likes,
        argMax(tag, timestamp) tag
    FROM posts_info_rmt
    WHERE timestamp > (SELECT max(snapshot_ts) FROM post_snapshot)
    GROUP BY post_id

NODE get_snapshot
SQL >

    SELECT 
        ts AS last_ts,
        post_id, 
        views,
        likes,
        tag
    FROM post_snapshot
    WHERE snapshot_ts = (SELECT max(snapshot_ts) FROM post_snapshot)
    AND post_id NOT IN (SELECT post_id FROM get_latest_changes)

NODE combine_both
SQL >

    SELECT * FROM get_snapshot
    UNION ALL
    SELECT * FROM get_latest_changes
