Connect MongoDB to Tinybird
In this guide, you'll learn how to ingest data into Tinybird from MongoDB.
You'll use:
- MongoDB Atlas as the source MongoDB database.
- Confluent Cloud's MongoDB Atlas Source connector to capture change events from MongoDB Atlas and push them to Kafka.
- The Tinybird Confluent Cloud connector to ingest the data from Kafka.
This guide uses Confluent Cloud as a managed Kafka service, and MongoDB Atlas as a managed MongoDB service. You can use any Kafka service and MongoDB instance, but the setup steps may vary.
Prerequisites
This guide assumes you have:
- An existing Tinybird account & Workspace
- An existing Confluent Cloud account
- An existing MongoDB Atlas account & collection
1. Create Confluent Cloud MongoDB Atlas Source
Create a new MongoDB Atlas Source in Confluent Cloud. Use the following template to configure the Source:
{ "name": "<CONNECTOR NAME>", "config": { "name": "<CONNECTOR NAME>", "connection.host": "<MONGO HOST>", "connection.user": "<MONGO USER>", "connection.password": "<MONGO PASS>", "database": "<MONGO DATABASE>", "collection": "<MONGO COLLECTION>", "cloud.provider": "<CLOUD PROVIDER>", "cloud.environment": "<CLOUD ENV>", "kafka.region": "<KAFKA REGION>", "kafka.auth.mode": "KAFKA_API_KEY", "kafka.api.key": "<KAFKA KEY>", "kafka.api.secret": "<KAFKA SECRET>", "kafka.endpoint": "<KAFKA ENDPOINT>", "topic.prefix": "<KAFKA TOPIC PREFIX>", "errors.deadletterqueue.topic.name": "<KAFKA DEADLETTER TOPIC>", "startup.mode": "copy_existing", "copy.existing": "true", "copy.existing.max.threads": "1", "copy.existing.queue.size": "16000", "poll.await.time.ms": "5000", "poll.max.batch.size": "1000", "heartbeat.interval.ms": "10000", "errors.tolerance": "all", "max.batch.size": "100", "connector.class": "MongoDbAtlasSource", "output.data.format": "JSON", "output.json.format": "SimplifiedJson", "json.output.decimal.format": "NUMERIC", "change.stream.full.document": "updateLookup", "change.stream.full.document.before.change": "whenAvailable", "tasks.max": "1" } }
When the Source is created, you should see a new Kafka topic in your Confluent Cloud account. This topic will contain the change events from your MongoDB collection.
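The exact message format depends on your connector settings, but with the configuration above (SimplifiedJson output and updateLookup), each event roughly follows the MongoDB change stream structure. An illustrative, non-exhaustive update event, using placeholder values:
{
  "_id": { "_data": "<RESUME TOKEN>" },
  "operationType": "update",
  "ns": { "db": "<MONGO DATABASE>", "coll": "<MONGO COLLECTION>" },
  "documentKey": { "_id": "<DOCUMENT ID>" },
  "fullDocument": { "_id": "<DOCUMENT ID>", "<FIELD>": "<VALUE>" },
  "updateDescription": { "updatedFields": { "<FIELD>": "<VALUE>" }, "removedFields": [] }
}
The documentKey._id, operationType and ns fields are what the Data Source schema in the next step extracts.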
2. Create a Tinybird Data Source (CLI)
Using the Tinybird CLI, create a new Kafka connection:
tb connection create kafka
The CLI prompts you for the connection details of your Kafka service. You'll also provide a name for the connection; Tinybird uses this name to reference the connection, and you'll need it below.
Next, create a new file called kafka_ds.datasource (you can use any name you want, as long as it has the .datasource extension). Add the following content to the file:
SCHEMA >
    `_id` String `json:$.documentKey._id` DEFAULT JSONExtractString(__value, '_id._id'),
    `operation_type` LowCardinality(String) `json:$.operationType`,
    `database` LowCardinality(String) `json:$.ns.db`,
    `collection` LowCardinality(String) `json:$.ns.coll`

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(__timestamp)"
ENGINE_SORTING_KEY "__timestamp, _id"

KAFKA_CONNECTION_NAME '<CONNECTION NAME>'
KAFKA_TOPIC '<KAFKA TOPIC>'
KAFKA_GROUP_ID '<KAFKA CONSUMER GROUP ID>'
KAFKA_AUTO_OFFSET_RESET 'earliest'
KAFKA_STORE_RAW_VALUE 'True'
KAFKA_STORE_HEADERS 'False'
KAFKA_STORE_BINARY_HEADERS 'True'
KAFKA_TARGET_PARTITIONS 'auto'
KAFKA_KEY_AVRO_DESERIALIZATION ''
Now push the Data Source to Tinybird using:
tb push kafka_ds.datasource
3. Validate the Data Source
Go to the Tinybird UI and validate that a Data Source has been created.
As changes occur in MongoDB, you should see the data being ingested into Tinybird. Note that this Data Source is an append-only log of change events, so you'll see multiple records for the same document as it's updated.
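To see this, you can query the Data Source directly. A minimal sketch, assuming you pushed the file above unchanged so the Data Source is named kafka_ds:
SELECT
    _id,
    count() AS change_events,                      -- total change events per document
    countIf(operation_type = 'delete') AS deletes  -- how many of those events were deletes
FROM kafka_ds
GROUP BY _id
ORDER BY change_events DESC
LIMIT 10
Documents that have been updated several times will show more than one change event.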
4. Deduplicate with ReplacingMergeTree
To deduplicate the data, you can use a ReplacingMergeTree engine on a Materialized View. This is explained in more detail in the deduplication guide.
You'll create a new Data Source that uses the ReplacingMergeTree engine to store the deduplicated data, and a Pipe that processes the data from the original Data Source and writes it into the new one.
First, create a new Data Source to store the deduplicated data.
Create a new file called deduped_ds.datasource and add the following content:
SCHEMA >
    `fullDocument` String,
    `_id` String,
    `database` LowCardinality(String),
    `collection` LowCardinality(String),
    `k_timestamp` DateTime,
    `is_deleted` UInt8

ENGINE "ReplacingMergeTree"
ENGINE_SORTING_KEY "_id"
ENGINE_VER "k_timestamp"
ENGINE_IS_DELETED "is_deleted"
Now push the Data Source to Tinybird using:
tb push deduped_ds.datasource
Then, create a new file called dedupe_mongo.pipe and add the following content:
NODE mv
SQL >
    SELECT
        JSONExtractRaw(__value, 'fullDocument') AS fullDocument,
        _id,
        database,
        collection,
        __timestamp AS k_timestamp,
        if(operation_type = 'delete', 1, 0) AS is_deleted
    FROM <ORIGINAL DATASOURCE NAME>

TYPE materialized
DATASOURCE <DESTINATION DATASOURCE NAME>
Now push the Pipe to Tinybird using:
tb push dedupe_mongo.pipe
As new data arrives via Kafka, it's automatically processed through the Materialized View and written into the ReplacingMergeTree Data Source.
Query this new Data Source to access the deduplicated data:
SELECT * FROM deduped_ds FINAL
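The fullDocument column still holds the raw MongoDB document as a JSON string, so you can extract individual fields at query time. A minimal sketch, assuming your documents contain a hypothetical status field:
SELECT
    _id,
    JSONExtractString(fullDocument, 'status') AS status  -- 'status' is a hypothetical field; replace it with a field from your documents
FROM deduped_ds FINAL
WHERE is_deleted = 0  -- skip documents whose latest change event was a delete
Alternatively, you can materialize frequently used fields as dedicated columns in the Pipe above.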