Confluent Connector

Use the Confluent Connector to bring data from your existing Confluent Cloud cluster into Tinybird, where you can query it using SQL and quickly turn it into high-concurrency, low-latency REST API Endpoints.

The Confluent Connector is fully managed and requires no additional tooling. Connect Tinybird to your Confluent Cloud cluster, select a topic, and Tinybird automatically begins consuming messages from Confluent Cloud.

Prerequisites

You need to grant READ permissions to both the Topic and the Consumer Group to ingest data from Confluent into Tinybird.

The Confluent Cloud Schema Registry is only supported for decoding Avro messages. When using Confluent Schema Registry, the Schema name must match the Topic name. For example, if you're ingesting the Kafka Topic my-kafka-topic using a Connector with Schema Registry enabled, it expects to find a Schema named my-kafka-topic-value.
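The naming rule described above can be sketched as a small check. This is only an illustration, not part of Tinybird or Confluent tooling: the function names and the list of registered Schema names are hypothetical, and the only behavior assumed is the `<topic>-value` convention stated above.

```python
from typing import Iterable


def expected_value_subject(topic: str) -> str:
    """Build the Schema name the connector is described as looking for."""
    return f"{topic}-value"


def can_decode_topic(topic: str, registered_subjects: Iterable[str]) -> bool:
    """Return True if a Schema named after the topic is registered."""
    return expected_value_subject(topic) in set(registered_subjects)


# Hypothetical Schema names, as they might be listed in your Schema Registry.
subjects = ["my-kafka-topic-value", "orders-value"]

print(expected_value_subject("my-kafka-topic"))       # my-kafka-topic-value
print(can_decode_topic("my-kafka-topic", subjects))   # True
print(can_decode_topic("payments", subjects))         # False
```

A check like this can help confirm the Schema name before you turn on Schema Registry support for a topic.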

Create the Data Source using the UI

To connect Tinybird to your Confluent Cloud cluster, select Create new (+) next to the data project section, select Data Source, and then select Confluent from the list of available Data Sources.

Enter the following details:

  • Connection name: A name for the Confluent Cloud connection in Tinybird.
  • Bootstrap Server: The comma-separated list of bootstrap servers, including port numbers.
  • Key: The key component of the Confluent Cloud API Key.
  • Secret: The secret component of the Confluent Cloud API Key.
  • Decode Avro messages with schema registry: (Optional) Turn on Schema Registry support to decode Avro messages. Enter the Schema Registry URL, username, and password.

After you've entered the details, select Connect. This creates the connection between Tinybird and Confluent Cloud. A list of your existing topics appears and you can select the topic to consume from.

Tinybird creates a Group ID that specifies the name of the consumer group that this Kafka consumer belongs to. You can customize the Group ID, but make sure the consumer group has READ permissions on the topic.

After you've chosen a topic, you can select the starting offset to consume from. You can consume from the earliest offset or the latest offset:

  • If you consume from the earliest offset, Tinybird consumes all messages from the beginning of the topic.
  • If you consume from the latest offset, Tinybird only consumes messages that are produced after the connection is created.
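The difference between the two starting offsets can be illustrated with a toy model of a single partition. This is a simplified sketch, not how Tinybird or Kafka is implemented: the function name, the in-memory list standing in for the topic, and the `log_end_at_connect` parameter are all hypothetical.

```python
from typing import List


def messages_consumed(topic_log: List[str], log_end_at_connect: int, start: str) -> List[str]:
    """Return the messages a consumer would read for a given starting offset.

    topic_log holds every message in one partition, indexed by offset.
    log_end_at_connect is the next offset to be written at the moment
    the connection is created.
    """
    if start == "earliest":
        # Everything from the beginning of the topic.
        return list(topic_log)
    if start == "latest":
        # Only messages produced after the connection was created.
        return topic_log[log_end_at_connect:]
    raise ValueError(f"unknown starting offset: {start!r}")


# Two messages exist before the connection is created, two arrive afterwards.
log = ["m0", "m1", "m2", "m3"]
print(messages_consumed(log, 2, "earliest"))  # ['m0', 'm1', 'm2', 'm3']
print(messages_consumed(log, 2, "latest"))    # ['m2', 'm3']
```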

After selecting the offset, select Next. Tinybird consumes a sample of messages from the topic and displays the schema. You can adjust the schema and Data Source settings as needed, then select Create Data Source.

Tinybird begins consuming messages from the topic and loading them into the Data Source.

Configure the connector using .datasource files

If you are managing your Tinybird resources in files, there are several settings available to configure the Confluent Connector in .datasource files. See the datafiles docs for more information.

The following is an example of a Kafka .datasource file for an existing connection:

Example data source for Confluent Connector
SCHEMA >
  `__value` String,
  `__topic` LowCardinality(String),
  `__partition` Int16,
  `__offset` Int64,
  `__timestamp` DateTime,
  `__key` String,
  `__headers` Map(String,String)

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(__timestamp)"
ENGINE_SORTING_KEY "__timestamp"

# Connection is already available. If you
# need to create one, add the required fields
# on an include file with the details.
KAFKA_CONNECTION_NAME my_connection_name
KAFKA_TOPIC my_topic
KAFKA_GROUP_ID my_group_id
KAFKA_STORE_HEADERS true

Columns of the Data Source

When you connect a Kafka producer to Tinybird, Tinybird reads optional metadata from each Kafka record and writes it to columns in the Data Source.

The following fields represent the raw data received from Kafka:

  • __value: A String representing the entire unparsed Kafka record inserted.
  • __topic: The Kafka topic that the message belongs to.
  • __partition: The Kafka partition that the message belongs to.
  • __offset: The Kafka offset of the message.
  • __timestamp: The timestamp stored in the Kafka message received by Tinybird.
  • __key: The key of the Kafka message.
  • __headers: Headers parsed from the incoming topic messages. See Using custom Kafka headers for advanced message processing.

Metadata fields are optional. Omit the fields you don't need to reduce your data storage.

Use INCLUDE to store connection settings

To avoid configuring the same connection settings across many files, or to prevent leaking sensitive information, you can store connection details in an external file and use INCLUDE to import them into one or more .datasource files.

You can find more information about INCLUDE in the Advanced Templates documentation.

For example, you might have two Confluent Cloud .datasource files, which re-use the same Confluent Cloud connection. You can create an include file which stores the Confluent Cloud connection details.

The Tinybird project might use the following structure:

Tinybird data project file structure
ecommerce_data_project/
    datasources/
        connections/
          my_connector_name.incl
        my_confluent_datasource.datasource
        another_datasource.datasource
    endpoints/
    pipes/

Where the file my_connector_name.incl has the following content:

Include file containing Confluent Cloud connection details
KAFKA_CONNECTION_NAME my_connection_name
KAFKA_BOOTSTRAP_SERVERS my_server:9092
KAFKA_KEY my_username
KAFKA_SECRET my_password

And the Confluent Cloud .datasource files look like the following:

Data Source using includes for Confluent Cloud connection details
SCHEMA >
  `value` String,
  `topic` LowCardinality(String),
  `partition` Int16,
  `offset` Int64,
  `timestamp` DateTime,
  `key` String

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp"

INCLUDE "connections/my_connector_name.incl"

KAFKA_TOPIC my_topic
KAFKA_GROUP_ID my_group_id

When you run tb pull on a Confluent Cloud Data Source, the KAFKA_KEY and KAFKA_SECRET settings are omitted from the downloaded file to avoid exposing credentials.

Internal fields

The __<field> fields stored in the Kafka Data Source represent the raw data received from Kafka:

  • __value: A String representing the whole Kafka record inserted.
  • __topic: The Kafka topic that the message belongs to.
  • __partition: The Kafka partition that the message belongs to.
  • __offset: The Kafka offset of the message.
  • __timestamp: The timestamp stored in the Kafka message received by Tinybird.
  • __key: The key of the Kafka message.

Compressed messages

Tinybird can consume from Kafka topics where Kafka compression is enabled, as decompressing the message is a standard function of the Kafka Consumer.

However, if you compressed the message before passing it through the Kafka Producer, then Tinybird can't do post-Consumer processing to decompress the message.

For example, if you compress a JSON message with gzip and produce it to a Kafka topic as a bytes message, Tinybird ingests it as bytes. If you instead produce a JSON message with the Kafka Producer setting compression.type=gzip, it's stored in Kafka as compressed bytes, but it's decompressed on consumption and arrives in Tinybird as JSON.
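The two cases can be contrasted with a short sketch that uses only the Python standard library. No Kafka client is involved: the sample event and the helper `is_json_text` are hypothetical, and the second case simply assumes, as described above, that the Kafka clients handle compression.type transparently.

```python
import gzip
import json


def is_json_text(raw: bytes) -> bool:
    """Return True if the bytes decode as UTF-8 and parse as JSON."""
    try:
        json.loads(raw.decode("utf-8"))
        return True
    except ValueError:  # covers UnicodeDecodeError and json.JSONDecodeError
        return False


event = {"order_id": 42, "status": "shipped"}  # hypothetical message
payload = json.dumps(event).encode("utf-8")

# Case 1: the application gzips the payload itself and produces the result.
# Kafka treats it as opaque bytes, so the consumer receives gzip data.
precompressed = gzip.compress(payload)
print(is_json_text(precompressed))  # False

# Case 2: the producer sends the plain payload with compression.type=gzip.
# Compression and decompression happen inside the Kafka clients, so the
# consumer receives the original payload.
print(is_json_text(payload))  # True
```

In the first case, whatever reads the message has to call something like `gzip.decompress` itself, which is the post-Consumer step Tinybird doesn't perform.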

Confluent logs

You can find global logs in the datasources_ops_log Service Data Source. Filter by datasource_id to select the correct Data Source, and set event_type to append-kafka.

To select all Kafka-related logs from the last day, run the following query:

SELECT *
FROM tinybird.datasources_ops_log
WHERE datasource_id = 't_1234'
  AND event_type = 'append-kafka'
  AND timestamp > now() - INTERVAL 1 day
ORDER BY timestamp DESC

If you can't find logs in datasources_ops_log, the kafka_ops_log Service Data Source contains more detailed logs. Filter by datasource_id to select the correct datasource, and use msg_type to select the desired log level (info, warning, or error).

SELECT *
FROM tinybird.kafka_ops_log
WHERE datasource_id = 't_1234'
  AND timestamp > now() - INTERVAL 1 day
  AND msg_type IN ('info', 'warning', 'error')
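If you want to run a query like the one above from a script, the following sketch builds an HTTP request for it. It assumes Tinybird's Query API is reachable at `/v0/sql` with a `q` parameter and a Bearer token, and that your Workspace lives on `https://api.tinybird.co`. Verify both against the Query API documentation for your region. The function name `build_logs_request` and the token value are hypothetical.

```python
import urllib.parse
import urllib.request


def build_logs_request(datasource_id: str, token: str,
                       host: str = "https://api.tinybird.co") -> urllib.request.Request:
    """Build (but don't send) a request for recent warning and error logs."""
    # Reject anything that isn't a plain identifier, because the value is
    # interpolated into the SQL text.
    if not datasource_id.replace("_", "").isalnum():
        raise ValueError(f"unexpected datasource_id: {datasource_id!r}")

    query = (
        "SELECT * FROM tinybird.kafka_ops_log "
        f"WHERE datasource_id = '{datasource_id}' "
        "AND timestamp > now() - INTERVAL 1 day "
        "AND msg_type IN ('warning', 'error') "
        "ORDER BY timestamp DESC "
        "FORMAT JSON"
    )
    # Assumed endpoint shape: <host>/v0/sql?q=<url-encoded SQL>
    url = f"{host}/v0/sql?" + urllib.parse.urlencode({"q": query})
    return urllib.request.Request(url, headers={"Authorization": f"Bearer {token}"})


request = build_logs_request("t_1234", "<your_token>")
print(request.full_url)

# To send it, uncomment the following lines and use a real token:
# with urllib.request.urlopen(request) as response:
#     print(response.read().decode("utf-8"))
```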