Datasource files (.datasource)¶
Datasource files describe your Data Sources. You can use .datasource files to define the schema, engine, and other settings of your Data Sources. See Data Sources.
Available instructions¶
The following instructions are available for .datasource files.
Declaration | Required | Description |
---|---|---|
SCHEMA <indented_schema_definition> | Yes | Defines a block for a Data Source schema. The block must be indented. |
DESCRIPTION <markdown_string> | No | Description of the Data Source. |
TOKEN <token_name> APPEND | No | Grants append access to a Data Source to the Token with name <token_name> . If the token doesn't exist, it's automatically created. |
TAGS <tag_names> | No | Comma-separated list of tags. Tags are used to organize your data project. |
ENGINE <engine_type> | No | Sets the ClickHouse® Engine for Data Source. Default value is MergeTree . |
ENGINE_SORTING_KEY <sql> | No | Sets the ORDER BY expression for the Data Source. If unset, it defaults to DateTime, numeric, or String columns, in that order. |
ENGINE_PARTITION_KEY <sql> | No | Sets the PARTITION expression for the Data Source. |
ENGINE_TTL <sql> | No | Sets the TTL expression for the Data Source. |
ENGINE_VER <column_name> | No | Column with the version of the object state. Required when using ENGINE ReplacingMergeTree . |
ENGINE_SIGN <column_name> | No | Column to compute the state. Required when using ENGINE CollapsingMergeTree or ENGINE VersionedCollapsingMergeTree . |
ENGINE_VERSION <column_name> | No | Column with the version of the object state. Required when ENGINE VersionedCollapsingMergeTree . |
ENGINE_SETTINGS <settings> | No | Comma-separated list of key-value pairs that describe ClickHouse® engine settings for the Data Source. |
SHARED_WITH <workspace_name> | No | Shares the Data Source with one or more Workspaces. Use in combination with --user_token with admin rights in the origin Workspace. |
The following example shows a typical .datasource file:
tinybird/datasources/example.datasource
# A comment TOKEN tracker APPEND DESCRIPTION > Analytics events **landing data source** TAGS stock, recommendations SCHEMA > `timestamp` DateTime `json:$.timestamp`, `session_id` String `json:$.session_id`, `action` LowCardinality(String) `json:$.action`, `version` LowCardinality(String) `json:$.version`, `payload` String `json:$.payload` ENGINE "MergeTree" ENGINE_PARTITION_KEY "toYYYYMM(timestamp)" ENGINE_SORTING_KEY "timestamp" ENGINE_TTL "timestamp + toIntervalDay(60)" ENGINE_SETTINGS "index_granularity=8192" SHARED_WITH > analytics_production analytics_staging
SCHEMA¶
A SCHEMA
declaration is a newline, comma-separated list of columns definitions. For example:
Example SCHEMA declaration
SCHEMA > `timestamp` DateTime `json:$.timestamp`, `session_id` String `json:$.session_id`, `action` LowCardinality(String) `json:$.action`, `version` LowCardinality(String) `json:$.version`, `payload` String `json:$.payload`
Each column in a SCHEMA
declaration is in the format <column_name> <data_type> <json_path> <default_value>
, where:
<column_name>
is the name of the column in the Data Source.<data_type>
is one of the supported Data Types.<json_path>
is optional and only required for NDJSON Data Sources. See JSONpaths.<default_value>
sets a default value to the column when it's null. A common use case is to set a default date to a column, likeupdated_at DateTime DEFAULT now()
.
To change or update JSONPaths or other default values in the schema, push a new version of the schema using tb push --force
or use the alter endpoint on the Data Sources API.
JSONPath expressions¶
SCHEMA
definitions support JSONPath expressions. For example:
Schema syntax with jsonpath
DESCRIPTION Generated from /Users/username/tmp/sample.ndjson SCHEMA > `d` DateTime `json:$.d`, `total` Int32 `json:$.total`, `from_novoa` Int16 `json:$.from_novoa`
See JSONPaths for more information.
ENGINE settings¶
ENGINE
declares the ClickHouse® engine used for the Data Source. The default value is MergeTree
.
The supported values for ENGINE
are the following:
MergeTree
ReplacingMergeTree
SummingMergeTree
AggregatingMergeTree
CollapsingMergeTree
VersionedCollapsingMergeTree
Null
Read Supported engine and settings for more information.
Connectors¶
Connectors settings are part of the .datasource content. You can use include files to reuse connection settings and credentials.
Kafka, Confluent, RedPanda¶
The Kafka, Confluent, and RedPanda connectors use the following settings:
Instruction | Required | Description |
---|---|---|
KAFKA_CONNECTION_NAME | Yes | The name of the configured Kafka connection in Tinybird. |
KAFKA_BOOTSTRAP_SERVERS | Yes | Comma-separated list of one or more Kafka brokers, including Port numbers. |
KAFKA_KEY | Yes | Key used to authenticate with Kafka. Sometimes called Key, Client Key, or Username, depending on the Kafka distribution. |
KAFKA_SECRET | Yes | Secret used to authenticate with Kafka. Sometimes called Secret, Secret Key, or Password, depending on the Kafka distribution. |
KAFKA_TOPIC | Yes | Name of the Kafka topic to consume from. |
KAFKA_GROUP_ID | Yes | Consumer Group ID to use when consuming from Kafka. |
KAFKA_AUTO_OFFSET_RESET | No | Offset to use when no previous offset can be found, for example when creating a new consumer. Supported values are latest , earliest . Default: latest . |
KAFKA_STORE_HEADERS | No | Store Kafka headers as field __headers for later processing. Default value is 'False' . |
KAFKA_STORE_BINARY_HEADERS | No | Stores all Kafka headers as binary data in field __headers as a binary map of type Map(String, String) . To access the header 'key' run: __headers['key'] . Default value is 'True' . This field only applies if KAFKA_STORE_HEADERS is set to True . |
KAFKA_STORE_RAW_VALUE | No | Stores the raw message in its entirety as an additional column. Supported values are 'True' , 'False' . Default: 'False' . |
KAFKA_SCHEMA_REGISTRY_URL | No | URL of the Kafka schema registry. |
KAFKA_TARGET_PARTITIONS | No | Target partitions to place the messages. |
KAFKA_KEY_AVRO_DESERIALIZATION | No | Key for decoding Avro messages. |
KAFKA_SSL_CA_PEM | No | CA certificate in PEM format for SSL connections. |
KAFKA_SASL_MECHANISM | No | SASL mechanism to use for authentication. Supported values are 'PLAIN' , 'SCRAM-SHA-256' , 'SCRAM-SHA-512' . Default values is 'PLAIN' . |
The following example defines a Data Source with a new Kafka, Confluent, or RedPanda connection in a .datasource file:
Data Source with a new Kafka/Confluent/RedPanda connection
SCHEMA > `value` String, `topic` LowCardinality(String), `partition` Int16, `offset` Int64, `timestamp` DateTime, `key` String ENGINE "MergeTree" ENGINE_PARTITION_KEY "toYYYYMM(timestamp)" ENGINE_SORTING_KEY "timestamp" KAFKA_CONNECTION_NAME my_connection_name KAFKA_BOOTSTRAP_SERVERS my_server:9092 KAFKA_KEY my_username KAFKA_SECRET my_password KAFKA_TOPIC my_topic KAFKA_GROUP_ID my_group_id
The following example defines a Data Source that uses an existing Kafka, Confluent, or RedPanda connection:
Data Source with an existing Kafka/Confluent/RedPanda connection
SCHEMA > `value` String, `topic` LowCardinality(String), `partition` Int16, `offset` Int64, `timestamp` DateTime, `key` String ENGINE "MergeTree" ENGINE_PARTITION_KEY "toYYYYMM(timestamp)" ENGINE_SORTING_KEY "timestamp" KAFKA_CONNECTION_NAME my_connection_name KAFKA_TOPIC my_topic KAFKA_GROUP_ID my_group_id
Refer to the Kafka Connector, Confluent Connector, or RedPanda Connector documentation for more details.
Scheduled Connectors¶
Scheduled connectors, such as the S3, BigQuery, and Snowflake connectors, use the following settings:
Instruction | Required | Description |
---|---|---|
IMPORT_SERVICE | Yes | Name of the import service to use. Valid values are bigquery , snowflake , s3 . |
IMPORT_SCHEDULE | Yes | Cron expression, in UTC time, with the frequency to run imports. Must be higher than 5 minutes. For example, */5 * * * * . Use @auto to sync once per minute when using s3 , or @on-demand to only run manually. |
IMPORT_CONNECTION_NAME | Yes | Name given to the connection inside Tinybird. For example, 'my_connection' . |
IMPORT_STRATEGY | Yes | Strategy to use when inserting data, either REPLACE for BigQuery and Snowflake or APPEND for s3. |
IMPORT_BUCKET_URI | Yes | Full bucket path when IMPORT_SERVICE is s3 , including the s3:// protocol, bucket name, object path, and an optional pattern to match against object keys. For example, s3://my-bucket/my-path would discover all files in the bucket my-bucket under the prefix /my-path . You can use patterns in the path to filter objects, for example, ending the path with *.csv matches all objects that end with the .csv suffix. |
IMPORT_EXTERNAL_DATASOURCE | No | Fully qualified name of the source table in BigQuery or Snowflake. For example, project.dataset.table . |
IMPORT_QUERY | No | The SELECT query to extract your data from BigQuery or Snowflake when you don't need all the columns or want to make a transformation before ingest. The FROM clause must reference a table using the full scope. For example, project.dataset.table . |
IMPORT_FROM_DATETIME | No | Set the date and time from which to start ingesting files on an S3 bucket. The format is YYYY-MM-DDTHH:MM:SSZ . |
Refer to the BigQuery, Snowflake Connector, or S3 Connector documentation for more details.
BigQuery example¶
The following example shows a BigQuery Data Source described in a .data source file:
Data Source with a BigQuery connection
DESCRIPTION > bigquery demo data source SCHEMA > `timestamp` DateTime `json:$.timestamp`, `id` Integer `json:$.id`, `orderid` LowCardinality(String) `json:$.orderid`, `status` LowCardinality(String) `json:$.status`, `amount` Integer `json:$.amount` ENGINE "MergeTree" ENGINE_PARTITION_KEY "toYYYYMM(timestamp)" ENGINE_SORTING_KEY "timestamp" ENGINE_TTL "timestamp + toIntervalDay(60)" IMPORT_SERVICE bigquery IMPORT_SCHEDULE */5 * * * * IMPORT_EXTERNAL_DATASOURCE mydb.raw.events IMPORT_STRATEGY REPLACE IMPORT_QUERY > select timestamp, id, orderid, status, amount from mydb.raw.events
Snowflake example¶
The following example shows a Snowflake Data Source described in a .data source file:
tinybird/datasources/snowflake.datasource - Data Source with a Snowflake connection
DESCRIPTION > Snowflake demo data source SCHEMA > `timestamp` DateTime `json:$.timestamp`, `id` Integer `json:$.id`, `orderid` LowCardinality(String) `json:$.orderid`, `status` LowCardinality(String) `json:$.status`, `amount` Integer `json:$.amount` ENGINE "MergeTree" ENGINE_PARTITION_KEY "toYYYYMM(timestamp)" ENGINE_SORTING_KEY "timestamp" ENGINE_TTL "timestamp + toIntervalDay(60)" IMPORT_SERVICE snowflake IMPORT_CONNECTION_NAME my_snowflake_connection IMPORT_EXTERNAL_DATASOURCE mydb.raw.events IMPORT_SCHEDULE */5 * * * * IMPORT_STRATEGY REPLACE IMPORT_QUERY > select timestamp, id, orderid, status, amount from mydb.raw.events
S3 example¶
The following example shows an S3 Data Source described in a .data source file:
tinybird/datasources/s3.datasource - Data Source with an S3 connection
DESCRIPTION > Analytics events landing data source SCHEMA > `timestamp` DateTime `json:$.timestamp`, `session_id` String `json:$.session_id`, `action` LowCardinality(String) `json:$.action`, `version` LowCardinality(String) `json:$.version`, `payload` String `json:$.payload` ENGINE "MergeTree" ENGINE_PARTITION_KEY "toYYYYMM(timestamp)" ENGINE_SORTING_KEY "timestamp" ENGINE_TTL "timestamp + toIntervalDay(60)" IMPORT_SERVICE s3 IMPORT_CONNECTION_NAME connection_name IMPORT_BUCKET_URI s3://my-bucket/*.csv IMPORT_SCHEDULE @auto IMPORT_STRATEGY APPEND