# Datafiles
Datafiles are text files that describe your Tinybird resources: Data Sources, Pipes, etc.
You can use datafiles to manage your projects as source code and take advantage of version control.
## Types of datafiles

- Data Source: uses the `.datasource` extension. Represents a Data Source and should follow the Data Source syntax.
- Pipe: uses the `.pipe` extension. Represents a Pipe and should follow the Pipe syntax.
- Include: uses the `.incl` extension. Includes are reusable datafile fragments that can be included in either `.datasource` or `.pipe` files.
## Basic syntax

Datafiles have a simple syntax:

Basic syntax

```
CMD value
OTHER_CMD "value with multiple words"
```

or, for multi-line values:

Multiline syntax

```
CMD >
    multi line
    values are indented
```

An example using real-life details:

Schema syntax

```
DESCRIPTION generated from /Users/username/tmp/sample.csv

SCHEMA >
    `d` DateTime,
    `total` Int32,
    `from_novoa` Int16
```

And including JSONPaths:

Schema syntax with jsonpath

```
DESCRIPTION generated from /Users/username/tmp/sample.ndjson

SCHEMA >
    `d` DateTime `json:$.d`,
    `total` Int32 `json:$.total`,
    `from_novoa` Int16 `json:$.from_novoa`
```
## Example file structure

This page shows examples where the `tinybird` project folder includes the sub-folders `datasources`, `endpoints`, `includes`, and `pipes`:

Example file structure

```
tinybird
├── datasources/
│   ├── connections/
│   │   └── my_connector_name.incl
│   └── my_datasource.datasource
├── endpoints/
├── includes/
└── pipes/
```
## Data Source

- `SCHEMA <schema_definition>`: (Required) Defines a block for the Data Source schema. Only valid in `.datasource` files. The block must be indented.
- `DESCRIPTION <markdown_string>`: (Optional) Sets the description for the Data Source.
- `TOKEN <token_name> APPEND`: (Optional) Grants append access to the Data Source to the Token named `<token_name>`. If the Token doesn't exist, it's created automatically.
- `TAGS <tag_names>`: (Optional) Assigns a comma-separated list of tags to the Data Source. Tags are used to organize your data project.
- `ENGINE <engine_type>`: (Optional) Sets the ClickHouse® engine for the Data Source. Default: `MergeTree`.
- `ENGINE_SORTING_KEY <sql>`: (Optional) Sets the ClickHouse ORDER BY expression for the Data Source. If not set, it defaults to DateTime, numeric, or String columns, in that order.
- `ENGINE_PARTITION_KEY <sql>`: (Optional) Sets the ClickHouse PARTITION BY expression for the Data Source.
- `ENGINE_TTL <sql>`: (Optional) Sets the ClickHouse TTL expression for the Data Source.
- `ENGINE_VER <column_name>`: Required when `ENGINE ReplacingMergeTree`. The column with the version of the object state.
- `ENGINE_SIGN <column_name>`: Required when `ENGINE CollapsingMergeTree` or `ENGINE VersionedCollapsingMergeTree`. The column used to compute the state.
- `ENGINE_VERSION <column_name>`: Required when `ENGINE VersionedCollapsingMergeTree`. The column with the version of the object state.
- `ENGINE_SETTINGS <settings>`: (Optional) Sets ClickHouse settings for the Data Source, as a comma-separated list of key-value pairs.
- `SHARED_WITH <workspace_name>`: (Optional) Shares the Data Source with one or more Workspaces. Must be used in combination with `--user_token` with admin rights in the origin Workspace.
tinybird/datasources/example.datasource
```
TOKEN tracker APPEND

DESCRIPTION >
    Analytics events **landing data source**

TAGS stock, recommendations

SCHEMA >
    `timestamp` DateTime `json:$.timestamp`,
    `session_id` String `json:$.session_id`,
    `action` LowCardinality(String) `json:$.action`,
    `version` LowCardinality(String) `json:$.version`,
    `payload` String `json:$.payload`

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp"
ENGINE_TTL "timestamp + toIntervalDay(60)"
ENGINE_SETTINGS "index_granularity=8192"

SHARED_WITH >
    analytics_production
    analytics_staging
```
### SCHEMA

A `SCHEMA` definition is a comma-separated list of column definitions, one per line.

Each column is represented as `` `column_name` data_type json_path default_value ``, where `column_name` is the name of the column in the Data Source and `data_type` is one of the supported Data Types.

`json_path` is optional and only required for NDJSON Data Sources; read more about JSONPaths.

`default_value` sets a default value for the column when it's null. A common use case is to set a default date on a column, like `updated_at DateTime DEFAULT now()`.
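For example, a schema that combines JSONPaths and default values might look like the following sketch (column names and paths are illustrative):

```
SCHEMA >
    `id` String `json:$.id`,
    `amount` Int32 `json:$.amount` DEFAULT 0,
    `updated_at` DateTime `json:$.updated_at` DEFAULT now()
```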
### Change default values

You don't have to remove and replace anything, or move any data, to change JSONPaths or default values. Either push a new version of the schema using the CLI and `tb push --force` (see the tb push docs for more details), or use the "alter" endpoint on the Data Sources API to update the schema.
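For illustration, both approaches might look like the sketch below; the Data Source name, schema, and token variable are placeholders, and the exact alter-endpoint parameters are described in the Data Sources API docs:

```bash
# Push the updated .datasource file, forcing the schema change on the existing Data Source
tb push datasources/example.datasource --force

# Or call the "alter" endpoint of the Data Sources API with the new schema
curl -X POST "https://api.tinybird.co/v0/datasources/example/alter" \
  -H "Authorization: Bearer $TB_ADMIN_TOKEN" \
  --data-urlencode 'schema=`id` String `json:$.id`, `updated_at` DateTime `json:$.updated_at` DEFAULT now()'
```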
### ENGINE and settings

Engine can be one of: `MergeTree`, `ReplacingMergeTree`, `SummingMergeTree`, `AggregatingMergeTree`, `CollapsingMergeTree`, `VersionedCollapsingMergeTree`, `Null`.

Read the supported engine and settings documentation for more info.
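For instance, a Data Source that deduplicates rows on its sorting key can use `ReplacingMergeTree` with `ENGINE_VER` to pick the latest row. A minimal sketch, with illustrative column names:

```
SCHEMA >
    `product_id` String,
    `stock` Int32,
    `updated_at` DateTime

ENGINE "ReplacingMergeTree"
ENGINE_SORTING_KEY "product_id"
ENGINE_VER "updated_at"
```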
### Connectors

You describe connectors as part of the `.datasource` datafile. You can use Includes to better handle connection settings and credentials.
#### Kafka, Confluent, RedPanda

These connectors all use identical datafiles with the following options:

- `KAFKA_CONNECTION_NAME`: (Required) The name of the configured Kafka connection in Tinybird.
- `KAFKA_BOOTSTRAP_SERVERS`: (Required) A comma-separated list of one or more Kafka brokers, including port numbers.
- `KAFKA_KEY`: (Required) The key used to authenticate with Kafka. Sometimes called Key, Client Key, or Username, depending on the Kafka distribution.
- `KAFKA_SECRET`: (Required) The secret used to authenticate with Kafka. Sometimes called Secret, Secret Key, or Password, depending on the Kafka distribution.
- `KAFKA_TOPIC`: (Required) The name of the Kafka topic to consume from.
- `KAFKA_GROUP_ID`: (Required) The Kafka consumer group ID to use when consuming from Kafka.
- `KAFKA_AUTO_OFFSET_RESET`: (Optional) The offset to use when no previous offset can be found, for example when creating a new consumer. Supported values: `latest`, `earliest`. Default: `latest`.
- `KAFKA_STORE_HEADERS`: (Optional) Store Kafka headers as the field `__headers` for later processing. Default: `'False'`.
- `KAFKA_STORE_BINARY_HEADERS`: (Optional; only applies if `KAFKA_STORE_HEADERS` is True) Store all Kafka headers as binary data in the field `__headers` as a binary map: `Map(String, String)`. To later access header `'key'`, simply run `__headers['key']`. Default: `'True'`.
- `KAFKA_STORE_RAW_VALUE`: (Optional) Stores the raw message in its entirety as an additional column. Supported values: `'True'`, `'False'`. Default: `'False'`.
- `KAFKA_SCHEMA_REGISTRY_URL`: (Optional) URL of the Kafka schema registry.
- `KAFKA_TARGET_PARTITIONS`: (Optional)
- `KAFKA_KEY_AVRO_DESERIALIZATION`: (Optional)
For example, to define a Data Source with a new Kafka/Confluent/RedPanda connection in a `.datasource` file:

tinybird/datasources/kafka.datasource - Data Source with a new Kafka/Confluent/RedPanda connection

```
SCHEMA >
    `value` String,
    `topic` LowCardinality(String),
    `partition` Int16,
    `offset` Int64,
    `timestamp` DateTime,
    `key` String

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp"

KAFKA_CONNECTION_NAME my_connection_name
KAFKA_BOOTSTRAP_SERVERS my_server:9092
KAFKA_KEY my_username
KAFKA_SECRET my_password
KAFKA_TOPIC my_topic
KAFKA_GROUP_ID my_group_id
```
Or, to define a Data Source that uses an existing Kafka/Confluent/RedPanda connection:

tinybird/datasources/kafka.datasource - Data Source with an existing Kafka/Confluent/RedPanda connection

```
SCHEMA >
    `value` String,
    `topic` LowCardinality(String),
    `partition` Int16,
    `offset` Int64,
    `timestamp` DateTime,
    `key` String

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp"

KAFKA_CONNECTION_NAME my_connection_name
KAFKA_TOPIC my_topic
KAFKA_GROUP_ID my_group_id
```
Refer to the specific Connector docs (Kafka, Confluent, RedPanda) for more details on how to configure them.
#### Scheduled Connectors

- `IMPORT_SERVICE`: (Required) Name of the import service to use. Valid values: `bigquery`, `snowflake`, `s3`.
- `IMPORT_SCHEDULE`: (Required) A cron expression (UTC) with the frequency to run imports, with a minimum interval of 5 minutes, e.g. `*/5 * * * *`. Use `@auto` to sync once per minute (only available for `s3`), or `@on-demand` to only execute manually.
- `IMPORT_CONNECTION_NAME`: (Required) The name given to the connection inside Tinybird, e.g. `'my_connection'`.
- `IMPORT_STRATEGY`: (Required) The strategy to use when inserting data, either `REPLACE` (for BigQuery and Snowflake) or `APPEND` (for S3).
- `IMPORT_BUCKET_URI`: Required when `IMPORT_SERVICE s3`. A full bucket path, including the `s3://` protocol, bucket name, object path, and an optional pattern to match against object keys. For example, `s3://my-bucket/my-path` discovers all files in the bucket `my-bucket` under the prefix `/my-path`. You can use patterns in the path to filter objects; for example, ending the path with `*.csv` matches all objects that end with the `.csv` suffix.
- `IMPORT_EXTERNAL_DATASOURCE`: (Optional) The fully qualified name of the source table in BigQuery or Snowflake, e.g. `project.dataset.table`.
- `IMPORT_QUERY`: (Optional) The SELECT query to extract your data from BigQuery or Snowflake when you don't need all the columns or want to apply a transformation before ingestion. The FROM must reference a table using the full scope: `project.dataset.table`.
- `IMPORT_FROM_DATETIME`: (Optional) Sets the date and time from which to start ingesting files from an S3 bucket. The format is `YYYY-MM-DDTHH:MM:SSZ`.
Refer to the BigQuery, Snowflake and S3 connectors reference for more details about how to configure them.
#### BigQuery

tinybird/datasources/bigquery.datasource - Data Source with a BigQuery connection

```
DESCRIPTION >
    bigquery demo data source

SCHEMA >
    `timestamp` DateTime `json:$.timestamp`,
    `id` Integer `json:$.id`,
    `orderid` LowCardinality(String) `json:$.orderid`,
    `status` LowCardinality(String) `json:$.status`,
    `amount` Integer `json:$.amount`

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp"
ENGINE_TTL "timestamp + toIntervalDay(60)"

IMPORT_SERVICE bigquery
IMPORT_SCHEDULE */5 * * * *
IMPORT_EXTERNAL_DATASOURCE mydb.raw.events
IMPORT_STRATEGY REPLACE
IMPORT_QUERY >
    select timestamp, id, orderid, status, amount
    from mydb.raw.events
```
#### Snowflake

tinybird/datasources/snowflake.datasource - Data Source with a Snowflake connection

```
DESCRIPTION >
    Snowflake demo data source

SCHEMA >
    `timestamp` DateTime `json:$.timestamp`,
    `id` Integer `json:$.id`,
    `orderid` LowCardinality(String) `json:$.orderid`,
    `status` LowCardinality(String) `json:$.status`,
    `amount` Integer `json:$.amount`

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp"
ENGINE_TTL "timestamp + toIntervalDay(60)"

IMPORT_SERVICE snowflake
IMPORT_CONNECTION_NAME my_snowflake_connection
IMPORT_EXTERNAL_DATASOURCE mydb.raw.events
IMPORT_SCHEDULE */5 * * * *
IMPORT_STRATEGY REPLACE
IMPORT_QUERY >
    select timestamp, id, orderid, status, amount
    from mydb.raw.events
```
#### S3

tinybird/datasources/s3.datasource - Data Source with an S3 connection

```
DESCRIPTION >
    Analytics events landing data source

SCHEMA >
    `timestamp` DateTime `json:$.timestamp`,
    `session_id` String `json:$.session_id`,
    `action` LowCardinality(String) `json:$.action`,
    `version` LowCardinality(String) `json:$.version`,
    `payload` String `json:$.payload`

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp"
ENGINE_TTL "timestamp + toIntervalDay(60)"

IMPORT_SERVICE s3
IMPORT_CONNECTION_NAME connection_name
IMPORT_BUCKET_URI s3://my-bucket/*.csv
IMPORT_SCHEDULE @auto
IMPORT_STRATEGY APPEND
```
## Pipe

- `%`: (Optional) Use as the first character of a Node to indicate that the Node uses the templating system.
- `DESCRIPTION <markdown_string>`: (Optional) Sets the description for a Node or for the complete file.
- `TAGS <tag_names>`: (Optional) Assigns a comma-separated list of tags to the Pipe. Tags are used to organize your data project.
- `NODE <node_name>`: (Required) Starts the definition of a new Node. All commands until the next NODE command or the end of the file belong to this Node.
- `SQL <sql>`: (Required) Defines a block for the SQL of a Node. The block must be indented (refer to the examples above).
- `INCLUDE <include_path.incl> <variables>`: (Optional) Includes are pieces of a Pipe that can be reused in multiple Pipe datafiles.
- `TYPE <pipe_type>`: (Optional) Sets the type of the Node. Can be set to 'ENDPOINT', 'MATERIALIZED', 'COPY', or 'SINK'.
- `DATASOURCE <data_source_name>`: Required when `TYPE MATERIALIZED`. Sets the destination Data Source for materialized Nodes.
- `TARGET_DATASOURCE <data_source_name>`: Required when `TYPE COPY`. Sets the destination Data Source for copy Nodes.
- `TOKEN <token_name> READ`: (Optional) Grants read access to a Pipe/Endpoint to the Token named `<token_name>`. If the Token doesn't exist, it's created automatically.
- `COPY_SCHEDULE`: (Optional) A cron expression with the frequency to run copy jobs, with a minimum interval of 5 minutes, e.g. `*/5 * * * *`. If not defined, it defaults to `@on-demand`.
- `COPY_MODE`: (Optional) The strategy to ingest data for copy jobs, either `append` or `replace`. If empty, the default strategy is `append`.
### Materialized Pipe

Use it to define how to materialize each row ingested in the leftmost Data Source of the Pipe query into a materialized Data Source. Materialization happens on ingestion. More about Materialized Views.

tinybird/pipes/sales_by_hour_mv.pipe

```
DESCRIPTION materialized Pipe to aggregate sales per hour in the sales_by_hour Data Source

NODE daily_sales
SQL >
    SELECT toStartOfDay(starting_date) day, country, sum(sales) as total_sales
    FROM teams
    GROUP BY day, country

TYPE MATERIALIZED
DATASOURCE sales_by_hour
```
### Copy Pipe

Use it to define how to export the result of the Pipe to a Data Source, optionally with a schedule. More about Copy Pipes.

tinybird/pipes/sales_by_hour_cp.pipe

```
DESCRIPTION Copy Pipe to export sales per hour every hour to the sales_hour_copy Data Source

NODE daily_sales
SQL >
    %
    SELECT toStartOfDay(starting_date) day, country, sum(sales) as total_sales
    FROM teams
    WHERE
        day BETWEEN toStartOfDay(now()) - interval 1 day AND toStartOfDay(now())
        and country = {{ String(country, 'US') }}
    GROUP BY day, country

TYPE COPY
TARGET_DATASOURCE sales_hour_copy
COPY_SCHEDULE 0 * * * *
```
### API Endpoint Pipe

Use it to define how to export the result of the Pipe as an HTTP endpoint. More about API Endpoints.

tinybird/pipes/sales_by_hour_endpoint.pipe

```
TOKEN dashboard READ

DESCRIPTION endpoint to get sales by hour filtering by date and country

TAGS sales

NODE daily_sales
SQL >
    %
    SELECT day, country, sum(total_sales) as total_sales
    FROM sales_by_hour
    WHERE
        day BETWEEN toStartOfDay(now()) - interval 1 day AND toStartOfDay(now())
        and country = {{ String(country, 'US') }}
    GROUP BY day, country

NODE result
SQL >
    %
    SELECT * FROM daily_sales
    LIMIT {{ Int32(page_size, 100) }}
    OFFSET {{ Int32(page, 0) * Int32(page_size, 100) }}
```
### Sink Pipe

Sink Pipe parameters:

- `EXPORT_SERVICE`: (Required) One of `gcs_hmac`, `s3`, `s3_iamrole`, or `kafka`.
- `EXPORT_CONNECTION_NAME`: (Required) The name of the connection configured in Tinybird.
- `EXPORT_SCHEDULE`: (Optional) A crontab expression; the same type of expression as `IMPORT_SCHEDULE` above.
#### Blob Storage Sink

When setting `EXPORT_SERVICE` to one of `gcs_hmac`, `s3`, or `s3_iamrole`:

- `EXPORT_BUCKET_URI`: (Required) The desired bucket path for the exported file. The path should not include the filename and extension.
- `EXPORT_FILE_TEMPLATE`: (Required) A template string that specifies the naming convention for exported files. The template can include dynamic attributes between curly braces, based on columns' data, that are replaced with actual values when exporting. Example: `export_{category}{date,'%Y'}{2}`.
- `EXPORT_FORMAT`: (Required) The format in which the data is exported. Supported output formats are listed in the ClickHouse docs. Default: `csv`.
- `EXPORT_COMPRESSION`: (Optional) The compression type of the created file. Accepted values: `none`, `gz` for gzip, `br` for brotli, `xz` for LZMA, `zst` for zstd. Default: `none`.
- `EXPORT_STRATEGY`: (Required) One of the available strategies. Default: `@new`.
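Putting these parameters together, a blob storage Sink Pipe might look like the following sketch; the Node SQL, connection name, bucket, file template, and schedule are illustrative:

```
NODE daily_export
SQL >
    SELECT * FROM events

TYPE SINK
EXPORT_SERVICE s3_iamrole
EXPORT_CONNECTION_NAME my_s3_connection
EXPORT_BUCKET_URI s3://my-bucket/exports
EXPORT_FILE_TEMPLATE export_{action}
EXPORT_FORMAT csv
EXPORT_COMPRESSION gz
EXPORT_SCHEDULE 0 */6 * * *
EXPORT_STRATEGY @new
```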
#### Kafka Sink

Kafka Sinks are currently in private beta. If you have any feedback or suggestions, contact Tinybird at support@tinybird.co or in the Community Slack.

When setting `EXPORT_SERVICE` to `kafka`:

- `EXPORT_KAFKA_TOPIC`: (Required) The desired topic for the exported data.
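Under the same assumptions as above, a Kafka Sink Pipe might look like this sketch, with illustrative connection and topic names:

```
NODE export_events
SQL >
    SELECT * FROM events

TYPE SINK
EXPORT_SERVICE kafka
EXPORT_CONNECTION_NAME my_kafka_connection
EXPORT_KAFKA_TOPIC my_export_topic
```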
## Include

Use `.incl` datafiles to separate connector settings and include them in your `.datasource` files, or to reuse Pipe templates. See some examples below.

### Include connector settings

Separate connector settings from `.datasource` files.
tinybird/datasources/connections/kafka_connection.incl
```
KAFKA_CONNECTION_NAME my_connection_name
KAFKA_BOOTSTRAP_SERVERS my_server:9092
KAFKA_KEY my_username
KAFKA_SECRET my_password
```
tinybird/datasources/kafka_ds.datasource
```
SCHEMA >
    `value` String,
    `topic` LowCardinality(String),
    `partition` Int16,
    `offset` Int64,
    `timestamp` DateTime,
    `key` String

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp"

INCLUDE "connections/kafka_connection.incl"

KAFKA_TOPIC my_topic
KAFKA_GROUP_ID my_group_id
```
### Include Pipe Nodes

You can also use `.incl` datafiles to reuse templates:
tinybird/includes/only_buy_events.incl
```
NODE only_buy_events
SQL >
    SELECT
        toDate(timestamp) date,
        product,
        color,
        JSONExtractFloat(json, 'price') as price
    FROM events
    where action = 'buy'
```
tinybird/endpoints/sales.pipe
INCLUDE "../includes/only_buy_events.incl" NODE endpoint DESCRIPTION > return sales for a product with color filter SQL > % select date, sum(price) total_sales from only_buy_events where color in {{Array(colors, 'black')}} group by date
tinybird/pipes/top_per_day.pipe
INCLUDE "../includes/only_buy_events.incl" NODE top_per_day SQL > SELECT date, topKState(10)(product) top_10, sumState(price) total_sales from only_buy_events group by date TYPE MATERIALIZED DATASOURCE mv_top_per_day
### Include with variables

You can also templatize `.incl` datafiles. For instance, you can reuse the same `.incl` template with different variable values:
tinybird/includes/top_products.incl
```
NODE endpoint
DESCRIPTION >
    returns top 10 products for the last week
SQL >
    %
    select date, topKMerge(10)(top_10) as top_10
    from top_product_per_day
    {% if '$DATE_FILTER' == 'last_week' %}
    where date > today() - interval 7 day
    {% else %}
    where date between {{ Date(start) }} and {{ Date(end) }}
    {% end %}
    group by date
```
tinybird/endpoints/top_products_last_week.pipe

```
INCLUDE "../includes/top_products.incl" "DATE_FILTER=last_week"
```

tinybird/endpoints/top_products_between_dates.pipe

```
INCLUDE "../includes/top_products.incl" "DATE_FILTER=between_dates"
```
Note that `$DATE_FILTER` is included as a variable in the `.incl` datafile, and the two endpoints inject different values for the `DATE_FILTER` variable, so the final `.pipe` datafiles are different.
### Include with Environment variables

`INCLUDE` datafiles are expanded by the CLI, which means you can expand environment variables as well.

Assuming you have configured the environment variables `KAFKA_BOOTSTRAP_SERVERS`, `KAFKA_KEY`, and `KAFKA_SECRET`, you can create a `.incl` datafile like this:
tinybird/datasources/connections/kafka_connection.incl
```
KAFKA_CONNECTION_NAME my_connection_name
KAFKA_BOOTSTRAP_SERVERS ${KAFKA_BOOTSTRAP_SERVERS}
KAFKA_KEY ${KAFKA_KEY}
KAFKA_SECRET ${KAFKA_SECRET}
```
And use these values in your `.datasource` datafiles:
tinybird/datasources/kafka_ds.datasource
```
SCHEMA >
    `value` String,
    `topic` LowCardinality(String),
    `partition` Int16,
    `offset` Int64,
    `timestamp` DateTime,
    `key` String

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp"

INCLUDE "connections/kafka_connection.incl"

KAFKA_TOPIC my_topic
KAFKA_GROUP_ID my_group_id
```
Alternatively, you can create separate `.incl` files per environment:
tinybird/datasources/connections/kafka_connection_prod.incl
```
KAFKA_CONNECTION_NAME my_connection_name
KAFKA_BOOTSTRAP_SERVERS production_servers
KAFKA_KEY the_kafka_key
KAFKA_SECRET ${KAFKA_SECRET}
```
tinybird/datasources/connections/kafka_connection_stg.incl
```
KAFKA_CONNECTION_NAME my_connection_name
KAFKA_BOOTSTRAP_SERVERS staging_servers
KAFKA_KEY the_kafka_key
KAFKA_SECRET ${KAFKA_SECRET}
```
And then include them depending on the environment:
tinybird/datasources/kafka_ds.datasource
```
SCHEMA >
    `value` String,
    `topic` LowCardinality(String),
    `partition` Int16,
    `offset` Int64,
    `timestamp` DateTime,
    `key` String

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp"

INCLUDE "connections/kafka_connection_${TB_ENV}.incl"

KAFKA_TOPIC my_topic
KAFKA_GROUP_ID my_group_id
```
Where `$TB_ENV` is one of `stg` or `prod`.

Check how to use environment variables to deploy to staging and production environments.
## Next steps

- Understand CI/CD processes on Tinybird.
- Read about implementing test strategies.