
Deduplicate data in your data source

Sometimes you might need to deduplicate data, for example to receive updates or data from a transactional database through CDC. You might want to retrieve only the latest data point, or keep a historic record of the evolution of the attributes of an object over time.

Because Tinybird doesn't enforce uniqueness for primary keys when inserting rows, you need to follow different strategies to deduplicate data with minimal side effects.

Deduplication strategies

You can use one of the following strategies to deduplicate your data.

| Method | When to use |
| --- | --- |
| Deduplicate at query time | Deduplicate data at query time if you are still prototyping or the data source is small. |
| Use ReplacingMergeTree | Use ReplacingMergeTree or AggregatingMergeTree for greater performance. |
| Snapshot based deduplication | If data freshness isn't required, generate periodic snapshots of the data and take advantage of subsequent materialized views for rollups. |
| Hybrid approach using Lambda architecture | When you need to overcome engine limitations while preserving freshness, combine approaches in a Lambda architecture. |

For dimensional and small tables, a periodical full replace is usually the best option.

Example case

Consider a dataset from a social media analytics company that wants to track how post content evolves over time. You receive an event with the latest info for each post, identified by post_id. The views, likes, and tag fields vary from event to event. For example:

post.ndjson
{ "timestamp": "2024-07-02T02:22:17", "post_id": 956, "views": 856875, "likes": 2321, "tag": "Sports" }

Deduplicate at query time

Imagine you're only interested in the latest value of views for each post. In that case, you can deduplicate data on post_id and get the latest value with these strategies:

  • Get the max date for each post in a subquery and then filter by its results.
  • Group data by post_id and use the argMax function.
  • Use the LIMIT BY clause.

The following examples show how to deduplicate using each approach.

Deduplicating data on post_id using Subquery
SELECT *
FROM posts_info 
WHERE (post_id, timestamp) IN 
(
    SELECT 
        post_id, 
        max(timestamp)
    FROM posts_info 
    GROUP BY post_id
)
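The other two approaches can be sketched as follows, assuming the same posts_info data source. With argMax, you group by post_id and pick the value of each column at the latest timestamp; with LIMIT BY, you sort by timestamp and keep one row per post_id.

Deduplicating data on post_id using argMax
SELECT
    post_id,
    max(timestamp) AS timestamp,
    argMax(views, timestamp) AS views,
    argMax(likes, timestamp) AS likes,
    argMax(tag, timestamp) AS tag
FROM posts_info
GROUP BY post_id

Deduplicating data on post_id using LIMIT BY
SELECT *
FROM posts_info
ORDER BY timestamp DESC
LIMIT 1 BY post_id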

Depending on your data and on the sorting keys you define to store it on disk, one approach can be faster than the others.

In general, deduplicating at query time is fine if the size of your data is small. If you have lots of data, use a specific Engine that takes care of deduplication for you.

Use the ReplacingMergeTree engine

If you have lots of data and you're interested in the latest insertion for each unique key, use the ReplacingMergeTree engine with the following options: ENGINE_SORTING_KEY, ENGINE_VER, and ENGINE_IS_DELETED.

  • Rows with the same ENGINE_SORTING_KEY are deduplicated. You can select one or more columns.
  • If you specify a column for ENGINE_VER, for example a timestamp, the row with the highest ENGINE_VER for each unique ENGINE_SORTING_KEY is kept.
  • ENGINE_IS_DELETED is only active if you use ENGINE_VER. This column determines whether the row represents the state or is to be deleted; 1 is a deleted row, 0 is a state row. The type must be UInt8.
  • You can omit ENGINE_VER, so that the last inserted row for each unique ENGINE_SORTING_KEY is kept.

Aggregations or rollups in materialized views built on top of ReplacingMergeTree data sources can contain duplicated data, because materialized views process rows on insertion, before merges deduplicate them.

Define a data source

Define a data source like the following:

posts_info_rmt.datasource
DESCRIPTION >
    data source to save post info. ReplacingMergeTree Engine.

SCHEMA >
    `post_id` Int32 `json:$.post_id`,
    `views` Int32 `json:$.views`,
    `likes` Int32 `json:$.likes`,
    `tag` String `json:$.tag`,
    `timestamp` DateTime `json:$.timestamp`,
    `_is_deleted` UInt8 `json:$._is_deleted`

ENGINE "ReplacingMergeTree"
ENGINE_PARTITION_KEY ""
ENGINE_SORTING_KEY "post_id"
ENGINE_VER "timestamp"
ENGINE_IS_DELETED "_is_deleted"
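With ENGINE_IS_DELETED configured, a CDC delete arrives as a regular event with _is_deleted set to 1, while state rows carry 0. This is a hypothetical delete event for the post from the earlier example:

post_delete.ndjson
{ "timestamp": "2024-07-02T03:10:00", "post_id": 956, "views": 856875, "likes": 2321, "tag": "Sports", "_is_deleted": 1 }

After a merge, or at query time with FINAL, this row causes the post_id 956 state rows with lower timestamps to be dropped from results.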

ReplacingMergeTree deduplicates during a merge, and merges can't be controlled. Consider adding the FINAL clause, or an alternative deduplication method, to apply the merge at query time. Note also that rows are masked, not removed, when using FINAL.

Deduplicating data on post_id using FINAL
SELECT *
FROM posts_info_rmt FINAL

You can define the posts_info_rmt as the landing data source, the one you send events to, or as a materialized view from posts_info. You can also create a data source with an AggregatingMergeTree Engine using maxState(ts) and argMaxState(field,ts).
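As a sketch of the AggregatingMergeTree alternative, the data source stores aggregate states and a materializing Pipe feeds it from posts_info. The names posts_info_amt and mat_posts_latest are assumptions for illustration:

posts_info_amt.datasource
SCHEMA >
    `post_id` Int32,
    `ts` AggregateFunction(max, DateTime),
    `views` AggregateFunction(argMax, Int32, DateTime),
    `likes` AggregateFunction(argMax, Int32, DateTime),
    `tag` AggregateFunction(argMax, String, DateTime)

ENGINE "AggregatingMergeTree"
ENGINE_SORTING_KEY "post_id"

mat_posts_latest.pipe
NODE mat_posts_latest
SQL >

    SELECT
        post_id,
        maxState(timestamp) AS ts,
        argMaxState(views, timestamp) AS views,
        argMaxState(likes, timestamp) AS likes,
        argMaxState(tag, timestamp) AS tag
    FROM posts_info
    GROUP BY post_id

TYPE materialized
DATASOURCE posts_info_amt

When reading from posts_info_amt, merge the states with maxMerge(ts) and argMaxMerge(views), grouping by post_id.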

Snapshot based deduplication

Use Copy Pipes to take a query result and write it to a new data source in the following situations:

  • You need other Sorting Keys that might change with updates.
  • You need to do rollups and want to use materialized views.
  • Response times are too long with a ReplacingMergeTree.

The following is an example snapshot:

post_generate_snapshot.pipe
NODE gen_snapshot
SQL >

    SELECT
        post_id, 
        argMax(views, timestamp) views,
        argMax(likes, timestamp) likes,
        argMax(tag, timestamp) tag,
        max(timestamp) as ts,
        toStartOfMinute(now()) - INTERVAL 1 MINUTE as snapshot_ts
    FROM posts_info
    WHERE timestamp <= toStartOfMinute(now()) - INTERVAL 1 MINUTE
    GROUP BY post_id

TYPE copy
TARGET_DATASOURCE post_snapshot
COPY_MODE replace
COPY_SCHEDULE 0 * * * *

Because the TARGET_DATASOURCE engine is a plain MergeTree, you can use fields that you expect to be updated, like tag, as sorting keys, which isn't advisable in the ReplacingMergeTree.

post_snapshot.datasource
SCHEMA >
    `post_id` Int32,
    `views` Int32,
    `likes` Int32,
    `tag` String,
    `ts` DateTime,
    `snapshot_ts` DateTime

ENGINE "MergeTree"
ENGINE_PARTITION_KEY ""
ENGINE_SORTING_KEY "tag, post_id"

Hybrid approach using Lambda architecture

Snapshots might decrease data freshness, and running Copy Pipes too frequently might be more expensive than materialized views. A way to mitigate these issues is to combine batch and real-time processing, reading the latest snapshot and incorporating the changes that happened since then.

This pattern is described in the Lambda architecture guide. See a practical example in the CDC using Lambda guide.

Using the post_snapshot data source created before, the real-time Pipe would be like the following:

latest_values.pipe
NODE get_latest_changes
SQL >

    SELECT 
        max(timestamp) last_ts,
        post_id, 
        argMax(views, timestamp) views,
        argMax(likes, timestamp) likes,
        argMax(tag, timestamp) tag
    FROM posts_info_rmt
    WHERE timestamp > (SELECT max(snapshot_ts) FROM post_snapshot)
    GROUP BY post_id

NODE get_snapshot
SQL >

    SELECT 
        ts AS last_ts,
        post_id, 
        views,
        likes,
        tag
    FROM post_snapshot
    WHERE snapshot_ts = (SELECT max(snapshot_ts) FROM post_snapshot)
    AND post_id NOT IN (SELECT post_id FROM get_latest_changes)


NODE combine_both
SQL >

    SELECT * FROM get_snapshot
    UNION ALL
    SELECT * FROM get_latest_changes

Using argMax with null values

Here are the definitions for argMax functions:

  • argMaxState: used to pipe a constantly updating max state into a materialized view.
  • argMaxMerge: used to query a max state value out of a materialized view.

When dealing with null values, argMax functions might not behave as you expect. In raw queries, you might see a null value for the most recent record. However, when data is piped into an AggregatingMergeTree materialized view using argMaxState and later queried using argMaxMerge, the result can be different.

Returning to the social media analytics example, imagine you want to track the most recent time a post was flagged.

The raw query would be as follows:

get_latest_flagged_at_raw.pipe
#
# This returns `null` if the most recent record's `flaggedAt` value is null.
#

NODE get_latest_flagged_at_raw
SQL >

    SELECT flaggedAt
    FROM posts
    WHERE post_id = 'abc123'
    ORDER BY timestamp DESC
    LIMIT 1

First, data is aggregated into a materialized view with a pipe that uses argMaxState:

get_latest_flagged_at_mat.pipe
NODE get_latest_flagged_at
SQL >

    SELECT
        post_id,
        argMaxState(flaggedAt, timestamp) AS flaggedAt
    FROM posts
    GROUP BY post_id

Later, the aggregated state is merged with argMaxMerge when the materialized view is queried:

get_latest_flagged_at_mat.pipe
NODE get_absolute_latest_flagged_at
SQL >

    SELECT argMaxMerge(flaggedAt)
    FROM post_analytics_mv
    WHERE post_id = 'abc123'    

Although you might expect a null result because the raw query returned null, the merging process prefers any non-null value over a null value, even if its associated timestamp is lower. The result is the most recent non-null flaggedAt value.

Why this happens and workaround

During the merge process, the materialized view combines key-value candidate states; in this example, a candidate is a (timestamp, flaggedAt) pair. If one candidate has a non-null flaggedAt and another has a null flaggedAt, the non-null value wins regardless of its timestamp. This behavior is inherent to the merging logic of argMaxMerge, and it explains why an argMaxMerge query might not return the value at the absolute max timestamp.

To prevent the merging behavior from overriding a null with a non-null candidate, you need to handle null values explicitly before they enter the materialized view. One common approach is to transform null values to a known default, often the Unix epoch 1970-01-01 00:00:00. However, be aware that such conversions produce "fake" values which must be recognized in subsequent processing.

Assume your raw data includes a flaggedAt column, which may contain nulls. You can pre-process the data during aggregation as follows:

get_latest_flagged_at_no_nulls.pipe
NODE get_latest_flagged_at_no_nulls
SQL >

    SELECT
        argMaxState(
            CASE 
                WHEN flaggedAt IS NULL 
                THEN toDateTime('1970-01-01 00:00:00') 
                ELSE flaggedAt 
            END,
            timestamp
        )
    FROM posts  

Here, a CASE expression is used to convert any null flaggedAt values into the default datetime 1970-01-01 00:00:00. This ensures that during the merge, the aggregation logic processes these explicit default values rather than implicitly “overriding” nulls.

Since 1970-01-01 00:00:00 is used as a default placeholder, ensure that any downstream logic differentiates between genuine datetime values and these default values.
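If downstream consumers should still see null rather than the placeholder, one option (a sketch, assuming the materialized view from the earlier example) is to map the default back to null with nullIf at query time:

get_latest_flagged_at_clean.pipe
NODE get_latest_flagged_at_clean
SQL >

    SELECT
        nullIf(
            argMaxMerge(flaggedAt),
            toDateTime('1970-01-01 00:00:00')
        ) AS flaggedAt
    FROM post_analytics_mv
    WHERE post_id = 'abc123'

This keeps the merge-safe placeholder inside the materialized view while restoring null semantics in query results.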
