Ingest from Snowflake using incremental updates

Read on to learn how to incrementally load data from a Snowflake table into Tinybird, using Amazon S3 as an intermediary staging area.

An incremental loading strategy transfers only the rows that are new or updated since the last run, instead of re-exporting the whole table every time.

Before you start

Before you begin, ensure the following:

  • Snowflake Account: A Snowflake instance with the data you want to load.
  • Amazon S3 Bucket: Access to an S3 bucket for staging data, along with appropriate permissions (write from Snowflake and read from Tinybird).
  • Tinybird Account: A Tinybird Workspace where you can create the Data Sources used in this guide.
  • Snowflake Permissions: Ensure the Snowflake user has privileges to (a sketch of the corresponding grants follows this list):
    • Access the target table.
    • Create and manage storage integrations, stages, file formats, and tasks.
    • Unload data into S3.
  • AWS Credentials: Ensure Snowflake can use AWS credentials (IAM Role or Access Key/Secret Key pair) to write to the S3 bucket.
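
The exact statements depend on how your account is organized. As a rough sketch, assuming a dedicated role named unload_role (a hypothetical name) and the warehouse, database, schema, and table used in the examples later in this guide, the grants could look like this:

-- Hypothetical role; object names match the examples used later in this guide.
GRANT USAGE ON WAREHOUSE compute_wh TO ROLE unload_role;
GRANT USAGE ON DATABASE tinybird TO ROLE unload_role;
GRANT USAGE ON SCHEMA tinybird.samples TO ROLE unload_role;
GRANT SELECT ON TABLE tinybird.samples.orders_incremental TO ROLE unload_role;

-- Objects created in this guide: stage, file format, and scheduled task.
GRANT CREATE STAGE ON SCHEMA tinybird.samples TO ROLE unload_role;
GRANT CREATE FILE FORMAT ON SCHEMA tinybird.samples TO ROLE unload_role;
GRANT CREATE TASK ON SCHEMA tinybird.samples TO ROLE unload_role;

-- Account-level privileges for the storage integration and for running tasks.
GRANT CREATE INTEGRATION ON ACCOUNT TO ROLE unload_role;
GRANT EXECUTE TASK ON ACCOUNT TO ROLE unload_role;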

1. Prepare Snowflake to unload data to S3

Follow these steps to set up the permissions, storage integration, file format, and stage that the unload task needs:

  1. Grant the required permissions in the AWS IAM console

Make sure the S3 bucket allows Snowflake to write files by setting up an appropriate IAM role or policy. You can use this template to create the policy and attach it to the AWS role:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": ["s3:PutObject", "s3:AbortMultipartUpload"],
            "Resource": "arn:aws:s3:::your-bucket-name/path/*"
        }
    ]
}

Replace your-bucket-name/path/* with your bucket name, and optionally the path you want to grant access to.

  2. Create the storage integration

Run the following SQL statement to create the storage integration:

/* Create the S3 integration.
 */
CREATE OR REPLACE STORAGE INTEGRATION tinybird_integration
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'S3'
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = '<arn_role>'
  STORAGE_ALLOWED_LOCATIONS = ('*');

-- describe integration tinybird_integration;

Replace <arn_role> with the ARN of the role created in the previous step. For Snowflake to assume that role, the role's trust relationship must reference the integration's IAM user and external ID; the commented describe integration statement returns both values as STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID.
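
As a sketch, the trust policy on the AWS role could then look like the following, with both placeholders replaced by the values from the describe integration output:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": { "AWS": "<STORAGE_AWS_IAM_USER_ARN>" },
            "Action": "sts:AssumeRole",
            "Condition": {
                "StringEquals": { "sts:ExternalId": "<STORAGE_AWS_EXTERNAL_ID>" }
            }
        }
    ]
}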

  3. Create the file format

Run the following SQL statement to create the file format:

/* Create the file format for the output files generated.
 */
CREATE OR REPLACE FILE FORMAT csv_unload_format
  TYPE = 'CSV';
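
By default, Snowflake compresses unloaded CSV files with gzip, which is why the bucket URI in the Tinybird Data Source below matches *.csv.gz. If you prefer to make that explicit, or your string columns can contain the delimiter, you can extend the format, for example:

/* Optional, more explicit variant: gzip matches Snowflake's default for unloads,
 * and quoting protects string values that contain commas or newlines.
 */
CREATE OR REPLACE FILE FORMAT csv_unload_format
  TYPE = 'CSV'
  COMPRESSION = 'GZIP'
  FIELD_OPTIONALLY_ENCLOSED_BY = '"';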

  4. Create the stage

Run the following SQL statement to create the stage:

/* And finally the stage we'll use to unload the data to.
 */
CREATE OR REPLACE STAGE tinybird_stage
  STORAGE_INTEGRATION = tinybird_integration
  URL = 's3://your-bucket-name/path/'
  FILE_FORMAT = csv_unload_format;

Replace your-bucket-name and path with your S3 bucket details.

2. Create the unload task

Run the following SQL statement to create the scheduled task. On each run, it unloads to the S3 bucket the records added since the last successful execution:

/* Create the scheduled task that unloads the new records since
 * last successful execution to the S3 bucket.
 * 
 * Note how it reads the timestamp of the last successful execution,
 * and leaves a one hour margin.
 *
 * Orders need to be deduplicated later in Tinybird.
 */
CREATE OR REPLACE TASK export_order_deltas
    WAREHOUSE = compute_wh
    -- Runs at minute 5 of every hour, UTC.
    SCHEDULE = 'USING CRON 05 * * * * UTC'
AS
BEGIN
   LET sql := 'COPY INTO @tinybird_stage/orders/orders_<ts> from (
    select
        O_ORDERKEY, O_CUSTKEY, O_ORDERSTATUS, O_TOTALPRICE, O_ORDERDATE,
        O_ORDERPRIORITY, O_CLERK
    from tinybird.samples.orders_incremental
    where o_orderdate >= (
        SELECT coalesce(timestampadd(hour,-1,max(QUERY_START_TIME)),\'1970-01-01\')
        FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(TASK_NAME=>\'export_order_deltas\'))
        where state = \'SUCCEEDED\'
    )) max_file_size=1000000000';

   sql := REPLACE(sql, '<ts>', TO_VARCHAR(CONVERT_TIMEZONE('UTC',current_timestamp()), 'YYYY_MM_DD__hh24_mi_ss'));
   
   EXECUTE IMMEDIATE (sql);

   RETURN sql;
END;
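
Snowflake creates tasks in a suspended state, so resume the task to activate the schedule. You can also trigger a run manually to generate the first file without waiting:

-- Tasks are created suspended; this enables the hourly schedule.
ALTER TASK export_order_deltas RESUME;

-- Optional: trigger an immediate run.
EXECUTE TASK export_order_deltas;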

3. Configure the ingestion in Tinybird

Create the S3 connection. For example, using the CLI:

tb connection create s3_iamrole

Follow the instructions to grant Tinybird read access to the S3 bucket that Snowflake unloads the data to.

Then create the data source using the S3 Connector, with a schema similar to the following. The deduplication step later in this guide assumes this file is named sf_orders_incremental_landing.datasource; adjust IMPORT_CONNECTION_NAME and IMPORT_BUCKET_URI to match your own connection and bucket:

SCHEMA >
    `O_ORDERKEY` Int64,
    `O_CUSTKEY` Int64,
    `O_ORDERSTATUS` String,
    `O_TOTALPRICE` Float32,
    `O_ORDERDATE` DateTime64(3),
    `O_ORDERPRIORITY` String,
    `O_CLERK` String

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYear(O_ORDERDATE)"
ENGINE_SORTING_KEY "O_ORDERDATE, O_ORDERPRIORITY, O_CLERK"

IMPORT_SERVICE 's3_iamrole'
IMPORT_CONNECTION_NAME 'tinybird-tb-s3'
IMPORT_BUCKET_URI 's3://tinybird-tb/snowflake/csv/orders/*.csv.gz'
IMPORT_STRATEGY 'append'
IMPORT_SCHEDULE '@auto'

Push your project to Tinybird using:

tb push

The new files Snowflake writes to the bucket are automatically ingested by Tinybird in a few seconds.
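
To verify that rows are arriving, you can run a quick query against the landing Data Source, for example with tb sql or from the UI:

-- Any query works; a count is enough to confirm files are being ingested.
SELECT count() AS ingested_rows
FROM sf_orders_incremental_landing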

4. Handle duplicates in Tinybird

Because the unload task re-exports rows within a one-hour margin of the last successful run, some orders arrive more than once. Use a materialized view to deduplicate them in Tinybird. For example:

NODE sf_orders_0
SQL >

    SELECT *
    FROM sf_orders_incremental_landing

TYPE materialized
DATASOURCE sf_orders

Note the ReplacingMergeTree engine and the ENGINE_VER setting in the sf_orders Data Source: rows that share the same sorting key (O_ORDERKEY) are collapsed during background merges, keeping the row with the highest O_ORDERDATE:

SCHEMA >
    `O_ORDERKEY` Int64,
    `O_CUSTKEY` Int64,
    `O_ORDERSTATUS` String,
    `O_TOTALPRICE` Float32,
    `O_ORDERDATE` DateTime64(3),
    `O_ORDERPRIORITY` String,
    `O_CLERK` String

ENGINE "ReplacingMergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(O_ORDERDATE)"
ENGINE_SORTING_KEY "O_ORDERKEY"
ENGINE_VER "O_ORDERDATE"

Remember to use the FINAL modifier when querying, because rows are only deduplicated when ClickHouse merges parts in the background.
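
For example:

-- FINAL collapses rows that share the same O_ORDERKEY at query time,
-- keeping the one with the highest O_ORDERDATE (the ENGINE_VER column).
SELECT count()
FROM sf_orders FINAL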

Next steps

See the following resources:
