Deduplicate data in your Data Source¶
Sometimes you might need to deduplicate data, for example when receiving updates from a transactional database through change data capture (CDC). You might want to retrieve only the latest data point, or keep a historical record of how the attributes of an object evolved over time.
Because ClickHouse® doesn't enforce uniqueness for primary keys when inserting rows, you need to follow different strategies to deduplicate data with minimal side effects.
Deduplication strategies¶
You can use one of the following strategies to deduplicate your data.
| Method | When to use |
| --- | --- |
| Deduplicate at query time | Deduplicate data at query time if you are still prototyping or the Data Source is small. |
| Use ReplacingMergeTree | Use ReplacingMergeTree or AggregatingMergeTree for greater performance. |
| Snapshot based deduplication | If data freshness isn't required, generate periodic snapshots of the data and take advantage of subsequent Materialized Views for rollups. |
| Hybrid approach using Lambda architecture | When you need to overcome engine approach limitations while preserving freshness, combine approaches in a Lambda architecture. |
For dimensional and small tables, a periodic full replace is usually the best option.
Example case¶
Consider a dataset from a social media analytics company that wants to track post data over time. You receive an event with the latest info for each post, identified by `post_id`. The three fields `views`, `likes`, and `tags` vary from event to event. For example:
post.ndjson
```json
{ "timestamp": "2024-07-02T02:22:17", "post_id": 956, "views": 856875, "likes": 2321, "tags": "Sports" }
```
Deduplicate at query time¶
Imagine you're only interested in the latest value of `views` for each post. In that case, you can deduplicate data on `post_id` and get the latest value with these strategies:

- Get the max date for each post in a subquery and then filter by its results.
- Group data by `post_id` and use the `argMax` function.
- Use the `LIMIT BY` clause.
Deduplicating data on `post_id` using a subquery:

```sql
SELECT *
FROM posts_info
WHERE (post_id, timestamp) IN (
    SELECT post_id, max(timestamp)
    FROM posts_info
    GROUP BY post_id
)
```
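The `argMax` and `LIMIT BY` strategies might look like the following sketches, assuming `posts_info` has the same columns used elsewhere in this guide:

```sql
SELECT
    post_id,
    max(timestamp) AS timestamp,
    argMax(views, timestamp) AS views,
    argMax(likes, timestamp) AS likes,
    argMax(tag, timestamp) AS tag
FROM posts_info
GROUP BY post_id
```

```sql
SELECT *
FROM posts_info
ORDER BY timestamp DESC
LIMIT 1 BY post_id
```

Both return one row per `post_id` with the values from its most recent event.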
Depending on your data, and on how the sorting keys you define in your Data Sources store it on disk, one approach can be faster than the others.
In general, deduplicating at query time is fine if the size of your data is small. If you have lots of data, use a specific Engine that takes care of deduplication for you.
Use the ReplacingMergeTree engine¶
If you have lots of data and you're interested in the latest insertion for each unique key, use the ReplacingMergeTree engine with the following options: `ENGINE_SORTING_KEY`, `ENGINE_VER`, and `ENGINE_IS_DELETED`.
- Rows with the same `ENGINE_SORTING_KEY` are deduplicated. You can select one or more columns.
- If you specify a column for `ENGINE_VER`, the row with the highest `ENGINE_VER` for each unique `ENGINE_SORTING_KEY` is kept, for example a timestamp.
- `ENGINE_IS_DELETED` is only active if you use `ENGINE_VER`. This column determines whether the row represents the state or is to be deleted: `1` is a deleted row, `0` is a state row. The type must be `UInt8`.
- You can omit `ENGINE_VER`, so that the last inserted row for each unique `ENGINE_SORTING_KEY` is kept.
Aggregations or rollups in Materialized Views built on top of ReplacingMergeTree queries can contain duplicated data, because Materialized Views process rows at insert time, before merges deduplicate them.
Define a Data Source¶
Define a Data Source like the following:

post_views_rmt.datasource

```
DESCRIPTION >
    Data Source to save post info. ReplacingMergeTree Engine.

SCHEMA >
    `post_id` Int32 `json:$.post_id`,
    `views` Int32 `json:$.views`,
    `likes` Int32 `json:$.likes`,
    `tag` String `json:$.tag`,
    `timestamp` DateTime `json:$.timestamp`,
    `_is_deleted` UInt8 `json:$._is_deleted`

ENGINE "ReplacingMergeTree"
ENGINE_PARTITION_KEY ""
ENGINE_SORTING_KEY "post_id"
ENGINE_VER "timestamp"
ENGINE_IS_DELETED "_is_deleted"
```
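With this schema, a delete coming from CDC can be ingested as a regular event with `_is_deleted` set to `1`. A sketch of such an event, reusing the post from the earlier example (the timestamp and field values are hypothetical):

```json
{ "timestamp": "2024-07-02T09:15:00", "post_id": 956, "views": 856875, "likes": 2321, "tag": "Sports", "_is_deleted": 1 }
```

Because its `timestamp` is the highest `ENGINE_VER` value for `post_id` 956, this row wins the merge and marks the post as deleted.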
ReplacingMergeTree deduplicates during a merge, and merges can't be controlled. Consider adding the `FINAL` clause, or an alternative deduplication method, to apply the merge at query time. Note also that rows are masked, not removed, when using `FINAL`.
Deduplicating data on `post_id` using `FINAL`:

```sql
SELECT *
FROM posts_info_rmt
FINAL
```
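If the Data Source uses `ENGINE_IS_DELETED`, you can also filter out rows flagged as deleted explicitly. A sketch using the `_is_deleted` column from the schema above:

```sql
SELECT *
FROM posts_info_rmt
FINAL
WHERE _is_deleted = 0
```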
You can define `posts_info_rmt` as the landing Data Source, the one you send events to, or as a Materialized View from `posts_info`. You can also create a Data Source with an `AggregatingMergeTree` Engine using `maxState(ts)` and `argMaxState(field, ts)`.
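A sketch of that `AggregatingMergeTree` alternative, assuming a hypothetical `posts_info_agg` Data Source whose columns store aggregate states:

```
SCHEMA >
    `post_id` Int32,
    `ts` AggregateFunction(max, DateTime),
    `views` AggregateFunction(argMax, Int32, DateTime),
    `likes` AggregateFunction(argMax, Int32, DateTime),
    `tag` AggregateFunction(argMax, String, DateTime)

ENGINE "AggregatingMergeTree"
ENGINE_PARTITION_KEY ""
ENGINE_SORTING_KEY "post_id"
```

Reading the latest values back then uses the matching `-Merge` combinators:

```sql
SELECT
    post_id,
    maxMerge(ts) AS ts,
    argMaxMerge(views) AS views,
    argMaxMerge(likes) AS likes,
    argMaxMerge(tag) AS tag
FROM posts_info_agg
GROUP BY post_id
```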
Snapshot based deduplication¶
Use Copy Pipes to take a query result and write it to a new Data Source in the following situations:
- You need other Sorting Keys that might change with updates.
- You need to do rollups and want to use Materialized Views.
- Response times are too long with a `ReplacingMergeTree`.
The following is an example snapshot:
post_generate_snapshot.pipe
```
NODE gen_snapshot
SQL >
    SELECT
        post_id,
        argMax(views, timestamp) views,
        argMax(likes, timestamp) likes,
        argMax(tag, timestamp) tag,
        max(timestamp) as ts,
        toStartOfMinute(now()) - INTERVAL 1 MINUTE as snapshot_ts
    FROM posts_info
    WHERE timestamp <= toStartOfMinute(now()) - INTERVAL 1 MINUTE
    GROUP BY post_id

TYPE copy
TARGET_DATASOURCE post_snapshot
COPY_MODE replace
COPY_SCHEDULE 0 * * * *
```
Because the `TARGET_DATASOURCE` engine is a MergeTree, you can use fields that you expect to be updated, like `tag`, as sorting keys, which isn't possible in the ReplacingMergeTree.
post_snapshot.datasource
```
SCHEMA >
    `post_id` Int32,
    `views` Int32,
    `likes` Int32,
    `tag` String,
    `ts` DateTime,
    `snapshot_ts` DateTime

ENGINE "MergeTree"
ENGINE_PARTITION_KEY ""
ENGINE_SORTING_KEY "tag, post_id"
```
Hybrid approach using Lambda architecture¶
Snapshots might decrease data freshness, and running Copy Pipes too frequently might be more expensive than Materialized Views. A way to mitigate these issues is to combine batch and real-time processing, reading the latest snapshot and incorporating the changes that happened since then.
This pattern is described in the Lambda architecture guide. See a practical example in the CDC using Lambda guide.
Using the `post_snapshot` Data Source created before, the real-time Pipe would look like the following. Note that the `get_snapshot` node reads from `post_snapshot`, which is the Data Source that has the `snapshot_ts` column:

latest_values.pipe

```
NODE get_latest_changes
SQL >
    SELECT
        max(timestamp) last_ts,
        post_id,
        argMax(views, timestamp) views,
        argMax(likes, timestamp) likes,
        argMax(tag, timestamp) tag
    FROM posts_info_rmt
    WHERE timestamp > (SELECT max(snapshot_ts) FROM post_snapshot)
    GROUP BY post_id

NODE get_snapshot
SQL >
    SELECT ts AS last_ts, post_id, views, likes, tag
    FROM post_snapshot
    WHERE snapshot_ts = (SELECT max(snapshot_ts) FROM post_snapshot)
    AND post_id NOT IN (SELECT post_id FROM get_latest_changes)

NODE combine_both
SQL >
    SELECT * FROM get_snapshot
    UNION ALL
    SELECT * FROM get_latest_changes
```
Next steps¶
- Read the Materialized Views docs.
- Read the Lambda architecture guide.
- Visualize your data using Tinybird Charts.