Datasource files (.datasource)

Datasource files describe your Data Sources. You can use .datasource files to define the schema, engine, and other settings of your Data Sources. See Data Sources.

Available instructions

The following instructions are available for .datasource files.

| Declaration | Required | Description |
| --- | --- | --- |
| SCHEMA <indented_schema_definition> | Yes | Defines a block for a Data Source schema. The block must be indented. |
| DESCRIPTION <markdown_string> | No | Description of the Data Source. |
| TOKEN <token_name> APPEND | No | Grants append access to a Data Source to the token named <token_name>. If the token isn't specified or <token_name> doesn't exist, it's created automatically. |
| TAGS <tag_names> | No | Comma-separated list of tags. Tags are used to organize your data project. |
| ENGINE <engine_type> | No | Sets the engine for the Data Source. Default value is MergeTree. |
| ENGINE_SORTING_KEY <sql> | No | Sets the ORDER BY expression for the Data Source. If unset, it defaults to DateTime, numeric, or String columns, in that order. |
| ENGINE_PARTITION_KEY <sql> | No | Sets the PARTITION expression for the Data Source. |
| ENGINE_TTL <sql> | No | Sets the TTL expression for the Data Source. |
| ENGINE_VER <column_name> | No | Column with the version of the object state. Required when using ENGINE ReplacingMergeTree. |
| ENGINE_SIGN <column_name> | No | Column to compute the state. Required when using ENGINE CollapsingMergeTree or ENGINE VersionedCollapsingMergeTree. |
| ENGINE_VERSION <column_name> | No | Column with the version of the object state. Required when using ENGINE VersionedCollapsingMergeTree. |
| ENGINE_SETTINGS <settings> | No | Comma-separated list of key-value pairs that describe engine settings for the Data Source. |
| INDEXES <index definitions> | No | Defines one or more indexes for the Data Source. See Data Skipping Indexes for more information. |
| SHARED_WITH <workspace_name> | No | Shares the Data Source with one or more Workspaces. Use in combination with --user_token with admin rights in the origin Workspace. |

The following example shows a typical .datasource file:

tinybird/datasources/example.datasource
# A comment
TOKEN tracker APPEND

DESCRIPTION >
    Analytics events **landing data source**

TAGS stock, recommendations

SCHEMA >
    `timestamp` DateTime `json:$.timestamp`,
    `session_id` String `json:$.session_id`,
    `action` LowCardinality(String) `json:$.action`,
    `version` LowCardinality(String) `json:$.version`,
    `payload` String `json:$.payload`

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp"
ENGINE_TTL "timestamp + toIntervalDay(60)"
ENGINE_SETTINGS "index_granularity=8192"

INDEXES >
    INDEX idx1 action TYPE bloom_filter GRANULARITY 3

SHARED_WITH >
    analytics_production
    analytics_staging

SCHEMA

A SCHEMA declaration is a comma-separated list of column definitions, one per line. For example:

Example SCHEMA declaration
SCHEMA >
    `timestamp` DateTime `json:$.timestamp`,
    `session_id` String `json:$.session_id`,
    `action` LowCardinality(String) `json:$.action`,
    `version` LowCardinality(String) `json:$.version`,
    `payload` String `json:$.payload`

Each column in a SCHEMA declaration is in the format <column_name> <data_type> <json_path> <default_value>, where:

  • <column_name> is the name of the column in the Data Source.
  • <data_type> is one of the supported Data Types.
  • <json_path> is optional and only required for NDJSON Data Sources. See JSONPaths.
  • <default_value> is optional and sets the value used for the column when the incoming value is null. A common use case is to set a default date for a column, like updated_at DateTime DEFAULT now().
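
As a minimal sketch of the column format above, the following schema combines a JSONPath with a default value on the same column. The column names are hypothetical, and placing the DEFAULT clause after the JSONPath is an assumption based on the order given in the format description.

```
# Hypothetical columns, for illustration only
SCHEMA >
    `event_id` String `json:$.event_id`,
    `updated_at` DateTime `json:$.updated_at` DEFAULT now()
```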

To change JSONPaths or default values in the schema, push a new version of the schema using tb push --force or use the alter endpoint on the Data Sources API.

JSONPath expressions

SCHEMA definitions support JSONPath expressions. For example:

Schema syntax with jsonpath
DESCRIPTION Generated from /Users/username/tmp/sample.ndjson

SCHEMA >
    `d` DateTime `json:$.d`,
    `total` Int32 `json:$.total`,
    `from_novoa` Int16 `json:$.from_novoa`

See JSONPaths for more information.
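
JSONPath expressions can also point at nested fields. The following sketch assumes an NDJSON row with a nested user object and a tags array. The column names are hypothetical, and the [:] suffix for arrays is an assumption about the syntax that should be checked against the JSONPaths documentation.

```
# Assumed input row: {"user": {"id": "u1"}, "tags": ["a", "b"]}
SCHEMA >
    `user_id` String `json:$.user.id`,
    `tags` Array(String) `json:$.tags[:]`
```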

ENGINE settings

ENGINE declares the engine used for the Data Source. The default value is MergeTree.

See Engines for more information.
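
As an illustration of a non-default engine, the following sketch declares a ReplacingMergeTree Data Source together with the ENGINE_VER setting that the instructions table lists as required for it. The Data Source and its columns are hypothetical.

```
# Hypothetical Data Source: rows that share a product_id are deduplicated
# at merge time, keeping the row with the highest updated_at
SCHEMA >
    `product_id` String `json:$.product_id`,
    `price` Float32 `json:$.price`,
    `updated_at` DateTime `json:$.updated_at`

ENGINE "ReplacingMergeTree"
ENGINE_SORTING_KEY "product_id"
ENGINE_VER "updated_at"
```

Deduplication in ReplacingMergeTree happens during background merges, so queries can still see duplicate rows until a merge runs.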

Connectors

Connector settings are part of the .datasource content. You can use include files to reuse connection settings and credentials.

When working with connectors, it's important to understand how tokens interact with .datasource files. If a token isn't explicitly specified in the .datasource file, or the specified token doesn't exist, it's created automatically so that the connector can establish a working connection by default.

Once a token is created and associated with a connector, avoid deleting it or modifying its scopes: either change can break the connection and interrupt imports.

Kafka, Confluent, RedPanda

The Kafka, Confluent, and RedPanda connectors use the following settings:

| Instruction | Required | Description |
| --- | --- | --- |
| KAFKA_CONNECTION_NAME | Yes | The name of the configured Kafka connection in Tinybird. |
| KAFKA_BOOTSTRAP_SERVERS | Yes | Comma-separated list of one or more Kafka brokers, including port numbers. |
| KAFKA_KEY | Yes | Key used to authenticate with Kafka. Sometimes called Key, Client Key, or Username, depending on the Kafka distribution. |
| KAFKA_SECRET | Yes | Secret used to authenticate with Kafka. Sometimes called Secret, Secret Key, or Password, depending on the Kafka distribution. |
| KAFKA_TOPIC | Yes | Name of the Kafka topic to consume from. |
| KAFKA_GROUP_ID | Yes | Consumer Group ID to use when consuming from Kafka. |
| KAFKA_AUTO_OFFSET_RESET | No | Offset to use when no previous offset can be found, for example when creating a new consumer. Supported values are latest, earliest. Default: latest. |
| KAFKA_STORE_HEADERS | No | Stores Kafka headers in the field __headers for later processing. Default value is 'False'. |
| KAFKA_STORE_BINARY_HEADERS | No | Stores all Kafka headers as binary data in the field __headers, as a binary map of type Map(String, String). To access the header 'key', run: __headers['key']. Default value is 'True'. This setting only applies if KAFKA_STORE_HEADERS is set to True. |
| KAFKA_STORE_RAW_VALUE | No | Stores the raw message in its entirety as an additional column. Supported values are 'True', 'False'. Default: 'False'. |
| KAFKA_SCHEMA_REGISTRY_URL | No | URL of the Kafka schema registry. |
| KAFKA_TARGET_PARTITIONS | No | Target partitions to place the messages. |
| KAFKA_KEY_AVRO_DESERIALIZATION | No | Key for decoding Avro messages. |
| KAFKA_SSL_CA_PEM | No | CA certificate in PEM format for SSL connections. |
| KAFKA_SASL_MECHANISM | No | SASL mechanism to use for authentication. Supported values are 'PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512'. Default value is 'PLAIN'. |

The following example defines a Data Source with a new Kafka, Confluent, or RedPanda connection in a .datasource file:

Data Source with a new Kafka/Confluent/RedPanda connection
SCHEMA >
  `value` String,
  `topic` LowCardinality(String),
  `partition` Int16,
  `offset` Int64,
  `timestamp` DateTime,
  `key` String

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp"

KAFKA_CONNECTION_NAME my_connection_name
KAFKA_BOOTSTRAP_SERVERS my_server:9092
KAFKA_KEY my_username
KAFKA_SECRET my_password
KAFKA_TOPIC my_topic
KAFKA_GROUP_ID my_group_id

The following example defines a Data Source that uses an existing Kafka, Confluent, or RedPanda connection:

Data Source with an existing Kafka/Confluent/RedPanda connection
SCHEMA >
  `value` String,
  `topic` LowCardinality(String),
  `partition` Int16,
  `offset` Int64,
  `timestamp` DateTime,
  `key` String

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp"

KAFKA_CONNECTION_NAME my_connection_name
KAFKA_TOPIC my_topic
KAFKA_GROUP_ID my_group_id
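
The optional settings from the table go alongside the required ones. The following fragment is a sketch that reads a topic from its earliest available offset and stores message headers. The connection, topic, and group names are the placeholders used in the examples above, and the unquoted spelling of the values is an assumption based on the supported values listed in the table.

```
KAFKA_CONNECTION_NAME my_connection_name
KAFKA_TOPIC my_topic
KAFKA_GROUP_ID my_group_id
# Optional settings; the value spelling follows the settings table and is an assumption
KAFKA_AUTO_OFFSET_RESET earliest
KAFKA_STORE_HEADERS True
```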

Refer to the Kafka Connector, Amazon MSK, Confluent Connector, or RedPanda Connector documentation for more details.

BigQuery and Snowflake

The BigQuery and Snowflake connectors use the following settings:

| Instruction | Required | Description |
| --- | --- | --- |
| IMPORT_SERVICE | Yes | Name of the import service to use. Use bigquery or snowflake. |
| IMPORT_SCHEDULE | Yes | Cron expression, in UTC time, that sets how often imports run. The interval must be at least 5 minutes. For example, */5 * * * *. Use @auto to sync once per minute when using s3, or @on-demand to only run manually. |
| IMPORT_CONNECTION_NAME | Yes | Name given to the connection inside Tinybird. For example, 'my_connection'. |
| IMPORT_STRATEGY | Yes | Strategy to use when inserting data. Use REPLACE for BigQuery and Snowflake. |
| IMPORT_EXTERNAL_DATASOURCE | No | Fully qualified name of the source table in BigQuery or Snowflake. For example, project.dataset.table. |
| IMPORT_QUERY | No | The SELECT query to retrieve your data from BigQuery or Snowflake when you don't need all the columns or want to make a transformation before ingest. The FROM clause must reference a table using the full scope. For example, project.dataset.table. |

See BigQuery Connector or Snowflake Connector for more details.

BigQuery example

The following example shows a BigQuery Data Source described in a .datasource file:

Data Source with a BigQuery connection
DESCRIPTION >
    bigquery demo data source

SCHEMA >
    `timestamp` DateTime `json:$.timestamp`,
    `id` Integer `json:$.id`,
    `orderid` LowCardinality(String) `json:$.orderid`,
    `status` LowCardinality(String) `json:$.status`,
    `amount` Integer `json:$.amount`

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp"
ENGINE_TTL "timestamp + toIntervalDay(60)"

IMPORT_SERVICE bigquery
IMPORT_SCHEDULE */5 * * * *
IMPORT_EXTERNAL_DATASOURCE mydb.raw.events
IMPORT_STRATEGY REPLACE
IMPORT_QUERY >
    select
        timestamp,
        id,
        orderid,
        status,
        amount
    from
        mydb.raw.events

Snowflake example

The following example shows a Snowflake Data Source described in a .datasource file:

tinybird/datasources/snowflake.datasource - Data Source with a Snowflake connection
DESCRIPTION >
    Snowflake demo data source

SCHEMA >
    `timestamp` DateTime `json:$.timestamp`,
    `id` Integer `json:$.id`,
    `orderid` LowCardinality(String) `json:$.orderid`,
    `status` LowCardinality(String) `json:$.status`,
    `amount` Integer `json:$.amount`

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp"
ENGINE_TTL "timestamp + toIntervalDay(60)"

IMPORT_SERVICE snowflake
IMPORT_CONNECTION_NAME my_snowflake_connection
IMPORT_EXTERNAL_DATASOURCE mydb.raw.events
IMPORT_SCHEDULE */5 * * * *
IMPORT_STRATEGY REPLACE
IMPORT_QUERY >
    select
        timestamp,
        id,
        orderid,
        status,
        amount
    from
        mydb.raw.events
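
The settings table lists IMPORT_QUERY as optional and @on-demand as a valid schedule. The following fragment is a sketch of a Snowflake import that copies the whole source table and only runs when triggered manually. It reuses the placeholder names from the example above, and it assumes that IMPORT_EXTERNAL_DATASOURCE alone is enough when no query is given.

```
IMPORT_SERVICE snowflake
IMPORT_CONNECTION_NAME my_snowflake_connection
IMPORT_EXTERNAL_DATASOURCE mydb.raw.events
# Run imports only when triggered manually
IMPORT_SCHEDULE @on-demand
IMPORT_STRATEGY REPLACE
```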

S3

The S3 connector uses the following settings:

| Instruction | Required | Description |
| --- | --- | --- |
| IMPORT_SERVICE | Yes | Name of the import service to use. Use s3 for S3 connections. |
| IMPORT_CONNECTION_NAME | Yes | Name given to the connection inside Tinybird. For example, 'my_connection'. |
| IMPORT_STRATEGY | Yes | Strategy to use when inserting data. Use APPEND for S3 connections. |
| IMPORT_BUCKET_URI | Yes | Full bucket path, including the s3:// protocol, bucket name, object path, and an optional pattern to match against object keys. For example, s3://my-bucket/my-path discovers all files in the bucket my-bucket under the prefix /my-path. You can use patterns in the path to filter objects, for example, ending the path with *.csv matches all objects that end with the .csv suffix. |
| IMPORT_FROM_DATETIME | No | Sets the date and time from which to start ingesting files on an S3 bucket. The format is YYYY-MM-DDTHH:MM:SSZ. |

See S3 Connector for more details.

S3 example

The following example shows an S3 Data Source described in a .datasource file:

tinybird/datasources/s3.datasource - Data Source with an S3 connection
DESCRIPTION >
    Analytics events landing data source

SCHEMA >
    `timestamp` DateTime `json:$.timestamp`,
    `session_id` String `json:$.session_id`,
    `action` LowCardinality(String) `json:$.action`,
    `version` LowCardinality(String) `json:$.version`,
    `payload` String `json:$.payload`

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp"
ENGINE_TTL "timestamp + toIntervalDay(60)"

IMPORT_SERVICE s3
IMPORT_CONNECTION_NAME connection_name
IMPORT_BUCKET_URI s3://my-bucket/*.csv
IMPORT_SCHEDULE @auto
IMPORT_STRATEGY APPEND
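
To limit ingestion to recent files, the optional IMPORT_FROM_DATETIME setting can be added to the same import block. The following fragment is a sketch: the connection name and bucket are the placeholders from the example above, and the timestamp is a hypothetical value written in the YYYY-MM-DDTHH:MM:SSZ format given in the settings table.

```
IMPORT_SERVICE s3
IMPORT_CONNECTION_NAME connection_name
IMPORT_BUCKET_URI s3://my-bucket/*.csv
IMPORT_SCHEDULE @auto
IMPORT_STRATEGY APPEND
# Start ingesting from this date and time (hypothetical value)
IMPORT_FROM_DATETIME 2024-01-01T00:00:00Z
```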