Datasource files (.datasource)
Datasource files describe your Data Sources. You can use .datasource files to define the schema, engine, and other settings of your Data Sources. See Data Sources.
Available instructions
The following instructions are available for .datasource files.
Declaration | Required | Description |
---|---|---|
SCHEMA <indented_schema_definition> | Yes | Defines a block for a Data Source schema. The block must be indented. |
DESCRIPTION <markdown_string> | No | Description of the Data Source. |
TOKEN <token_name> APPEND | No | Grants append access to a Data Source to the token named <token_name>. If the token isn't specified or <token_name> doesn't exist, it will be automatically created. |
TAGS <tag_names> | No | Comma-separated list of tags. Tags are used to organize your data project. |
ENGINE <engine_type> | No | Sets the engine for the Data Source. Default value is MergeTree. |
ENGINE_SORTING_KEY <sql> | No | Sets the ORDER BY expression for the Data Source. If unset, it defaults to DateTime, numeric, or String columns, in that order. |
ENGINE_PARTITION_KEY <sql> | No | Sets the PARTITION expression for the Data Source. |
ENGINE_TTL <sql> | No | Sets the TTL expression for the Data Source. |
ENGINE_VER <column_name> | No | Column with the version of the object state. Required when using ENGINE ReplacingMergeTree. |
ENGINE_SIGN <column_name> | No | Column to compute the state. Required when using ENGINE CollapsingMergeTree or ENGINE VersionedCollapsingMergeTree. |
ENGINE_VERSION <column_name> | No | Column with the version of the object state. Required when using ENGINE VersionedCollapsingMergeTree. |
ENGINE_SETTINGS <settings> | No | Comma-separated list of key-value pairs that describe engine settings for the Data Source. |
INDEXES <index_definitions> | No | Defines one or more indexes for the Data Source. See Data Skipping Indexes for more information. |
SHARED_WITH <workspace_name> | No | Shares the Data Source with one or more Workspaces. Use in combination with --user_token with admin rights in the origin Workspace. |
The following example shows a typical .datasource file:
tinybird/datasources/example.datasource
    # A comment
    TOKEN tracker APPEND

    DESCRIPTION >
        Analytics events **landing data source**

    TAGS stock, recommendations

    SCHEMA >
        `timestamp` DateTime `json:$.timestamp`,
        `session_id` String `json:$.session_id`,
        `action` LowCardinality(String) `json:$.action`,
        `version` LowCardinality(String) `json:$.version`,
        `payload` String `json:$.payload`

    ENGINE "MergeTree"
    ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
    ENGINE_SORTING_KEY "timestamp"
    ENGINE_TTL "timestamp + toIntervalDay(60)"
    ENGINE_SETTINGS "index_granularity=8192"

    INDEXES >
        INDEX idx1 action TYPE bloom_filter GRANULARITY 3

    SHARED_WITH >
        analytics_production
        analytics_staging
SCHEMA
A SCHEMA declaration is a comma-separated list of column definitions, one per line. For example:
Example SCHEMA declaration
    SCHEMA >
        `timestamp` DateTime `json:$.timestamp`,
        `session_id` String `json:$.session_id`,
        `action` LowCardinality(String) `json:$.action`,
        `version` LowCardinality(String) `json:$.version`,
        `payload` String `json:$.payload`
Each column in a SCHEMA declaration is in the format <column_name> <data_type> <json_path> <default_value>, where:
- <column_name> is the name of the column in the Data Source.
- <data_type> is one of the supported Data Types.
- <json_path> is optional and only required for NDJSON Data Sources. See JSONPaths.
- <default_value> sets a default value for the column when it's null. A common use case is to set a default date for a column, like updated_at DateTime DEFAULT now().
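As an illustration of the column format, the following sketch declares one column with only a name, type, and JSONPath, and a second column that also sets a default. The column names are hypothetical; the DEFAULT now() expression is the one mentioned in the description of <default_value>.

```
# Hypothetical columns, for illustration only
SCHEMA >
    `event_id` String `json:$.event_id`,
    `updated_at` DateTime `json:$.updated_at` DEFAULT now()
```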
To change or update JSONPaths or default values in the schema, push a new version of the schema using tb push --force or use the alter endpoint on the Data Sources API.
JSONPath expressions
SCHEMA definitions support JSONPath expressions. For example:
Schema syntax with jsonpath
    DESCRIPTION Generated from /Users/username/tmp/sample.ndjson

    SCHEMA >
        `d` DateTime `json:$.d`,
        `total` Int32 `json:$.total`,
        `from_novoa` Int16 `json:$.from_novoa`
See JSONPaths for more information.
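When the NDJSON events contain nested objects, the JSONPath walks down through the keys. The sketch below assumes a hypothetical event shaped like {"d": "2024-01-01 00:00:00", "user": {"id": "u1", "country": "ES"}, "tags": ["a", "b"]}. The column names are illustrative, and the [:] array notation is an assumption worth checking against the JSONPaths documentation before relying on it.

```
# Hypothetical columns for a nested NDJSON event
SCHEMA >
    `d` DateTime `json:$.d`,
    `user_id` String `json:$.user.id`,
    `user_country` LowCardinality(String) `json:$.user.country`,
    `tags` Array(String) `json:$.tags[:]`
```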
ENGINE settings
ENGINE declares the engine used for the Data Source. The default value is MergeTree.
See Engines for more information.
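As a sketch of a non-default engine, the following fragment combines ENGINE with the ENGINE_VER instruction listed under the available instructions. It assumes a hypothetical Data Source keyed by product_id in which the row with the highest updated_at should survive deduplication. The column names are illustrative only.

```
# Hypothetical schema: one row per product, versioned by updated_at
SCHEMA >
    `product_id` String `json:$.product_id`,
    `price` Float64 `json:$.price`,
    `updated_at` DateTime `json:$.updated_at`

ENGINE "ReplacingMergeTree"
ENGINE_SORTING_KEY "product_id"
ENGINE_VER "updated_at"
```

With ReplacingMergeTree, rows that share the same sorting key are collapsed during background merges, so queries can still see duplicates until a merge runs.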
Connectors
Connector settings are part of the .datasource content. You can use include files to reuse connection settings and credentials.
When you work with connectors, keep in mind how tokens interact with .datasource files. If a token isn't explicitly specified in the .datasource file or doesn't exist, it's created automatically, so connectors can establish a working connection by default.
After a token is created and associated with a connector, avoid deleting it or modifying its scopes. Either change can break the connection and disrupt the import process.
Kafka, Confluent, RedPanda
The Kafka, Confluent, and RedPanda connectors use the following settings:
Instruction | Required | Description |
---|---|---|
KAFKA_CONNECTION_NAME | Yes | The name of the configured Kafka connection in Tinybird. |
KAFKA_BOOTSTRAP_SERVERS | Yes | Comma-separated list of one or more Kafka brokers, including port numbers. |
KAFKA_KEY | Yes | Key used to authenticate with Kafka. Sometimes called Key, Client Key, or Username, depending on the Kafka distribution. |
KAFKA_SECRET | Yes | Secret used to authenticate with Kafka. Sometimes called Secret, Secret Key, or Password, depending on the Kafka distribution. |
KAFKA_TOPIC | Yes | Name of the Kafka topic to consume from. |
KAFKA_GROUP_ID | Yes | Consumer Group ID to use when consuming from Kafka. |
KAFKA_AUTO_OFFSET_RESET | No | Offset to use when no previous offset can be found, for example when creating a new consumer. Supported values are latest and earliest. Default: latest. |
KAFKA_STORE_HEADERS | No | Stores Kafka headers in the field __headers for later processing. Default value is 'False'. |
KAFKA_STORE_BINARY_HEADERS | No | Stores all Kafka headers as binary data in the field __headers, as a map of type Map(String, String). To access the header 'key', run __headers['key']. Default value is 'True'. This setting only applies if KAFKA_STORE_HEADERS is set to True. |
KAFKA_STORE_RAW_VALUE | No | Stores the raw message in its entirety as an additional column. Supported values are 'True' and 'False'. Default: 'False'. |
KAFKA_SCHEMA_REGISTRY_URL | No | URL of the Kafka schema registry. |
KAFKA_TARGET_PARTITIONS | No | Target partitions to place the messages. |
KAFKA_KEY_AVRO_DESERIALIZATION | No | Key for decoding Avro messages. |
KAFKA_SSL_CA_PEM | No | CA certificate in PEM format for SSL connections. |
KAFKA_SASL_MECHANISM | No | SASL mechanism to use for authentication. Supported values are 'PLAIN', 'SCRAM-SHA-256', and 'SCRAM-SHA-512'. Default value is 'PLAIN'. |
The following example defines a Data Source with a new Kafka, Confluent, or RedPanda connection in a .datasource file:
Data Source with a new Kafka/Confluent/RedPanda connection
    SCHEMA >
        `value` String,
        `topic` LowCardinality(String),
        `partition` Int16,
        `offset` Int64,
        `timestamp` DateTime,
        `key` String

    ENGINE "MergeTree"
    ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
    ENGINE_SORTING_KEY "timestamp"

    KAFKA_CONNECTION_NAME my_connection_name
    KAFKA_BOOTSTRAP_SERVERS my_server:9092
    KAFKA_KEY my_username
    KAFKA_SECRET my_password
    KAFKA_TOPIC my_topic
    KAFKA_GROUP_ID my_group_id
The following example defines a Data Source that uses an existing Kafka, Confluent, or RedPanda connection:
Data Source with an existing Kafka/Confluent/RedPanda connection
    SCHEMA >
        `value` String,
        `topic` LowCardinality(String),
        `partition` Int16,
        `offset` Int64,
        `timestamp` DateTime,
        `key` String

    ENGINE "MergeTree"
    ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
    ENGINE_SORTING_KEY "timestamp"

    KAFKA_CONNECTION_NAME my_connection_name
    KAFKA_TOPIC my_topic
    KAFKA_GROUP_ID my_group_id
Refer to the Kafka Connector, Amazon MSK, Confluent Connector, or RedPanda Connector documentation for more details.
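Optional settings sit alongside the required ones. The following is a minimal sketch, assuming an existing connection and a new consumer group that should start from the oldest available messages instead of the default latest. Only the connector lines are shown, the names are placeholders, and the value is written unquoted as it appears in the settings table. Check the connector documentation for the exact quoting.

```
# Connection, topic, and group names are placeholders
KAFKA_CONNECTION_NAME my_connection_name
KAFKA_TOPIC my_topic
KAFKA_GROUP_ID my_group_id
KAFKA_AUTO_OFFSET_RESET earliest
```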
BigQuery and Snowflake
The BigQuery and Snowflake connectors use the following settings:
Instruction | Required | Description |
---|---|---|
IMPORT_SERVICE | Yes | Name of the import service to use. Use bigquery or snowflake. |
IMPORT_SCHEDULE | Yes | Cron expression, in UTC time, that sets how often imports run. The interval must be at least 5 minutes. For example, */5 * * * *. Use @auto to sync once per minute when using s3, or @on-demand to only run manually. |
IMPORT_CONNECTION_NAME | Yes | Name given to the connection inside Tinybird. For example, 'my_connection'. |
IMPORT_STRATEGY | Yes | Strategy to use when inserting data. Use REPLACE for BigQuery and Snowflake. |
IMPORT_EXTERNAL_DATASOURCE | No | Fully qualified name of the source table in BigQuery or Snowflake. For example, project.dataset.table. |
IMPORT_QUERY | No | The SELECT query to retrieve your data from BigQuery or Snowflake when you don't need all the columns or want to make a transformation before ingest. The FROM clause must reference a table using the full scope. For example, project.dataset.table. |
See BigQuery Connector or Snowflake Connector for more details.
BigQuery example
The following example shows a BigQuery Data Source described in a .datasource file:
Data Source with a BigQuery connection
    DESCRIPTION >
        bigquery demo data source

    SCHEMA >
        `timestamp` DateTime `json:$.timestamp`,
        `id` Integer `json:$.id`,
        `orderid` LowCardinality(String) `json:$.orderid`,
        `status` LowCardinality(String) `json:$.status`,
        `amount` Integer `json:$.amount`

    ENGINE "MergeTree"
    ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
    ENGINE_SORTING_KEY "timestamp"
    ENGINE_TTL "timestamp + toIntervalDay(60)"

    IMPORT_SERVICE bigquery
    IMPORT_SCHEDULE */5 * * * *
    IMPORT_EXTERNAL_DATASOURCE mydb.raw.events
    IMPORT_STRATEGY REPLACE
    IMPORT_QUERY >
        select timestamp, id, orderid, status, amount
        from mydb.raw.events
Snowflake example
The following example shows a Snowflake Data Source described in a .datasource file:
tinybird/datasources/snowflake.datasource - Data Source with a Snowflake connection
    DESCRIPTION >
        Snowflake demo data source

    SCHEMA >
        `timestamp` DateTime `json:$.timestamp`,
        `id` Integer `json:$.id`,
        `orderid` LowCardinality(String) `json:$.orderid`,
        `status` LowCardinality(String) `json:$.status`,
        `amount` Integer `json:$.amount`

    ENGINE "MergeTree"
    ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
    ENGINE_SORTING_KEY "timestamp"
    ENGINE_TTL "timestamp + toIntervalDay(60)"

    IMPORT_SERVICE snowflake
    IMPORT_CONNECTION_NAME my_snowflake_connection
    IMPORT_EXTERNAL_DATASOURCE mydb.raw.events
    IMPORT_SCHEDULE */5 * * * *
    IMPORT_STRATEGY REPLACE
    IMPORT_QUERY >
        select timestamp, id, orderid, status, amount
        from mydb.raw.events
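For imports that should only run when triggered manually, the settings table allows @on-demand in place of a cron expression. A sketch of the connector lines under that assumption, reusing the placeholder connection and table names from the Snowflake example:

```
# Placeholder connection and table names
IMPORT_SERVICE snowflake
IMPORT_CONNECTION_NAME my_snowflake_connection
IMPORT_EXTERNAL_DATASOURCE mydb.raw.events
IMPORT_SCHEDULE @on-demand
IMPORT_STRATEGY REPLACE
```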
S3
The S3 connector uses the following settings:
Instruction | Required | Description |
---|---|---|
IMPORT_SERVICE | Yes | Name of the import service to use. Use s3 for S3 connections. |
IMPORT_CONNECTION_NAME | Yes | Name given to the connection inside Tinybird. For example, 'my_connection'. |
IMPORT_STRATEGY | Yes | Strategy to use when inserting data. Use APPEND for S3 connections. |
IMPORT_BUCKET_URI | Yes | Full bucket path, including the s3:// protocol, bucket name, object path, and an optional pattern to match against object keys. For example, s3://my-bucket/my-path discovers all files in the bucket my-bucket under the prefix /my-path. You can use patterns in the path to filter objects. For example, ending the path with *.csv matches all objects that end with the .csv suffix. |
IMPORT_FROM_DATETIME | No | Sets the date and time from which to start ingesting files in an S3 bucket. The format is YYYY-MM-DDTHH:MM:SSZ. |
See S3 Connector for more details.
S3 example
The following example shows an S3 Data Source described in a .datasource file:
tinybird/datasources/s3.datasource - Data Source with an S3 connection
    DESCRIPTION >
        Analytics events landing data source

    SCHEMA >
        `timestamp` DateTime `json:$.timestamp`,
        `session_id` String `json:$.session_id`,
        `action` LowCardinality(String) `json:$.action`,
        `version` LowCardinality(String) `json:$.version`,
        `payload` String `json:$.payload`

    ENGINE "MergeTree"
    ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
    ENGINE_SORTING_KEY "timestamp"
    ENGINE_TTL "timestamp + toIntervalDay(60)"

    IMPORT_SERVICE s3
    IMPORT_CONNECTION_NAME connection_name
    IMPORT_BUCKET_URI s3://my-bucket/*.csv
    IMPORT_SCHEDULE @auto
    IMPORT_STRATEGY APPEND
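To skip older files, IMPORT_FROM_DATETIME can be added to the connector lines. The sketch below assumes a hypothetical prefix my-path and an arbitrary cutoff of 1 January 2024 (UTC), written in the YYYY-MM-DDTHH:MM:SSZ format given in the settings table:

```
# Placeholder connection name, bucket path, and cutoff date
IMPORT_SERVICE s3
IMPORT_CONNECTION_NAME connection_name
IMPORT_BUCKET_URI s3://my-bucket/my-path/*.csv
IMPORT_SCHEDULE @auto
IMPORT_STRATEGY APPEND
IMPORT_FROM_DATETIME 2024-01-01T00:00:00Z
```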