Connect MongoDB to Tinybird

In this guide, you'll learn how to ingest data into Tinybird from MongoDB.

You'll use:

  • MongoDB Atlas as the source MongoDB database.
  • Confluent Cloud's MongoDB Atlas Source connector to capture change events from MongoDB Atlas and push them to Kafka.
  • The Tinybird Confluent Cloud connector to ingest the data from Kafka.

This guide uses Confluent Cloud as a managed Kafka service, and MongoDB Atlas as a managed MongoDB service. You can use any Kafka service and MongoDB instance, but the setup steps may vary.

Prerequisites

This guide assumes you have:

  • An existing Tinybird account & Workspace
  • An existing Confluent Cloud account
  • An existing MongoDB Atlas account & collection
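
The steps below also use the Tinybird CLI. If you don't have it installed and authenticated yet, a minimal setup (assuming a local Python environment, and using your Workspace admin Token from the Tinybird UI) looks like this:

# install the Tinybird CLI and authenticate against your Workspace
pip install tinybird-cli
tb auth --token <YOUR WORKSPACE ADMIN TOKEN>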

1. Create Confluent Cloud MongoDB Atlas Source

Create a new MongoDB Atlas Source in Confluent Cloud. Use the following template to configure the Source:

{
  "name": "<CONNECTOR NAME>",
  "config": {
    "name": "<CONNECTOR NAME>",

    "connection.host": "<MONGO HOST>",
    "connection.user": "<MONGO USER>",
    "connection.password": "<MONGO PASS>",
    "database": "<MONGO DATABASE>",
    "collection": "<MONGO COLLECTION>",
    
    "cloud.provider": "<CLOUD PROVIDER>",
    "cloud.environment": "<CLOUD ENV>",
    "kafka.region": "<KAFKA REGION>",
    "kafka.auth.mode": "KAFKA_API_KEY",
    "kafka.api.key": "<KAFKA KEY>",
    "kafka.api.secret": "<KAFKA SECRET>",
    "kafka.endpoint": "<KAFKA ENDPOINT>",
    
    "topic.prefix": "<KAFKA TOPIC PREFIX>",
    "errors.deadletterqueue.topic.name": "<KAFKA DEADLETTER TOPIC>",
    
    "startup.mode": "copy_existing",
    "copy.existing": "true",
    "copy.existing.max.threads": "1",
    "copy.existing.queue.size": "16000",

    "poll.await.time.ms": "5000",
    "poll.max.batch.size": "1000",
    "heartbeat.interval.ms": "10000",
    "errors.tolerance": "all",
    "max.batch.size": "100",

    "connector.class": "MongoDbAtlasSource",
    "output.data.format": "JSON",
    "output.json.format": "SimplifiedJson",
    "json.output.decimal.format": "NUMERIC",
    "change.stream.full.document": "updateLookup",
    "change.stream.full.document.before.change": "whenAvailable",
    "tasks.max": "1"
  }
}
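
You can create the Source in the Confluent Cloud UI, or with the Confluent CLI if you prefer working from the terminal. The command below is a sketch: it assumes you've saved the completed template as mongodb-source.json and are already logged in with an environment and Kafka cluster selected.

confluent connect cluster create --config-file mongodb-source.json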

When the Source is created, you should see a new Kafka topic in your Confluent Cloud account, typically named after the topic prefix, database, and collection (for example, <KAFKA TOPIC PREFIX>.<MONGO DATABASE>.<MONGO COLLECTION>). This topic contains the change events from your MongoDB collection.
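
Each message on the topic is a MongoDB change stream event. The illustrative insert event below (field values are made up, and the exact shape varies by operation type and the output format settings above) shows the structure that the JSONPaths in the next step refer to: operationType, ns.db, ns.coll, documentKey._id, and fullDocument.

{
  "_id": { "_data": "<RESUME TOKEN>" },
  "operationType": "insert",
  "ns": { "db": "<MONGO DATABASE>", "coll": "<MONGO COLLECTION>" },
  "documentKey": { "_id": "653f1a2b3c4d5e6f7a8b9c0d" },
  "fullDocument": { "_id": "653f1a2b3c4d5e6f7a8b9c0d", "name": "example", "amount": 42 }
}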

2. Create Tinybird Data Source (CLI)

Using the Tinybird CLI, create a new Kafka connection:

tb connection create kafka

The CLI prompts you for the connection details of your Kafka service. You'll also provide a name for the connection; Tinybird uses this name to reference the connection, and you'll need it for the Data Source below.
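
If you'd rather avoid the interactive prompts, you can pass the details as flags instead. The example below is a sketch: flag names can differ between CLI versions (check tb connection create kafka --help), and the values are your Confluent Cloud bootstrap endpoint and an API key/secret pair.

# non-interactive variant; verify the flag names for your CLI version
tb connection create kafka \
    --bootstrap-servers <KAFKA ENDPOINT> \
    --key <KAFKA API KEY> \
    --secret <KAFKA API SECRET> \
    --connection-name <CONNECTION NAME>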

Next, create a new file called kafka_ds.datasource (you can use any name, as long as the file has the .datasource extension). Add the following content to the file:

SCHEMA >
    `_id` String `json:$.documentKey._id` DEFAULT JSONExtractString(__value, '_id._id'),
    `operation_type` LowCardinality(String) `json:$.operationType`,
    `database` LowCardinality(String) `json:$.ns.db`,
    `collection` LowCardinality(String) `json:$.ns.coll`

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(__timestamp)"
ENGINE_SORTING_KEY "__timestamp, _id"

KAFKA_CONNECTION_NAME '<CONNECTION NAME>'
KAFKA_TOPIC '<KAFKA TOPIC>'
KAFKA_GROUP_ID '<KAFKA CONSUMER GROUP ID>'
KAFKA_AUTO_OFFSET_RESET 'earliest'
KAFKA_STORE_RAW_VALUE 'True'
KAFKA_STORE_HEADERS 'False'
KAFKA_STORE_BINARY_HEADERS 'True'
KAFKA_TARGET_PARTITIONS 'auto'
KAFKA_KEY_AVRO_DESERIALIZATION ''

Now push the Data Source to Tinybird using:

tb push kafka_ds.datasource

3. Validate the Data Source

Go to the Tinybird UI and validate that the Data Source has been created.

As changes occur in MongoDB, you should see the data being ingested into Tinybird. Note that this is an append log of all changes, so you will see multiple records for the same document as it is updated.
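
To see the append log for yourself, run a query against the Data Source, either in the UI or with tb sql. The query below is a sketch that assumes the Data Source is named kafka_ds (the file name used above); documents that have changed more than once appear with several operations.

SELECT
    _id,
    count() AS changes,
    groupArray(operation_type) AS operations
FROM kafka_ds  -- replace with your Data Source name if it differs
GROUP BY _id
ORDER BY changes DESC
LIMIT 10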

4. Deduplicate with ReplacingMergeTree

To deduplicate the data, you can use a ReplacingMergeTree engine on a Materialized View. This is explained in more detail in the deduplication guide.

You'll create a new Data Source that uses the ReplacingMergeTree engine to store the deduplicated data, and a Pipe that processes the data from the original Data Source and writes it to the new one.

First, create the Data Source that will store the deduplicated data: add a new file called deduped_ds.datasource with the following content:

SCHEMA >
    `fullDocument` String,
    `_id` String,
    `database` LowCardinality(String),
    `collection` LowCardinality(String),
    `k_timestamp` DateTime,
    `is_deleted` UInt8

ENGINE "ReplacingMergeTree"
ENGINE_SORTING_KEY "_id"
ENGINE_VER "k_timestamp"
ENGINE_IS_DELETED "is_deleted"

Now push the Data Source to Tinybird using:

tb push deduped_ds.datasource

Then, create a new file called dedupe_mongo.pipe and add the following content:

NODE mv
SQL >

    SELECT
        JSONExtractRaw(__value, 'fullDocument') as fullDocument,
        _id,
        database,
        collection,
        __timestamp as k_timestamp,
        if(operation_type = 'delete', 1, 0) as is_deleted
    FROM <ORIGINAL DATASOURCE NAME>

TYPE materialized
DATASOURCE <DESTINATION DATASOURCE NAME>

Now push the Pipe to Tinybird using:

tb push dedupe_mongo.pipe

As new data arrives via Kafka, the Materialized View processes it automatically and writes the results into the ReplacingMergeTree Data Source.

Query this new Data Source to access the deduplicated data:

SELECT * FROM deduped_ds FINAL
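
Because fullDocument is stored as a raw JSON string, you can extract individual fields at query time. The query below is a sketch: name and amount are hypothetical fields, so replace them with fields from your own documents.

SELECT
    _id,
    JSONExtractString(fullDocument, 'name') AS name,    -- hypothetical field
    JSONExtractFloat(fullDocument, 'amount') AS amount,  -- hypothetical field
    k_timestamp
FROM deduped_ds FINAL
WHERE is_deleted = 0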