Ingest from Snowflake using incremental updates¶
Read on to learn how to incrementally load data from a Snowflake table into Tinybird, using Amazon S3 as an intermediary staging area.
An incremental loading strategy ensures that only new or updated rows are transferred.
Before you start¶
Before you begin, ensure the following:
- Snowflake Account: A Snowflake instance with the data you want to load.
- Amazon S3 Bucket: Access to an S3 bucket for staging data, along with appropriate permissions (write from Snowflake and read from Tinybird).
- Tinybird Account: A Tinybird workspace with an appropriate Data Source set up.
- Snowflake Permissions: Ensure the Snowflake user has privileges to:
- Access the target table.
- Create and manage stages.
- Unload data into S3.
- AWS Credentials: Ensure Snowflake can use AWS credentials (IAM Role or Access Key/Secret Key pair) to write to the S3 bucket.
Create the unload task in Snowflake¶
Follow these steps to create the unload task in Snowflake:
- Grant the required permissions in the AWS IAM console
Make sure the S3 bucket allows Snowflake to write files by setting up an appropriate IAM role or policy. You can use this template to create the policy and attach it to the AWS role:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:PutObject", "s3:AbortMultipartUpload"], "Resource": "arn:aws:s3:::your-bucket-name/path/*" } ] }
Replace your-bucket-name/path/* with your bucket name and, optionally, the path you want to grant access to.
- Create the storage integration
Run the following SQL statement to create the storage integration:
/* Create the S3 integration. */
CREATE OR REPLACE STORAGE INTEGRATION tinybird_integration
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'S3'
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = '<arn_role>'
  STORAGE_ALLOWED_LOCATIONS = ('*');

-- describe integration tinybird_integration;
Replace <arn_role> with the ARN of the role created in the previous step.
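After creating the integration, describe it to obtain the values you need to add to the IAM role's trust relationship:

/* Retrieve the AWS identity Snowflake uses for this integration. */
DESC INTEGRATION tinybird_integration;

-- Copy STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID from the output
-- into the trust policy of the IAM role referenced in <arn_role>.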
- Create the file format
Run the following SQL statement to create the file format:
/* Create the file format for the output files generated. */
CREATE OR REPLACE FILE FORMAT csv_unload_format
  TYPE = 'CSV';
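The data source defined later in this guide reads *.csv.gz files. Snowflake gzip-compresses unloaded CSV files by default, but if you prefer to make that explicit, and to quote fields that may contain delimiters, you can extend the file format. This is a sketch with optional parameters; adjust it to your data:

/* Optional: make gzip compression explicit and quote string fields. */
CREATE OR REPLACE FILE FORMAT csv_unload_format
  TYPE = 'CSV'
  COMPRESSION = 'GZIP'
  FIELD_OPTIONALLY_ENCLOSED_BY = '"';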
- Create the stage
Run the following SQL statement to create the stage:
/* And finally the stage we'll use to unload the data to. */
CREATE OR REPLACE STAGE tinybird_stage
  STORAGE_INTEGRATION = tinybird_integration
  URL = 's3://your-bucket-name/path/'
  FILE_FORMAT = csv_unload_format;
Replace your-bucket-name and path with your S3 bucket details.
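Before creating the scheduled task, you can run a one-off unload of a single row to confirm that Snowflake can write to the stage with the IAM role you configured. The test prefix and query below are illustrative only:

/* Optional: write a tiny test file to confirm the stage and IAM role work. */
COPY INTO @tinybird_stage/test/ok_
  FROM (SELECT 1 AS ok)
  MAX_FILE_SIZE = 1000000;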
Create the unload task¶
Run the following SQL statement to create the scheduled task that unloads the new records since the last successful execution to the S3 bucket:
/* Create the scheduled task that unloads the new records since
 * last successful execution to the S3 bucket.
 *
 * Note how it reads the timestamp of the last successful execution,
 * and leaves a one hour margin.
 *
 * Orders need to be deduplicated later in Tinybird.
 */
CREATE OR REPLACE TASK export_order_deltas
  WAREHOUSE = compute_wh
  SCHEDULE = 'USING CRON 05 * * * * UTC'
AS
BEGIN
  LET sql := 'COPY INTO @tinybird_stage/orders/orders_<ts> from (
    select O_ORDERKEY, O_CUSTKEY, O_ORDERSTATUS, O_TOTALPRICE,
           O_ORDERDATE, O_ORDERPRIORITY, O_CLERK
    from tinybird.samples.orders_incremental
    where o_orderdate >= (
      SELECT coalesce(timestampadd(hour, -1, max(QUERY_START_TIME)), \'1970-01-01\')
      FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(TASK_NAME => \'export_order_deltas\'))
      where state = \'SUCCEEDED\'
      ORDER BY SCHEDULED_TIME
    )
  ) max_file_size=1000000000';
  sql := REPLACE(sql, '<ts>', TO_VARCHAR(CONVERT_TIMEZONE('UTC', current_timestamp()), 'YYYY_MM_DD__hh24_mi_ss'));
  EXECUTE IMMEDIATE (sql);
  RETURN sql;
END;
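Snowflake creates tasks in a suspended state, so resume the task once it's defined. The following statements are a sketch for resuming it, triggering a manual run, and reviewing its history:

/* Tasks are created suspended; resume it so the schedule takes effect. */
ALTER TASK export_order_deltas RESUME;

/* Optionally trigger a run immediately instead of waiting for the schedule. */
EXECUTE TASK export_order_deltas;

/* Review recent runs and their state. */
SELECT NAME, STATE, SCHEDULED_TIME, QUERY_START_TIME
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(TASK_NAME => 'export_order_deltas'))
ORDER BY SCHEDULED_TIME DESC;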
Configure the ingestion in Tinybird¶
Create the S3 connection. For example, using the CLI:
tb connection create s3_iamrole
Follow the instructions to grant Tinybird read access to the same S3 bucket you used to unload data from Snowflake.
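You can confirm the connection was created before defining the data source, for example:

tb connection ls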
Then, create the data source using the S3 Connector, with a schema similar to the following:
SCHEMA >
    `O_ORDERKEY` Int64,
    `O_CUSTKEY` Int64,
    `O_ORDERSTATUS` String,
    `O_TOTALPRICE` Float32,
    `O_ORDERDATE` DateTime64(3),
    `O_ORDERPRIORITY` String,
    `O_CLERK` String

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYear(O_ORDERDATE)"
ENGINE_SORTING_KEY "O_ORDERDATE, O_ORDERPRIORITY, O_CLERK"

IMPORT_SERVICE 's3_iamrole'
IMPORT_CONNECTION_NAME 'tinybird-tb-s3'
IMPORT_BUCKET_URI 's3://tinybird-tb/snowflake/csv/orders/*.csv.gz'
IMPORT_STRATEGY 'append'
IMPORT_SCHEDULE '@auto'
Push your project to Tinybird using:
tb push
Tinybird automatically ingests the new files Snowflake writes to the bucket within a few seconds.
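To confirm rows are arriving, you can query the landing data source from the CLI. A quick check, assuming the data source created from the file above is named sf_orders_incremental_landing, as referenced in the materialized view below:

tb sql "SELECT count() FROM sf_orders_incremental_landing"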
Handle duplicates in Tinybird¶
Use a materialized view to handle duplicates in Tinybird. For example:
NODE sf_orders_0
SQL >
    SELECT * FROM sf_orders_incremental_landing

TYPE materialized
DATASOURCE sf_orders
Note the ReplacingMergeTree engine and the ENGINE_VER option, which deduplicate rows that share the same sorting key (O_ORDERKEY), keeping the row with the highest O_ORDERDATE:
SCHEMA >
    `O_ORDERKEY` Int64,
    `O_CUSTKEY` Int64,
    `O_ORDERSTATUS` String,
    `O_TOTALPRICE` Float32,
    `O_ORDERDATE` DateTime64(3),
    `O_ORDERPRIORITY` String,
    `O_CLERK` String

ENGINE "ReplacingMergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(O_ORDERDATE)"
ENGINE_SORTING_KEY "O_ORDERKEY"
ENGINE_VER "O_ORDERDATE"
Remember to use the FINAL modifier when querying, so that rows that haven't been merged yet are deduplicated at query time.
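For example, a hypothetical endpoint node that deduplicates at query time (the node name and date filter are illustrative):

NODE orders_deduped
SQL >
    SELECT
        O_ORDERKEY,
        O_ORDERSTATUS,
        O_TOTALPRICE,
        O_ORDERDATE
    FROM sf_orders FINAL
    WHERE O_ORDERDATE >= now() - INTERVAL 7 DAY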
Next steps¶
See the following resources: