Kafka connector¶
You can set up a Kafka connector to stream data from your Kafka topic to Tinybird. The connector is compatible with Kafka, Confluent, and Redpanda.
Setting up the Kafka connector requires creating and enabling a data source and its connection as separate files.
Prerequisites¶
Grant READ
permissions to both the topic and the consumer group to ingest data from Kafka into Tinybird.
You must secure your Kafka brokers with SSL/TLS and SASL. Tinybird uses SASL_SSL
as the security protocol for the Kafka consumer. Connections are rejected if the brokers only support PLAINTEXT
or SASL_PLAINTEXT
.
Kafka Schema Registry is supported only for decoding Avro messages.
Set up the connector¶
To set up the Kafka connector, follow these steps.
Create a Kafka connection¶
Before you create and configure a Kafka data source, you need to set up a connection. Create a .connection file with the required credentials stored in secrets. For example:
kafkasample.connection
TYPE kafka
KAFKA_BOOTSTRAP_SERVERS {{ tb_secret("KAFKA_SERVERS", "localhost:9092") }}
KAFKA_SECURITY_PROTOCOL SASL_SSL
KAFKA_SASL_MECHANISM PLAIN
KAFKA_KEY {{ tb_secret("KAFKA_USERNAME", "") }}
KAFKA_SECRET {{ tb_secret("KAFKA_PASSWORD", "") }}
See Connection files for more details on how to create a connection file.
For a list of Kafka connection settings, see Settings.
Create a Kafka data source¶
Create a .datasource file using tb create --prompt
or manually.
The .datasource file must contain the desired schema and the required settings for Kafka, including the KAFKA_CONNECTION_NAME
setting, which must match the name of the .connection file you created in the previous step.
For example:
kafkasample.datasource
SCHEMA >
    `value` String,
    `topic` LowCardinality(String),
    `partition` Int16,
    `offset` Int64,
    `timestamp` DateTime,
    `key` String

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp"

KAFKA_CONNECTION_NAME kafkasample # The name of the .connection file
KAFKA_TOPIC test_topic
KAFKA_GROUP_ID tinybird_consumer_1
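If your topic carries Avro-encoded values, you would also point the data source at your schema registry. A minimal sketch of the extra settings, using a placeholder registry URL:
KAFKA_VALUE_FORMAT avro
KAFKA_SCHEMA_REGISTRY_URL https://registry.example.com # Placeholder; use your schema registry URL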
Test and deploy¶
After you've defined your Kafka data source and connection, you can test and deploy the changes as usual. See Test and deploy.
To check if the connection is active, run tb connection ls
.
.datasource settings¶
The Kafka connector uses the following settings in .datasource files:
Instruction | Required | Description |
---|---|---|
KAFKA_CONNECTION_NAME | Yes | The name of the configured Kafka connection in Tinybird. |
KAFKA_TOPIC | Yes | Name of the Kafka topic to consume from. |
KAFKA_GROUP_ID | Yes | Consumer Group ID to use when consuming from Kafka. |
KAFKA_AUTO_OFFSET_RESET | No | Offset to use when no previous offset can be found, for example when creating a new consumer. Supported values are latest and earliest. Default: latest. |
KAFKA_STORE_HEADERS | No | Store Kafka headers as the field __headers for later processing. Default value is 'False'. |
KAFKA_STORE_BINARY_HEADERS | No | Stores all Kafka headers as binary data in the field __headers as a binary map of type Map(String, String). To access the header 'key', run __headers['key']. Default value is 'True'. This field only applies if KAFKA_STORE_HEADERS is set to True. |
KAFKA_STORE_RAW_VALUE | No | Stores the raw message in its entirety as an additional column. Supported values are 'True' and 'False'. Default: 'False'. |
KAFKA_SCHEMA_REGISTRY_URL | No | URL of the Kafka schema registry. |
KAFKA_TARGET_PARTITIONS | No | Target partitions to place the messages. |
KAFKA_KEY_FORMAT | No | Format of the message key. Valid values are avro, json_with_schema, and json_without_schema. Using avro or json_with_schema requires KAFKA_SCHEMA_REGISTRY_URL to be set. |
KAFKA_VALUE_FORMAT | No | Format of the message value. Valid values are avro, json_with_schema, and json_without_schema. Using avro or json_with_schema requires KAFKA_SCHEMA_REGISTRY_URL to be set. |
KAFKA_KEY_AVRO_DESERIALIZATION | No | If the key of the message is serialized in Avro format, allows decoding it using the Avro definition stored in KAFKA_SCHEMA_REGISTRY_URL. The key is converted to JSON and stored in the __key column. Defaults to 'False'. A deprecated alternative is to use KAFKA_KEY_FORMAT with the avro value. |
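For example, with KAFKA_STORE_HEADERS set to True, you can read individual headers from the __headers map in a query. A minimal sketch, assuming the kafkasample data source from above and a hypothetical trace-id header:
SELECT
    timestamp,
    __headers['trace-id'] AS trace_id, -- hypothetical header name; missing keys return an empty string
    value
FROM kafkasample
LIMIT 10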
.connection settings¶
The Kafka connector uses the following settings in .connection files:
Instruction | Required | Description |
---|---|---|
KAFKA_BOOTSTRAP_SERVERS | Yes | Comma-separated list of one or more Kafka brokers, including port numbers. |
KAFKA_KEY | Yes | Key used to authenticate with Kafka. Sometimes called Key, Client Key, or Username, depending on the Kafka distribution. |
KAFKA_SECRET | Yes | Secret used to authenticate with Kafka. Sometimes called Secret, Secret Key, or Password, depending on the Kafka distribution. |
KAFKA_SECURITY_PROTOCOL | No | Security protocol for the connection. Accepted values are plaintext and SASL_SSL. Default value is SASL_SSL. |
KAFKA_SASL_MECHANISM | No | SASL mechanism to use for authentication. Supported values are 'PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512'. Default value is 'PLAIN'. |
Columns of the data source¶
When you connect a Kafka producer to Tinybird, Tinybird consumes optional metadata columns from that Kafka record and writes them to the data source.
The following fields represent the raw data received from Kafka:
- __value: A String representing the entire unparsed Kafka record inserted.
- __topic: The Kafka topic that the message belongs to.
- __partition: The Kafka partition that the message belongs to.
- __offset: The Kafka offset of the message.
- __timestamp: The timestamp stored in the Kafka message received by Tinybird.
- __key: The key of the Kafka message.
- __headers: Headers parsed from the incoming topic messages. See Using custom Kafka headers for advanced message processing.
Metadata fields are optional. Omit the fields you don't need to reduce your data storage.
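As a sketch of how these columns can be used, assuming a data source named kafkasample that keeps the __partition and __timestamp fields in its schema:
SELECT
    __partition,
    count() AS messages
FROM kafkasample
WHERE __timestamp > now() - INTERVAL 1 DAY
GROUP BY __partition
ORDER BY __partition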
Kafka logs¶
You can find global logs in the datasources_ops_log
Service Data Source. Filter by datasource_id
to select the correct datasource, and set event_type
to append-kafka
.
To select all Kafka-related logs in the last day, run the following query:
SELECT *
FROM tinybird.datasources_ops_log
WHERE
    datasource_id = 't_1234'
    AND event_type = 'append-kafka'
    AND timestamp > now() - INTERVAL 1 DAY
ORDER BY timestamp DESC
If you can't find logs in datasources_ops_log
, the kafka_ops_log
Service Data Source contains more detailed logs. Filter by datasource_id
to select the correct datasource, and use msg_type
to select the desired log level (info
, warning
, or error
).
SELECT *
FROM tinybird.kafka_ops_log
WHERE
    datasource_id = 't_1234'
    AND timestamp > now() - INTERVAL 1 DAY
    AND msg_type IN ['info', 'warning', 'error']
Limits¶
The limits for the Kafka connector are:
- Minimum flush time: 4 seconds
- Throughput (uncompressed): 20MB/s
- Up to 3 connections per Workspace
If you're regularly hitting these limits, contact support@tinybird.co.
Troubleshooting¶
Kafka tracks the latest committed offset for each topic and consumer group ID, and always resumes sending data from that offset. In Tinybird, each Kafka data source receives data from a topic and uses a group ID. The combination of topic and group ID must be unique.
If you remove a Kafka data source and recreate it with the same settings after having received data, you only get data from the latest committed offset, even if KAFKA_AUTO_OFFSET_RESET
is set to earliest
.
To solve this issue, try the following:
- Always use a different group ID when testing Kafka data sources; see the sketch after this list.
- Check the tinybird.datasources_ops_log Service Data Source to see global errors.
- Check the tinybird.kafka_ops_log Service Data Source to see if you've already used a group ID to ingest data from a topic.
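For example, when testing, you might switch to a new group ID and reset the offset so the consumer starts from the beginning of the topic. A sketch of the relevant .datasource settings, using a hypothetical group ID:
KAFKA_CONNECTION_NAME kafkasample
KAFKA_TOPIC test_topic
KAFKA_GROUP_ID tinybird_consumer_test_2 # New, unused group ID
KAFKA_AUTO_OFFSET_RESET earliest # New groups start from the earliest offset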
Compressed messages¶
Tinybird can consume from Kafka topics where Kafka compression is enabled, as decompressing the message is a standard function of the Kafka consumer. If you compressed the message before passing it through the Kafka producer, Tinybird can't do post-consumer processing to decompress the message.
For example, if you compressed a JSON message through gzip and produced it to a Kafka topic as a bytes
message, it would be ingested by Tinybird as bytes
. If you produced a JSON message to a Kafka topic with the Kafka producer setting compression.type=gzip
, while it would be stored in Kafka as compressed bytes, it would be decoded on ingestion and arrive in Tinybird as JSON.