Connect MongoDB to Tinybird

In this guide, you'll learn how to ingest data into Tinybird from MongoDB.

You'll use:

  • MongoDB Atlas as the source MongoDB database.
  • Confluent Cloud's MongoDB Atlas Source connector to capture change events from MongoDB Atlas and push them to Kafka.
  • The Tinybird Confluent Cloud connector to ingest the data from Kafka.

This guide uses Confluent Cloud as a managed Kafka service, and MongoDB Atlas as a managed MongoDB service. You can use any Kafka service and MongoDB instance, but the setup steps may vary.

Prerequisites

This guide assumes you have:

  • An existing Tinybird account & workspace
  • An existing Confluent Cloud account
  • An existing MongoDB Atlas account & collection

1. Create a Confluent Cloud source

Create a new MongoDB Atlas Source in Confluent Cloud. Use the following template to configure the Source:

{
  "name": "<CONNECTOR NAME>",
  "config": {
    "name": "<CONNECTOR NAME>",

    "connection.host": "<MONGO HOST>",
    "connection.user": "<MONGO USER>",
    "connection.password": "<MONGO PASS>",
    "database": "<MONGO DATABASE>",
    "collection": "<MONGO COLLECTION>",
    
    "cloud.provider": "<CLOUD PROVIDER>",
    "cloud.environment": "<CLOUD ENV>",
    "kafka.region": "<KAFKA REGION>",
    "kafka.auth.mode": "KAFKA_API_KEY",
    "kafka.api.key": "<KAFKA KEY>",
    "kafka.api.secret": "<KAFKA SECRET>",
    "kafka.endpoint": "<KAFKA ENDPOINT>",
    
    "topic.prefix": "<KAFKA TOPIC PREFIX>",
    "errors.deadletterqueue.topic.name": "<KAFKA DEADLETTER TOPIC>",
    
    "startup.mode": "copy_existing",
    "copy.existing": "true",
    "copy.existing.max.threads": "1",
    "copy.existing.queue.size": "16000",

    "poll.await.time.ms": "5000",
    "poll.max.batch.size": "1000",
    "heartbeat.interval.ms": "10000",
    "errors.tolerance": "all",
    "max.batch.size": "100",

    "connector.class": "MongoDbAtlasSource",
    "output.data.format": "JSON",
    "output.json.format": "SimplifiedJson",
    "json.output.decimal.format": "NUMERIC",
    "change.stream.full.document": "updateLookup",
    "change.stream.full.document.before.change": "whenAvailable",
    "tasks.max": "1"
  }
}

When the source is created, you should see a new Kafka topic in your Confluent Cloud account, typically named <KAFKA TOPIC PREFIX>.<MONGO DATABASE>.<MONGO COLLECTION>. This topic contains the change events from your MongoDB collection. Because startup.mode is set to copy_existing, the connector first copies the documents that already exist in the collection and then streams subsequent change events.

2. Create a Tinybird data source

Create a new Kafka data source in your Tinybird project. See Kafka for more information.

The data source should have the following schema:

SCHEMA >
    `_id` String `json:$.documentKey._id` DEFAULT JSONExtractString(__value, '_id._id'),
    `operation_type` LowCardinality(String) `json:$.operationType`,
    `database` LowCardinality(String) `json:$.ns.db`,
    `collection` LowCardinality(String) `json:$.ns.coll`

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(__timestamp)"
ENGINE_SORTING_KEY "__timestamp, _id"

KAFKA_CONNECTION_NAME '<CONNECTION NAME>'
KAFKA_TOPIC '<KAFKA TOPIC>'
KAFKA_GROUP_ID '<KAFKA CONSUMER GROUP ID>'
KAFKA_AUTO_OFFSET_RESET 'earliest'
KAFKA_STORE_RAW_VALUE 'True'
KAFKA_STORE_HEADERS 'False'
KAFKA_STORE_BINARY_HEADERS 'True'
KAFKA_TARGET_PARTITIONS 'auto'
KAFKA_KEY_AVRO_DESERIALIZATION ''
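
This schema maps only the change-event metadata: the document key, operation type, database, and collection. Because KAFKA_STORE_RAW_VALUE is set to 'True', the full Kafka message is also stored in the __value column, so any other field of the event, such as fullDocument, can be extracted at query time with the ClickHouse JSON functions. As a minimal sketch, assuming you named the Kafka data source mongo_raw (a hypothetical name):

SELECT
    _id,
    operation_type,
    JSONExtractRaw(__value, 'fullDocument') AS full_document
FROM mongo_raw
LIMIT 10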

Deploy the data source before going to the next step.

3. Validate the data source

Go to Tinybird Cloud and validate that the data source has been created.

As changes occur in MongoDB, you should see the data being ingested into Tinybird. Note that this is an append log of all changes, so you will see multiple records for the same document as it's updated.
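
As a quick check of the append-log behavior, you can count events by operation type. A sketch, again assuming the Kafka data source is named mongo_raw:

SELECT
    operation_type,
    count() AS events
FROM mongo_raw
GROUP BY operation_type

Inserts, updates, and deletes for the same document all appear as separate rows, which is why the next step deduplicates them.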

4. Deduplicate with ReplacingMergeTree

To deduplicate the change log, create a new data source that uses the ReplacingMergeTree engine to store the deduplicated data, and a pipe that processes the change events from the original data source and writes them to the new data source.

First, create the data source that will store the deduplicated data. It uses k_timestamp as the version column (ENGINE_VER) and an is_deleted flag (ENGINE_IS_DELETED) to mark documents whose latest change event was a delete.

Create a new file called deduped_ds.datasource and add the following content:

SCHEMA >
    `fullDocument` String,
    `_id` String,
    `database` LowCardinality(String),
    `collection` LowCardinality(String),
    `k_timestamp` DateTime,
    `is_deleted` UInt8

ENGINE "ReplacingMergeTree"
ENGINE_SORTING_KEY "_id"
ENGINE_VER "k_timestamp"
ENGINE_IS_DELETED "is_deleted"

Then, create a new file called dedupe_mongo.pipe and add the following content:

NODE mv
SQL >

    SELECT
        -- extract the full MongoDB document from the raw Kafka message
        JSONExtractRaw(__value, 'fullDocument') as fullDocument,
        _id,
        database,
        collection,
        -- Kafka timestamp, used as the ReplacingMergeTree version column
        __timestamp as k_timestamp,
        -- mark delete events for the ENGINE_IS_DELETED column
        if(operation_type = 'delete', 1, 0) as is_deleted
    FROM <ORIGINAL DATASOURCE NAME>

TYPE materialized
DATASOURCE <DESTINATION DATASOURCE NAME>

Deploy the changes. As new data arrives via Kafka, it's automatically processed by the materialized view and written into the ReplacingMergeTree data source.

Query this new data source to access the deduplicated data:

SELECT * FROM deduped_ds FINAL
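
Since fullDocument is stored as a raw JSON string, you can extract individual document fields with the ClickHouse JSON functions. A sketch, assuming a hypothetical users collection with a name field:

SELECT
    _id,
    JSONExtractString(fullDocument, 'name') AS name
FROM deduped_ds FINAL
WHERE collection = 'users'

FINAL applies the ReplacingMergeTree deduplication at query time, so each _id returns only the row with the highest k_timestamp.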