Kafka connector

You can set up a Kafka connector to stream data from your Kafka topic to Tinybird. The connector is compatible with Apache Kafka, Confluent, and Redpanda.

Setting up the Kafka connector requires creating and enabling a data source and its connection as separate files.

Prerequisites

Grant READ permissions to both the topic and the consumer group to ingest data from Kafka into Tinybird.
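 
As an illustration, granting those permissions with Kafka's ACL tool might look like the following sketch. The broker address, principal, topic, and group names are placeholders, and depending on your cluster you may also need to pass an admin --command-config file:

# Allow the Tinybird principal to read from the topic
kafka-acls --bootstrap-server broker:9092 --add \
  --allow-principal User:tinybird \
  --operation Read --topic test_topic

# Allow the same principal to read as the consumer group
kafka-acls --bootstrap-server broker:9092 --add \
  --allow-principal User:tinybird \
  --operation Read --group tinybird_consumer_1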

You must secure your Kafka brokers with SSL/TLS and SASL. Tinybird uses SASL_SSL as the security protocol for the Kafka consumer. Connections are rejected if the brokers only support PLAINTEXT or SASL_PLAINTEXT.

Kafka Schema Registry is supported only for decoding Avro messages.

Set up the connector

To set up the Kafka connector, follow these steps.

1

Create a Kafka connection

Before you create and configure a Kafka data source, you need to set up a connection. Create a .connection file with the required credentials stored in secrets. For example:

kafkasample.connection
TYPE kafka
KAFKA_BOOTSTRAP_SERVERS {{ tb_secret("KAFKA_SERVERS", "localhost:9092") }}
KAFKA_SECURITY_PROTOCOL SASL_SSL
KAFKA_SASL_MECHANISM PLAIN
KAFKA_KEY {{ tb_secret("KAFKA_USERNAME", "") }}
KAFKA_SECRET {{ tb_secret("KAFKA_PASSWORD", "") }}
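 
The tb_secret calls above read their values from Workspace secrets. As a sketch, assuming the tb secret set command in your CLI version and the secret names used in this example, you could set them before deploying:

# Store the Kafka credentials as Workspace secrets
tb secret set KAFKA_SERVERS "broker-1:9092,broker-2:9092"
tb secret set KAFKA_USERNAME "my-kafka-key"
tb secret set KAFKA_PASSWORD "my-kafka-secret"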

See Connection files for more details on how to create a connection file.

For a list of Kafka connection settings, see Settings.

2

Create a Kafka data source

Create a .datasource file using tb create --prompt or manually.

The .datasource file must contain the desired schema and the required settings for Kafka, including the KAFKA_CONNECTION_NAME setting, which must match the name of the .connection file you created in the previous step.

For example:

kafkasample.datasource
SCHEMA >
  `value` String,
  `topic` LowCardinality(String),
  `partition` Int16,
  `offset` Int64,
  `timestamp` DateTime,
  `key` String

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp"

KAFKA_CONNECTION_NAME kafkasample # The name of the .connection file
KAFKA_TOPIC test_topic
KAFKA_GROUP_ID tinybird_consumer_1

3

Test and deploy

After you've defined your Kafka data source and connection, you can test and deploy the changes as usual. See Test and deploy.

To check if the connection is active, run tb connection ls.
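 
As a sketch of that flow with the CLI:

tb deploy          # validate and deploy the .datasource and .connection files
tb connection ls   # confirm that the kafkasample connection is listed and active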

.datasource settings

The Kafka connector uses the following settings in .datasource files:

  • KAFKA_CONNECTION_NAME (required): The name of the configured Kafka connection in Tinybird.
  • KAFKA_TOPIC (required): Name of the Kafka topic to consume from.
  • KAFKA_GROUP_ID (required): Consumer group ID to use when consuming from Kafka.
  • KAFKA_AUTO_OFFSET_RESET (optional): Offset to use when no previous offset can be found, for example when creating a new consumer. Supported values are latest and earliest. Default: latest.
  • KAFKA_STORE_HEADERS (optional): Store Kafka headers in the __headers field for later processing. Default: 'False'.
  • KAFKA_STORE_BINARY_HEADERS (optional): Store all Kafka headers as binary data in the __headers field as a binary map of type Map(String, String). To access the header 'key', run __headers['key']. Default: 'True'. This setting only applies if KAFKA_STORE_HEADERS is set to 'True'.
  • KAFKA_STORE_RAW_VALUE (optional): Store the raw message in its entirety as an additional column. Supported values are 'True' and 'False'. Default: 'False'.
  • KAFKA_SCHEMA_REGISTRY_URL (optional): URL of the Kafka Schema Registry.
  • KAFKA_TARGET_PARTITIONS (optional): Target partitions to place the messages.
  • KAFKA_KEY_FORMAT (optional): Format of the message key. Valid values are avro, json_with_schema, and json_without_schema. Using avro or json_with_schema requires KAFKA_SCHEMA_REGISTRY_URL to be set.
  • KAFKA_VALUE_FORMAT (optional): Format of the message value. Valid values are avro, json_with_schema, and json_without_schema. Using avro or json_with_schema requires KAFKA_SCHEMA_REGISTRY_URL to be set.
  • KAFKA_KEY_AVRO_DESERIALIZATION (optional): If the message key is serialized in Avro format, decode it using the Avro definition stored in the schema registry at KAFKA_SCHEMA_REGISTRY_URL. The key is converted to JSON and stored in the __key column. Default: 'False'. A deprecated alternative is to set KAFKA_KEY_FORMAT to avro.
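 
As an illustration of the optional settings, a data source that starts from the earliest offset and keeps message headers might look like the following sketch; the file name, schema, topic, and group ID are placeholders rather than a complete definition:

kafka_headers.datasource
SCHEMA >
  `value` String,
  `timestamp` DateTime

ENGINE "MergeTree"
ENGINE_SORTING_KEY "timestamp"

KAFKA_CONNECTION_NAME kafkasample
KAFKA_TOPIC test_topic
KAFKA_GROUP_ID tinybird_consumer_2   # a new group ID starts without committed offsets
KAFKA_AUTO_OFFSET_RESET earliest     # consume from the beginning of the topic
KAFKA_STORE_HEADERS True             # expose headers through the __headers field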

.connection settings

The Kafka connector uses the following settings in .connection files:

  • KAFKA_BOOTSTRAP_SERVERS (required): Comma-separated list of one or more Kafka brokers, including port numbers.
  • KAFKA_KEY (required): Key used to authenticate with Kafka. Sometimes called Key, Client Key, or Username, depending on the Kafka distribution.
  • KAFKA_SECRET (required): Secret used to authenticate with Kafka. Sometimes called Secret, Secret Key, or Password, depending on the Kafka distribution.
  • KAFKA_SECURITY_PROTOCOL (optional): Security protocol for the connection. Accepted values are plaintext and SASL_SSL. Default: SASL_SSL.
  • KAFKA_SASL_MECHANISM (optional): SASL mechanism to use for authentication. Supported values are 'PLAIN', 'SCRAM-SHA-256', and 'SCRAM-SHA-512'. Default: 'PLAIN'.
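 
For example, a connection to a cluster that authenticates with SCRAM might look like the following sketch, with the file name and secret names as placeholders:

kafkascram.connection
TYPE kafka
KAFKA_BOOTSTRAP_SERVERS {{ tb_secret("KAFKA_SERVERS", "broker-1:9092,broker-2:9092") }}
KAFKA_SECURITY_PROTOCOL SASL_SSL
KAFKA_SASL_MECHANISM SCRAM-SHA-256   # match the mechanism configured on your brokers
KAFKA_KEY {{ tb_secret("KAFKA_USERNAME", "") }}
KAFKA_SECRET {{ tb_secret("KAFKA_PASSWORD", "") }}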

Columns of the data source

When you connect a Kafka topic to Tinybird, Tinybird can also read optional metadata fields from each Kafka record and write them to the data source.

The following fields represent the raw data received from Kafka:

  • __value: A String representing the entire unparsed Kafka record inserted.
  • __topic: The Kafka topic that the message belongs to.
  • __partition: The Kafka partition that the message belongs to.
  • __offset: The Kafka offset of the message.
  • __timestamp: The timestamp stored in the Kafka message received by Tinybird.
  • __key: The key of the Kafka message.
  • __headers: Headers parsed from the incoming topic messages. See Using custom Kafka headers for advanced message processing.

Metadata fields are optional. Omit the fields you don't need to reduce your data storage.
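 
For example, assuming a data source named kafka_events that stores headers, you could inspect the metadata columns with a query like this sketch (the header key is a placeholder):

SELECT
    __timestamp,
    __topic,
    __partition,
    __offset,
    __key,
    __headers['trace-id'] AS trace_id  -- read a single header from the map
FROM kafka_events
ORDER BY __timestamp DESC
LIMIT 10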

Kafka logs

You can find global logs in the datasources_ops_log Service Data Source. Filter by datasource_id to select the correct datasource, and set event_type to append-kafka.

To select all Kafka-related logs from the last day, run the following query:

SELECT *
FROM tinybird.datasources_ops_log
WHERE datasource_id = 't_1234'
  AND event_type = 'append-kafka'
  AND timestamp > now() - INTERVAL 1 day
ORDER BY timestamp DESC

If you can't find logs in datasources_ops_log, the kafka_ops_log Service Data Source contains more detailed logs. Filter by datasource_id to select the correct datasource, and use msg_type to select the desired log level (info, warning, or error).

SELECT *
FROM tinybird.kafka_ops_log
WHERE datasource_id = 't_1234'
  AND timestamp > now() - interval 1 day
  AND msg_type IN ['info', 'warning', 'error']

Limits

The limits for the Kafka connector are:

  • Minimum flush time: 4 seconds
  • Throughput (uncompressed): 20MB/s
  • Up to 3 connections per Workspace

If you regularly hit these limits, contact support@tinybird.co.

Troubleshooting

Kafka tracks committed offsets per topic and consumer group ID: once a group has committed an offset for a topic, consumers that use the same group ID resume from the latest committed offset. In Tinybird, each Kafka data source consumes from a topic using a group ID, and the combination of topic and group ID must be unique.

If you remove a Kafka data source and recreate it with the same settings after having received data, you only get data from the latest committed offset, even if KAFKA_AUTO_OFFSET_RESET is set to earliest.

To solve this issue, try the following:

  • Always use a different group ID when testing Kafka data sources.
  • Check the tinybird.datasources_ops_log Service Data Source for global errors.
  • Check the tinybird.kafka_ops_log Service Data Source to see if you've already used a group ID to ingest data from a topic.

Compressed messages

Tinybird can consume from Kafka topics where Kafka compression is enabled, because decompressing messages is a standard function of the Kafka consumer. However, if you compress a message yourself before passing it to the Kafka producer, Tinybird can't decompress it after consumption.

For example, if you compressed a JSON message with gzip and produced it to a Kafka topic as a bytes message, Tinybird would ingest it as bytes. If you produced a JSON message to a Kafka topic with the Kafka producer setting compression.type=gzip, it would be stored in Kafka as compressed bytes but decompressed by the consumer, arriving in Tinybird as JSON.
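 
As a sketch of the supported case, enabling producer-side compression with Kafka's console producer might look like this; flag names can vary between Kafka versions:

# Messages are stored compressed in Kafka, but the consumer decompresses them
# transparently, so they still arrive in Tinybird as plain JSON.
kafka-console-producer --bootstrap-server broker:9092 \
  --topic test_topic \
  --producer-property compression.type=gzip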
