Deduplicate data in your data source
Sometimes you might need to deduplicate data, for example when you receive updates from a transactional database through change data capture (CDC). You might want to retrieve only the latest data point, or keep a historical record of how an object's attributes evolve over time.
Because Tinybird doesn't enforce uniqueness for primary keys when inserting rows, you need to follow different strategies to deduplicate data with minimal side effects.
Deduplication strategies
You can use one of the following strategies to deduplicate your data.
| Method | When to use |
| --- | --- |
| Deduplicate at query time | Deduplicate data at query time if you are still prototyping or the data source is small. |
| Use ReplacingMergeTree | Use ReplacingMergeTree or AggregatingMergeTree for greater performance. |
| Snapshot based deduplication | If data freshness isn't required, generate periodic snapshots of the data and take advantage of subsequent materialized views for rollups. |
| Hybrid approach using Lambda architecture | When you need to overcome engine-level limitations while preserving freshness, combine approaches in a Lambda architecture. |
For dimensional and small tables, a periodic full replace is usually the best option.
Example case
Consider a dataset from a social media analytics company that wants to track post data over time. You receive an event with the latest info for each post, identified by `post_id`. The three fields `views`, `likes`, and `tag` vary from event to event. For example:
post.ndjson
{ "timestamp": "2024-07-02T02:22:17", "post_id": 956, "views": 856875, "likes": 2321, "tags": "Sports" }
Deduplicate at query time
Imagine you're only interested in the latest value of `views` for each post. In that case, you can deduplicate data on `post_id` and get the latest value with these strategies:
- Get the max date for each post in a subquery and then filter by its results.
- Group data by `post_id` and use the `argMax` function.
- Use the `LIMIT BY` clause.
The following examples show each approach.
Deduplicating data on post_id using Subquery
```sql
SELECT *
FROM posts_info
WHERE (post_id, timestamp) IN (
    SELECT post_id, max(timestamp)
    FROM posts_info
    GROUP BY post_id
)
```
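Deduplicating with `argMax` groups rows by `post_id` and, for each column, picks the value from the row with the highest `timestamp`. A minimal sketch against the same `posts_info` data source:

```sql
SELECT
    post_id,
    max(timestamp) AS timestamp,
    argMax(views, timestamp) AS views,
    argMax(likes, timestamp) AS likes,
    argMax(tag, timestamp) AS tag
FROM posts_info
GROUP BY post_id
```

Deduplicating with `LIMIT BY` sorts rows by recency and keeps the first row per `post_id`:

```sql
SELECT *
FROM posts_info
ORDER BY post_id, timestamp DESC
LIMIT 1 BY post_id
```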
Depending on your data and on the sorting keys that determine how it's stored on disk, one approach can be faster than the others. In general, deduplicating at query time is fine if your data is small. If you have lots of data, use an engine that takes care of deduplication for you.
Use the ReplacingMergeTree engine
If you have lots of data and you're interested in the latest insertion for each unique key, use the ReplacingMergeTree engine with the following options: `ENGINE_SORTING_KEY`, `ENGINE_VER`, and `ENGINE_IS_DELETED`.

- Rows with the same `ENGINE_SORTING_KEY` are deduplicated. You can select one or more columns.
- If you specify a column for `ENGINE_VER`, for example a timestamp, the row with the highest `ENGINE_VER` for each unique `ENGINE_SORTING_KEY` is kept.
- `ENGINE_IS_DELETED` is only active if you use `ENGINE_VER`. This column determines whether the row represents the current state or is to be deleted: `1` is a deleted row, `0` is a state row. The type must be `UInt8`.
- You can omit `ENGINE_VER`, in which case the last inserted row for each unique `ENGINE_SORTING_KEY` is kept.
Aggregations or rollups in materialized views built on top of a ReplacingMergeTree always contain duplicated data, because materialized views process rows at insert time, before deduplication happens during merges.
Define a data source
Define a data source like the following:
post_views_rmt.datasource
```
DESCRIPTION >
    Data source to save post info. ReplacingMergeTree engine.

SCHEMA >
    `post_id` Int32 `json:$.post_id`,
    `views` Int32 `json:$.views`,
    `likes` Int32 `json:$.likes`,
    `tag` String `json:$.tag`,
    `timestamp` DateTime `json:$.timestamp`,
    `_is_deleted` UInt8 `json:$._is_deleted`

ENGINE "ReplacingMergeTree"
ENGINE_PARTITION_KEY ""
ENGINE_SORTING_KEY "post_id"
ENGINE_VER "timestamp"
ENGINE_IS_DELETED "_is_deleted"
```
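With this definition, a deletion is just another insertion. For example, an event like the following (illustrative values) marks post 956 as deleted, provided `_is_deleted` is `1` and its `timestamp` is the highest for that `post_id`:

```json
{ "timestamp": "2024-07-02T03:10:00", "post_id": 956, "views": 0, "likes": 0, "tag": "", "_is_deleted": 1 }
```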
ReplacingMergeTree deduplicates during a merge, and merges can't be controlled. Consider adding the `FINAL` clause, or an alternative deduplication method, to apply the merge at query time. Note also that rows are masked, not removed, when using `FINAL`.
Deduplicating data on post_id using FINAL
```sql
SELECT * FROM posts_info_rmt FINAL
```
You can define `posts_info_rmt` as the landing data source, the one you send events to, or as a materialized view from `posts_info`. You can also create a data source with an `AggregatingMergeTree` engine using `maxState(ts)` and `argMaxState(field, ts)`, as sketched below.
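A minimal sketch of that alternative, assuming `posts_info` as the landing data source and a hypothetical `AggregatingMergeTree` target data source named `posts_latest_agg` with matching `AggregateFunction` columns:

```
NODE mv_posts_latest
SQL >
    SELECT
        post_id,
        maxState(timestamp) AS ts,
        argMaxState(views, timestamp) AS views,
        argMaxState(likes, timestamp) AS likes,
        argMaxState(tag, timestamp) AS tag
    FROM posts_info
    GROUP BY post_id

TYPE materialized
DATASOURCE posts_latest_agg
```

When querying `posts_latest_agg`, merge the states with `maxMerge(ts)` and `argMaxMerge(...)`, grouping by `post_id`.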
Snapshot based deduplication
Use Copy Pipes to take a query result and write it to a new data source in the following situations:
- You need other Sorting Keys that might change with updates.
- You need to do rollups and want to use materialized views.
- Response times are too long with a `ReplacingMergeTree`.
The following is an example snapshot:
post_generate_snapshot.pipe
```
NODE gen_snapshot
SQL >
    SELECT
        post_id,
        argMax(views, timestamp) AS views,
        argMax(likes, timestamp) AS likes,
        argMax(tag, timestamp) AS tag,
        max(timestamp) AS ts,
        toStartOfMinute(now()) - INTERVAL 1 MINUTE AS snapshot_ts
    FROM posts_info
    WHERE timestamp <= toStartOfMinute(now()) - INTERVAL 1 MINUTE
    GROUP BY post_id

TYPE copy
TARGET_DATASOURCE post_snapshot
COPY_MODE replace
COPY_SCHEDULE 0 * * * *
```
Because the `TARGET_DATASOURCE` engine is a MergeTree, you can use fields that you expect to be updated, such as `tag`, as sorting keys, which isn't possible with the ReplacingMergeTree.
post_snapshot.datasource
```
SCHEMA >
    `post_id` Int32,
    `views` Int32,
    `likes` Int32,
    `tag` String,
    `ts` DateTime,
    `snapshot_ts` DateTime

ENGINE "MergeTree"
ENGINE_PARTITION_KEY ""
ENGINE_SORTING_KEY "tag, post_id"
```
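Because `tag` leads the sorting key, queries that filter on this mutable field read fewer rows. A hypothetical endpoint query:

```sql
SELECT post_id, views, likes
FROM post_snapshot
WHERE tag = 'Sports'
ORDER BY post_id
```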
Hybrid approach using Lambda architecture
Snapshots might decrease data freshness, and running Copy Pipes too frequently might be more expensive than materialized views. A way to mitigate these issues is to combine batch and real-time processing, reading the latest snapshot and incorporating the changes that happened since then.
This pattern is described in the Lambda architecture guide. See a practical example in the CDC using Lambda guide.
Using the `post_snapshot` data source created before, the real-time Pipe would look like the following:
latest_values.pipe
```
NODE get_latest_changes
SQL >
    SELECT
        max(timestamp) AS last_ts,
        post_id,
        argMax(views, timestamp) AS views,
        argMax(likes, timestamp) AS likes,
        argMax(tag, timestamp) AS tag
    FROM posts_info_rmt
    WHERE timestamp > (SELECT max(snapshot_ts) FROM post_snapshot)
    GROUP BY post_id

NODE get_snapshot
SQL >
    SELECT ts AS last_ts, post_id, views, likes, tag
    FROM post_snapshot
    WHERE snapshot_ts = (SELECT max(snapshot_ts) FROM post_snapshot)
        AND post_id NOT IN (SELECT post_id FROM get_latest_changes)

NODE combine_both
SQL >
    SELECT * FROM get_snapshot
    UNION ALL
    SELECT * FROM get_latest_changes
```
Using argMax with null values
Here are the definitions for the `argMax` state functions:

- `argMaxState`: used to pipe a constantly updating max state into a materialized view.
- `argMaxMerge`: used to query a max state value out of a materialized view.
When dealing with null values, `argMax` functions might not behave as you expect. In raw queries, you might see a null value for the most recent record. However, when data is piped into an `AggregatingMergeTree` materialized view using `argMaxState` and later queried using `argMaxMerge`, the result can be different.
Returning to the social media analytics example, imagine you want to track the most recent time when a post was flagged.
The raw query would be as follows:
get_latest_flagged_at_raw.pipe
```
#
# This returns `null` if the most recent record's `flaggedAt` value is null.
#
NODE get_latest_flagged_at_raw
SQL >
    SELECT flaggedAt
    FROM posts
    WHERE post_id = 'abc123'
    ORDER BY timestamp DESC
    LIMIT 1
```
First, data is aggregated into a materialized view with a Pipe that uses `argMaxState`:
get_latest_flagged_at_mat.pipe
```
NODE get_latest_flagged_at
SQL >
    SELECT
        post_id,
        argMaxState(flaggedAt, timestamp) AS flaggedAt
    FROM posts
    GROUP BY post_id

TYPE materialized
DATASOURCE post_analytics_mv
```
Later, the aggregated state is merged with `argMaxMerge` when the materialized view is queried:
get_latest_flagged_at_mat.pipe
```
NODE get_absolute_latest_flagged_at
SQL >
    SELECT argMaxMerge(flaggedAt)
    FROM post_analytics_mv
    WHERE post_id = 'abc123'
```
Although you might expect a `null` result because the raw query returned `null`, the merging process prefers any non-null value over a null value, even if its associated timestamp is lower. The result is the most recent non-null `flaggedAt` value.
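Plain `argMax` shows the same preference, because ClickHouse aggregate functions skip null input values. A minimal, self-contained sketch with made-up rows, using the standard `values` table function:

```sql
SELECT argMax(flaggedAt, timestamp) AS latest_flagged_at
FROM values(
    'timestamp DateTime, flaggedAt Nullable(DateTime)',
    ('2024-07-01 00:00:00', '2024-06-30 12:00:00'),
    ('2024-07-02 00:00:00', NULL)
)
-- Returns 2024-06-30 12:00:00: the null from the newer row is skipped.
```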
Why this happens and workaround
During the merge process, the materialized view combines key-value candidate states; a candidate in this example is a (`timestamp`, `flaggedAt`) pair. If one candidate has a non-null `flaggedAt` and another has a null `flaggedAt`, the non-null value wins regardless of its timestamp. This behavior is inherent to the merging logic of `argMaxMerge`, and it explains why an `argMaxMerge` query might not return the value for the absolute max timestamp.
To prevent the merging behavior from overriding a null with a non-null candidate, you need to handle null values explicitly before they enter the materialized view. One common approach is to transform null values to a known default, often the Unix epoch `1970-01-01 00:00:00`. Be aware, however, that such conversions produce "fake" values that must be recognized in subsequent processing.
Assume your raw data includes a `flaggedAt` column, which may contain nulls. You can preprocess the data during aggregation as follows:
get_latest_flagged_at_no_nulls.pipe
```
NODE get_latest_flagged_at_no_nulls
SQL >
    SELECT
        post_id,
        argMaxState(
            CASE
                WHEN flaggedAt IS NULL THEN toDateTime('1970-01-01 00:00:00')
                ELSE flaggedAt
            END,
            timestamp
        ) AS flaggedAt
    FROM posts
    GROUP BY post_id
```
Here, a `CASE` expression converts any null `flaggedAt` values into the default datetime `1970-01-01 00:00:00`. This ensures that during the merge, the aggregation logic compares explicit default values rather than implicitly overriding nulls.
Since `1970-01-01 00:00:00` is used as a default placeholder, ensure that any downstream logic differentiates between genuine datetime values and these defaults.
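For example, assuming the no-nulls state is materialized into `post_analytics_mv`, you could map the placeholder back to `null` at query time with `nullIf`:

```sql
SELECT nullIf(
    argMaxMerge(flaggedAt),
    toDateTime('1970-01-01 00:00:00')
) AS flagged_at
FROM post_analytics_mv
WHERE post_id = 'abc123'
```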
Next steps
- Read the materialized views docs.
- Read the Lambda architecture guide.
- Visualize your data using Tinybird Charts.