Stream from Google Pub/Sub

In this guide you'll learn how to send data from Google Pub/Sub to Tinybird.

Overview

Tinybird is a Google Cloud partner and supports integrations with Google Cloud services.

Google Pub/Sub is often used as messaging middleware that decouples event stream sources from their end destination. Pub/Sub streams are usually consumed by Google Cloud Dataflow, which can forward events to destinations such as BigQuery, Bigtable, or Google Cloud Storage.

This Dataflow pattern works with Tinybird too. However, Pub/Sub also offers Push subscriptions, which forward messages directly from Pub/Sub to an HTTPS endpoint such as Tinybird's. The steps in this guide use the Push subscription approach.

Push messages from Pub/Sub to Tinybird

1. Create a Pub/Sub topic

Start by creating a topic in Google Pub/Sub following the Google Pub/Sub documentation.

2. Create a push subscription

Next, create a Push subscription in Pub/Sub.

Set the Delivery Type to Push. In the Endpoint URL field, use the following snippet (which uses the Tinybird Events API) and replace <your-token> with your own Token, which you can find in your Workspace > Tokens:

Endpoint URL
https://api.tinybird.co/v0/events?name=events_demo&wait=true&token=<your-token>

You don't need to create the Data Source in advance; it will be created automatically. The snippet also includes the wait=true parameter, which makes the Events API respond only after the data has been written. See the Events API docs for details.

Add Push subscription to Pub/Sub topic
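If you manage several Data Sources or rotate Tokens, you may prefer to generate the Endpoint URL than to edit it by hand. The following is a minimal sketch that builds the same URL as the snippet above from a Data Source name and a Token. The function name build_push_endpoint is hypothetical, and the Token is passed as a query parameter only because the snippet above does so.

```python
from urllib.parse import urlencode

EVENTS_API = "https://api.tinybird.co/v0/events"


def build_push_endpoint(datasource: str, token: str, wait: bool = True) -> str:
    """Return an Events API URL to paste into the push subscription's Endpoint URL field."""
    params = {"name": datasource}
    if wait:
        # Ask the Events API to respond only once the data has been written
        params["wait"] = "true"
    params["token"] = token
    # urlencode percent-escapes any characters that aren't URL-safe
    return f"{EVENTS_API}?{urlencode(params)}"


if __name__ == "__main__":
    print(build_push_endpoint("events_demo", "<your-token>"))
```

Keeping the parameter order identical to the snippet makes it easy to compare the generated URL with the one in the Pub/Sub console.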

3. Send sample messages

Generate and send some sample messages to test your connection. If you don't have your own messages to test, use this script.
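If you'd rather generate test data yourself, here is a minimal sketch that builds JSON message bodies containing the fields the Pipe in step 4 extracts (timestamp, event, product_id, url, browser, OS, cart_id). The field values, the shop.example.com URL, and the 'YYYY-MM-DD hh:mm:ss' timestamp format are assumptions chosen for illustration. The sketch only builds the bodies. To publish them, you could pass each one to the google-cloud-pubsub client's PublisherClient.publish(topic_path, data).

```python
import json
import random
import uuid
from datetime import datetime, timezone

# Hypothetical sample values; the keys match what the decoding Pipe extracts
EVENTS = ["page_view", "add_to_cart", "checkout"]
BROWSERS = ["Chrome", "Firefox", "Safari"]
OSES = ["Windows", "macOS", "Linux"]


def make_sample_event(rng: random.Random) -> dict:
    """Build one fake e-commerce event with the fields the Pipe expects."""
    product_id = rng.randint(1, 1000)
    return {
        # Assumed format: 'YYYY-MM-DD hh:mm:ss' in UTC
        "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"),
        "event": rng.choice(EVENTS),
        "product_id": product_id,
        "url": f"https://shop.example.com/products/{product_id}",
        "browser": rng.choice(BROWSERS),
        "OS": rng.choice(OSES),
        # Deterministic UUID-formatted id derived from the seeded generator
        "cart_id": str(uuid.UUID(int=rng.getrandbits(128))),
    }


def make_sample_messages(n: int, seed: int = 42) -> list[bytes]:
    """Return n UTF-8 JSON message bodies, ready to publish to the topic."""
    rng = random.Random(seed)
    return [json.dumps(make_sample_event(rng)).encode("utf-8") for _ in range(n)]
```

Seeding the generator keeps every field except the timestamp reproducible between runs, which helps when comparing what you sent with what arrives in Tinybird.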

4. Decode message data

You'll start to see messages arrive in the events_demo Data Source in Tinybird.

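By default, a Pub/Sub Push subscription wraps each message in a JSON envelope and encodes the message body in base64, so the body lands in the message_data column as an encoded string.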

Raw Pub/Sub messages
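To see where the encoded data comes from, it helps to look at the request body a Push subscription sends. The following is a rough sketch that approximates that envelope in Python. Pub/Sub includes both camelCase and snake_case copies of the message id and publish time. The flatten helper is a hypothetical illustration of how nested keys such as message.message_id map onto column names like message_message_id in the Pipe below. It is not Tinybird's actual schema-inference code. The subscription path is a placeholder.

```python
import base64


def make_push_envelope(body: bytes, message_id: str, publish_time: str,
                       subscription: str = "projects/my-project/subscriptions/my-push-sub") -> dict:
    """Approximate the JSON body a Push subscription POSTs to the endpoint."""
    data = base64.b64encode(body).decode("ascii")  # the message body travels base64-encoded
    return {
        "message": {
            "data": data,
            "messageId": message_id,
            "message_id": message_id,
            "publishTime": publish_time,
            "publish_time": publish_time,
            "attributes": {},
        },
        "subscription": subscription,
    }


def flatten(obj: dict, prefix: str = "") -> dict:
    """Hypothetical helper: join nested keys with '_' to mimic the column names."""
    out = {}
    for key, value in obj.items():
        name = f"{prefix}_{key}" if prefix else key
        if isinstance(value, dict) and value:
            out.update(flatten(value, name))  # recurse into non-empty objects
        else:
            out[name] = value
    return out
```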

To decode the message body, use the base64Decode() function. You can then extract values from the decoded JSON with functions such as JSONExtractString() and JSONExtractInt().
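Before creating the Materialized View, you may want to check locally that your message bodies decode to the columns you expect. The following sketch roughly mirrors the two nodes of the Pipe below: it base64-decodes message_data, parses the JSON, and extracts the same fields under the same column names. The defaults for missing keys imitate JSONExtractString ('') and, in simplified form, JSONExtractInt (0). The timestamp handling assumes 'YYYY-MM-DD hh:mm:ss' input and is only a stand-in for the CAST to DateTime64.

```python
import base64
import json
from datetime import datetime


def decode_message_data(message_data: str) -> dict:
    """Counterpart of base64Decode(message_data) followed by JSON parsing."""
    return json.loads(base64.b64decode(message_data))


def _string(payload: dict, key: str) -> str:
    # Like JSONExtractString: '' when the key is missing or not a string
    value = payload.get(key)
    return value if isinstance(value, str) else ""


def _int(payload: dict, key: str) -> int:
    # Simplified JSONExtractInt: 0 when the key is missing or not an integer
    value = payload.get(key)
    return value if isinstance(value, int) and not isinstance(value, bool) else 0


def extract_row(message_id: str, publish_time: str, message_data: str) -> dict:
    """Build one row with the same column names as the events_payload_1 node."""
    payload = decode_message_data(message_data)
    return {
        "message_id": message_id,
        "message_publish_time": publish_time,
        # Stand-in for CAST(..., 'DateTime64'); raises if timestamp is missing
        "payload_timestamp": datetime.fromisoformat(payload["timestamp"]),
        "payload__event": _string(payload, "event"),
        "payload__product_id": _int(payload, "product_id"),
        "payload__url": _string(payload, "url"),
        "payload__browser": _string(payload, "browser"),
        "payload__os": _string(payload, "OS"),
        "payload__cart_id": _string(payload, "cart_id"),
    }
```

Running extract_row over a few sample bodies is a cheap way to spot a misnamed key, such as "OS" versus "os". The empty-string default means such a key would otherwise show up only as blank values.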

Below is an example Tinybird Pipe that decodes the message body, extracts values from the JSON message, and writes to a Materialized View:

Decode message and parse JSON with Materialized View
NODE events_payload_decoded
SQL >
  -- Push subscriptions deliver the message body base64-encoded in message_data
  SELECT
      message_message_id as message_id,
      message_publish_time,
      base64Decode(message_data) as message_data
    FROM events_demo

NODE events_payload_1
SQL >
  -- Extract typed fields from the decoded JSON message body
  SELECT
      message_id,
      message_publish_time,
      CAST(JSONExtractString(message_data, 'timestamp'), 'DateTime64') as payload_timestamp,
      JSONExtractString(message_data, 'event') as payload__event,
      JSONExtractInt(message_data, 'product_id') as payload__product_id,
      JSONExtractString(message_data, 'url') as payload__url,
      JSONExtractString(message_data, 'browser') as payload__browser,
      JSONExtractString(message_data, 'OS') as payload__os,
      JSONExtractString(message_data, 'cart_id') as payload__cart_id
    FROM
      events_payload_decoded

TYPE materialized
DATASOURCE events_payload_1_mv
ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(message_publish_time)"
ENGINE_SORTING_KEY "message_publish_time, payload__browser, payload__os, payload__cart_id"
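To confirm that decoded rows are reaching the Materialized View, you could query events_payload_1_mv through Tinybird's Query API (/v0/sql). The following sketch only builds the request and doesn't send it. The function name is hypothetical, the Token is a placeholder that must be allowed to read the Data Source, and the aggregation query is just an example.

```python
from urllib.parse import urlencode
from urllib.request import Request

QUERY_API = "https://api.tinybird.co/v0/sql"

# Example query: count decoded events per event type
SQL = ("SELECT payload__event, count() AS events "
       "FROM events_payload_1_mv GROUP BY payload__event FORMAT JSON")


def build_query_request(sql: str, token: str) -> Request:
    """Build a GET request for the Query API, authenticating with a Bearer token."""
    url = f"{QUERY_API}?{urlencode({'q': sql})}"
    return Request(url, headers={"Authorization": f"Bearer {token}"})


if __name__ == "__main__":
    req = build_query_request(SQL, "<your-token>")
    # To run it: urllib.request.urlopen(req).read().decode()
    print(req.full_url)
```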

And that's it!

Next steps