Deduplicate the data in your Data Source

OLAP databases like ClickHouse are optimized for fast ingestion and, for that to work, some trade-offs are made. One of these is the lack of unique data constraints: Enforcing them would add a big overhead, and make ingestion speeds too slow for what's expected from this kind of database.

However, there will be times when you either want to get rid of duplicated data, or just access the most recent data point available. There are several options, but the general workflow is ingesting all of the data first (including duplicates) and dealing with them after ingestion.

The problem

Typically, there are two scenarios that lead to duplicated data:

  • Upserts. An upsert is an operation that inserts new rows into a database table if they do not already exist, or updates them if they do. With databases like Postgres, this can be accomplished with an INSERT ... ON CONFLICT DO UPDATE clause (see the sketch after this list). Because ClickHouse does not enforce uniqueness of primary keys, it doesn't support clauses like this. The way to do upserts on ClickHouse is with a ReplacingMergeTree engine. More on this later.
  • Constant ingestions and historical data. Imagine you're periodically dumping data from your transactional database to ClickHouse to run analytical queries on it in real-time, as described in this Tinybird blog post. You would end up with lots of rows inserted at different times with the same primary key. You may want to get only the latest data point - in this case you can get rid of the rest and treat them like upserts. But there are also use cases where you want to keep all the data to have a historic record of the evolution of the attributes of an object over time.
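For reference, an upsert in Postgres looks like the sketch below; the posts table and its columns are illustrative, not part of this guide. ClickHouse has no equivalent clause, which is why the rest of this guide relies on engines and query-time strategies instead.

-- Hypothetical Postgres upsert: insert the row, or update it if post_id already exists
INSERT INTO posts (post_id, views, likes, tag)
VALUES (956, 856875, 2321, 'Sports')
ON CONFLICT (post_id)
DO UPDATE SET views = EXCLUDED.views, likes = EXCLUDED.likes, tag = EXCLUDED.tag;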

Different alternatives based on your requirements

  • Deduplicate at query time: if you are still prototyping or the Data Source is not too big (say, below 1M rows), deduplicating at query time is probably still fast enough.
  • Engine based: a ReplacingMergeTree or AggregatingMergeTree engine queried with FINAL is the easiest way to deduplicate, and usually the fastest and most efficient. Downside: you can't use aggregation Materialized Views on top of these Data Sources.
  • Snapshot based: if data freshness is not a hard requirement, you can generate periodic snapshots of the data and also take advantage of subsequent Materialized Views for rollups.
  • Hybrid approach: when you need to overcome the limitations of the engine approach but still need freshness, you can combine both approaches in a Lambda Architecture.

Note: for dimensional and small tables, a periodic full replace is probably the best alternative.

Example scenario

For this guide, imagine a dataset of a social media analytics company that wants to track some data (views, likes, tags) of content over time.

You receive an event with the latest info for each post, identified by post_id. The three fields views, likes, and tag will vary from event to event.

post.ndjson
{ "timestamp": "2024-07-02T02:22:17", "post_id": 956, "views": 856875, "likes": 2321, "tags": "Sports" }

If you want to follow along, you can define the following .datasource file locally (but you can also just read through to understand the different options). This file creates the schema to store the data, to which new events with the latest totals for each post are appended over time:

posts_info.datasource
DESCRIPTION >
    Datasource to save post info

SCHEMA >
    `post_id` Int32 `json:$.post_id`,
    `views` Int32 `json:$.views`,
    `likes` Int32 `json:$.likes`,
    `tag` String `json:$.tag`,
    `timestamp` DateTime `json:$.timestamp`

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYear(timestamp)"
ENGINE_SORTING_KEY "post_id, timestamp"

Selecting the right sorting key matters. If you're going to do lots of filtering by post_id, it's better to sort first by post_id and then by timestamp, so the scanned index range stays as small as possible. Read this resource or take Tinybird's free 'Principles of Real Time Analytics' course to learn how to better define and sort your indexes.
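For example, a hypothetical query like this one only has to scan the index range of a single post:

SELECT timestamp, views, likes
FROM posts_info
WHERE post_id = 956
ORDER BY timestamp DESC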

You can create a Tinybird Data Source by adding the definition file above to your Workspace and append mock data using this data generator in Mockingbird. Just remember to set your Host and a token with the needed rights.

Deduplicate at query time

Imagine you're only interested in the latest value of views for each post. In that case, you can deduplicate data on post_id and get the latest value with these strategies:

  • Get the max date for each post in a subquery and then filter by its results.
  • Group data by post_id and use the argMax function.
  • Use the LIMIT BY clause.

The example queries below show each of these approaches.

Deduplicating data on post_id using Subquery
SELECT *
FROM posts_info 
WHERE (post_id, timestamp) IN 
(
    SELECT 
        post_id, 
        max(timestamp)
    FROM posts_info 
    GROUP BY post_id
)
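The other two approaches are sketched below against the same posts_info schema; adapt the column lists to your own fields.

Deduplicating data on post_id using argMax
SELECT
    post_id,
    argMax(views, timestamp) views,
    argMax(likes, timestamp) likes,
    argMax(tag, timestamp) tag,
    max(timestamp) as ts
FROM posts_info
GROUP BY post_id

Deduplicating data on post_id using LIMIT BY
SELECT *
FROM posts_info
ORDER BY post_id, timestamp DESC
LIMIT 1 BY post_id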

Depending on your data and how you define the sorting keys in your Data Sources to store it on disk, one approach will be faster than others.

In general, deduplicating at query time is fine if the size of your data is small. But if you have lots of data, the best option to make your query faster is to use a specific Engine that will take care of deduplication for you.

Use a ReplacingMergeTree Engine

If you have lots of data and you only care about the latest insertion for each unique key, you can use a ReplacingMergeTree engine. Two engine options control how it deduplicates: ENGINE_SORTING_KEY and ENGINE_VER.

  • Rows with the same ENGINE_SORTING_KEY will be deduplicated. You can select one or more columns.
  • ENGINE_VER can be omitted and in that case the last inserted row for each unique ENGINE_SORTING_KEY will be kept. If you specify it (the type of ENGINE_VER has to be UInt*, Date or DateTime), the row with the highest ENGINE_VER for each unique ENGINE_SORTING_KEY will be kept.

Define a Data Source like this:

posts_info_rmt.datasource
DESCRIPTION >
    Datasource to save post info. ReplacingMergeTree Engine.

SCHEMA >
    `post_id` Int32 `json:$.post_id`,
    `views` Int32 `json:$.views`,
    `likes` Int32 `json:$.likes`,
    `tag` String `json:$.tag`,
    `timestamp` DateTime `json:$.timestamp`

ENGINE "ReplacingMergeTree"
ENGINE_PARTITION_KEY ""
ENGINE_SORTING_KEY "post_id"
ENGINE_VER "timestamp"

You can create the new Data Source by adding this new definition file to your project, with the same methods described above.

An important note from the ClickHouse docs:

"Data deduplication occurs only during a merge. Merging occurs in the background at an unknown time, so you can’t plan for it. Some of the data may remain unprocessed. Although you can run an unscheduled merge using the OPTIMIZE query, do not count on using it, because the OPTIMIZE query will read and write a large amount of data. Thus, ReplacingMergeTree is suitable for clearing out duplicate data in the background in order to save space, but it does not guarantee the absence of duplicates."

Merging happens in the background, most likely every 9-10 minutes, but if ClickHouse considers that you do not have enough data it will not happen as frequently. So doing SELECT * FROM ... will most likely give you duplicated rows (even when using a ReplacingMergeTree engine) after an insertion, and you'll need an additional strategy to select only the latest value at query time.

The easiest way to achieve this deduplication is adding the FINAL modifier to the query.

Deduplicating data on post_id using FINAL
SELECT *
FROM posts_info_rmt FINAL

You can define posts_info_rmt as the landing Data Source (the one you send events to) or as a Materialized View from posts_info.
Following the MV path, it is also possible to create a Data Source with an AggregatingMergeTree Engine using maxState(ts) and argMaxState(field, ts), as sketched below.
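A minimal sketch of that MV path, assuming a target Data Source named posts_info_agg and a Materialized Pipe to fill it (both file names are illustrative, not part of this guide):

posts_info_agg.datasource
SCHEMA >
    `post_id` Int32,
    `ts` AggregateFunction(max, DateTime),
    `views` AggregateFunction(argMax, Int32, DateTime),
    `likes` AggregateFunction(argMax, Int32, DateTime),
    `tag` AggregateFunction(argMax, String, DateTime)

ENGINE "AggregatingMergeTree"
ENGINE_PARTITION_KEY ""
ENGINE_SORTING_KEY "post_id"

mat_posts_info_agg.pipe
NODE materialize_latest
SQL >

    SELECT
        post_id,
        maxState(timestamp) as ts,
        argMaxState(views, timestamp) as views,
        argMaxState(likes, timestamp) as likes,
        argMaxState(tag, timestamp) as tag
    FROM posts_info
    GROUP BY post_id

TYPE materialized
DATASOURCE posts_info_agg

At query time you would read it with the -Merge combinators (maxMerge(ts), argMaxMerge(views), and so on), grouping by post_id.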

Snapshots

Sometimes you need other Sorting Keys that would change with updates, you need rollups through Materialized Views, or response times with a ReplacingMergeTree are simply too slow. Luckily, you have Copy Pipes, which can take a query result and write it to a new Data Source.

post_generate_snapshot.pipe
NODE gen_snapshot
SQL >

    SELECT
        post_id, 
        argMax(views, timestamp) views,
        argMax(likes, timestamp) likes,
        argMax(tag, timestamp) tag,
        max(timestamp) as ts,
        toStartOfMinute(now()) - INTERVAL 1 MINUTE as snapshot_ts
    FROM posts_info
    WHERE timestamp <= toStartOfMinute(now()) - INTERVAL 1 MINUTE
    GROUP BY post_id

TYPE copy
TARGET_DATASOURCE post_snapshot
COPY_MODE replace
COPY_SCHEDULE 0 * * * *

Note the WHERE clause and the snapshot_ts column: both use the same time boundary, following best practices so the snapshot and the real-time query in the hybrid approach below can be stitched together without gaps or duplicates.

Here, for example, we can use tag as the Sorting Key, since the TARGET_DATASOURCE of the Copy Pipe can be a regular MergeTree.

post_snapshot.datasource
SCHEMA >
    `post_id` Int32,
    `views` Int32,
    `likes` Int32,
    `tag` String,
    `ts` DateTime,
    `snapshot_ts` DateTime

ENGINE "MergeTree"
ENGINE_PARTITION_KEY ""
ENGINE_SORTING_KEY "tag, post_id"

Hybrid approach, Lambda architecture

Snapshots are great, but you sacrifice a bit of data freshness (and if you run Copy Pipes too frequently, they can be more expensive than MVs), so sometimes you need to combine batch and real-time processing: read the latest snapshot and incorporate the changes that have happened since then.

This pattern is outlined in the Lambda Architecture guide, and you can see a practical example in the CDC using Lambda guide.

Using the post_snapshot Data Source created before, the realtime pipe would be something like:

latest_values.pipe
NODE get_latest_changes
SQL >

    SELECT 
        max(timestamp) last_ts,
        post_id, 
        argMax(views, timestamp) views,
        argMax(likes, timestamp) likes,
        argMax(tag, timestamp) tag
    FROM posts_info_rmt
    WHERE timestamp > (SELECT max(snapshot_ts) FROM post_snapshot)
    GROUP BY post_id

NODE get_snapshot
SQL >

    SELECT 
        ts as last_ts,
        post_id, 
        views,
        likes,
        tag
    FROM post_snapshot
    WHERE snapshot_ts = (SELECT max(snapshot_ts) FROM post_snapshot)
    AND post_id NOT IN (SELECT post_id FROM get_latest_changes)


NODE combine_both
SQL >

    SELECT * FROM get_snapshot
    UNION ALL
    SELECT * FROM get_latest_changes

Next steps