Can you run dlt inside Snowflake? (Part 2/2: SPCS)

In my last post I explored whether dlt could run in a Snowflake UDF... and failed. But can it still run inside Snowflake using a Container? TL/DR: yes, it does 😎

For reference, here is Part 1, in which the UDF approach (initially) failed:

Can you run dlt inside Snowflake? (Part 1/2: UDF)
So, dlt by dltHub runs β€œwhere Python runs”... I guess that means it should run in a Snowflake UDF, too. TL/DR: no, it doesn’t 😎 But hey: negative results are results, too, right? πŸ˜…

dlt requires access to the local filesystem, so to run it inside Snowflake, I tried again using Snowpark Container Services (SPCS), Snowflake's managed container orchestration platform... and this time it worked like a charm.

This approach requires a little more setup than simply creating a procedure, but it also has a major advantage: running the smallest container instance is (currently) waaay cheaper than running a warehouse - like 6% of the cost of the smallest warehouse, cf. Snowflake's credit consumption table.

And in the end, I am still able to trigger dlt pipelines using Snowflake tasks without any external infrastructure.

If you want to follow along: I configured the (end-to-end) dlt pipeline locally, created a container executing the pipeline, pushed that container to SPCS and now execute this container any time I want the pipeline to run.

Prerequisites

To set up a dlt pipeline this way, we'll need a few components:

  1. Python and dlt installed (obviously πŸ™„) - I used Python 3.11.9 and dlt 1.20.0
  2. Docker Desktop installed to locally create the container that will later run in Snowflake SPCS
  3. Snowflake CLI installed to push the container from local to Snowflake
  4. And of course a local IDE (like VSCode), ideally with extensions for the 3 other components

With this being set up, let's first create the end-to-end pipeline using dlt...

Create the dlt pipeline

The data I want to materialize in Snowflake in this example (for analysis and maybe some rETL) is data on Jira issues. dlt provides a ready-made Jira source, which is handy for getting started, but since Atlassian has changed many endpoints since that source was last updated, it's actually faster to start from dlt's generic REST API source and adjust it to the Jira API. In the VSCode terminal, inside the local folder I want to use, the first step is:

dlt init rest_api snowflake

This automatically creates most of the pieces I'll later need in the container, basically by loading template files from dltHub. I'll quickly go through most of them below, but want to briefly mention ./requirements.txt, which is used for the container later on: it only tells the container to install dlt - that's it. Nothing to change here.
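For reference, in my case that file boils down to a single line roughly like this (the exact version pin depends on the dlt version at the time of running dlt init):

dlt[snowflake]>=1.20.0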

./load_jira_to_snowflake.py

The created .py file contains the script to run the pipeline. I renamed and modified it from the template to only contain what I actually need. This one file declares the whole logic for dlt and herein lies the beauty of it:

from typing import Any

import dlt
from dlt.sources.rest_api import (
    RESTAPIConfig,
    rest_api_resources,
)


@dlt.source(name="jira")
def jira_source() -> Any:
    # Load credentials from sources.jira section in secrets.toml
    email = dlt.secrets["sources.jira.email"]
    api_token = dlt.secrets["sources.jira.api_token"]

    config: RESTAPIConfig = {
        "client": {
            "base_url": "https://<my subdomain>.atlassian.net/",
            "auth": {
                "type": "http_basic",
                "username": email,
                "password": api_token,
            },
        },
        # Default configuration inherited by all resources
        "resource_defaults": {
            "primary_key": "id",
            "write_disposition": "replace",  # Replace all data on each run
            "endpoint": {
                "paginator": {
                    "type": "offset",
                    "limit": 100,
                    "offset": 0,
                    "offset_param": "startAt",
                    "limit_param": "maxResults",
                    "stop_after_empty_page": True,  # Stop when no more results
                },
            },
        },
        "resources": [
            {
                "name": "issues",
                "write_disposition": "merge",   # Override the default for issues
                "max_table_nesting": 3,  # Unnest comments, changelog, etc.
                "columns": {
                    # Clustering for query performance on project-based queries
                    "fields__project__id": {
                        "cluster": True,
                    },
                    # Prevent unnesting of these custom fields - keep as JSON
                    "fields__customfield_10003": {"data_type": "json"},
                    "fields__customfield_10020": {"data_type": "json"},
                    "fields__customfield_10021": {"data_type": "json"},
                    "fields__customfield_10108": {"data_type": "json"},
                    "fields__customfield_10109": {"data_type": "json"},
                    "fields__customfield_10111": {"data_type": "json"},
                    "fields__customfield_10112": {"data_type": "json"},
                    "fields__customfield_10118": {"data_type": "json"},
                    "fields__customfield_10125": {"data_type": "json"},
                    "fields__customfield_10169": {"data_type": "json"},
                    "fields__customfield_10188": {"data_type": "json"},
                    "fields__customfield_10193": {"data_type": "json"},
                    "fields__customfield_10196": {"data_type": "json"},
                    "fields__customfield_10218": {"data_type": "json"},
                    "fields__customfield_10230": {"data_type": "json"},
                    "fields__customfield_10240": {"data_type": "json"},
                    "fields__customfield_10244": {"data_type": "json"},
                    "fields__customfield_10245": {"data_type": "json"},
                    "fields__customfield_10262": {"data_type": "json"},
                    "fields__customfield_10263": {"data_type": "json"},
                },
                "endpoint": {
                    "path": "rest/api/3/search/jql",    # https://developer.atlassian.com/cloud/jira/platform/rest/v3/api-group-issue-search/#api-rest-api-3-search-jql-get
                    "data_selector": "issues",
                    "paginator": {
                        "type": "cursor",
                        "cursor_path": "nextPageToken", # Paginator stops when nextPageToken is missing on the last page
                        "cursor_param": "nextPageToken",
                    },
                    "incremental": {
                        "cursor_path": "fields.updated",
                        "start_param": "jql",
                        "initial_value": "1970-01-01T00:00:00.000+0000",
                        "convert": lambda val: f'project is not EMPTY AND updated >= "{val[:10]}"',
                    },
                    "params": {
                        "maxResults": 100,
                        "fields": "*all",
                        "expand": "changelog",
                    },
                },
            },
            {
                "name": "projects",
                "max_table_nesting": 0,  # Don't create any child tables for projects
                "endpoint": {
                    "path": "rest/api/3/project/search",    # https://developer.atlassian.com/cloud/jira/platform/rest/v3/api-group-projects/#api-rest-api-3-project-search-get
                    "data_selector": "values",
                    "params": {
                        "expand": "description,lead,issueTypes,url,projectKeys,permissions,insight"
                    },
                },
            },
        ],
    }

    yield from rest_api_resources(config)


def load_jira() -> None:
    pipeline = dlt.pipeline(
        pipeline_name="jira",
        destination="snowflake",
        dataset_name="jira",
    )

    load_info = pipeline.run(jira_source())
    print(load_info)


if __name__ == "__main__":
    load_jira()

I only need data on issues and projects, so those are my two endpoints. I am potentially interested in the changelog of issue field values, so I expand on this, and I am not interested in unnesting any custom fields, so I exclude them from the automatic unnesting.

Wouldn't it be easier to define what fields to unnest instead of defining which not to unnest? Absolutely! If you know or find out how to do this, please let me know 😜

Some Jira endpoints behave differently from others (the "search issues" endpoint uses a nextPageToken cursor, while the "search projects" endpoint uses startAt position-based pagination). Other than that, these two endpoints cover a good mix of the things dlt can do: unnesting, pagination, incremental loading (including deduplication for overlapping ranges), etc.

A side note on a detail the smart people at dltHub implemented that I personally find rather impressive: see that lambda on the incremental load? This neat little trick takes dlt's incremental state and converts it into a JQL-compatible date filter so I don't have to re-load all issues since the beginning of time (1970-01-01 😎).
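To make that concrete: the lambda takes whatever value dlt last tracked for fields.updated and turns it into the JQL string that is passed via the jql parameter (the start_param above). For the initial value, for example:

# The convert lambda from the config above, applied to the initial value
convert = lambda val: f'project is not EMPTY AND updated >= "{val[:10]}"'

print(convert("1970-01-01T00:00:00.000+0000"))
# project is not EMPTY AND updated >= "1970-01-01"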

./.dlt/config.toml

The config.toml can remain untouched if the endpoint for the anonymous telemetry data sent to dltHub is later added to the external access integrations in Snowflake. The endpoint can be found on GitHub - or the telemetry can simply be disabled. Also, the log level can be adjusted as preferred:

[runtime]
log_level="CRITICAL" 
dlthub_telemetry = true
There are of course other configuration options available, but dlt has some pretty good defaults, so I leave them out for now.

./.dlt/secrets.toml

This is where all the secrets to access both Jira and Snowflake are stored. The major change from the dlt template in here is: I use a key pair for the service user in Snowflake, not a password.

[sources.jira]
email = "<user>"
api_token = "<API token>"

[destination.snowflake.credentials]
database = "RAW"
username = "DLT"
private_key = """-----BEGIN ENCRYPTED PRIVATE KEY-----
MI...Fj
-----END ENCRYPTED PRIVATE KEY-----"""
private_key_passphrase="<passphrase>"
host = "<account identifier - the stuff before .snowflakecomputing.com>"
warehouse = "<warehouse>"
role = "<role>"

Wait, is it OK to store the key as a string in the code? You should definitely make sure not to check secrets.toml into your git repo, but since it ends up as a file inside the container anyway, i.m.h.o. it's fine - I don't really see much of a difference to using an environment variable in the container config instead.
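If I remember correctly, dlt init already drops a .gitignore into the folder for exactly this reason - in any case it's worth double-checking that the secrets are covered, e.g.:

# .gitignore (excerpt)
.dlt/secrets.toml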

There is an alternative approach when using SPCS:

Snowpark Container Services: Additional considerations for services | Snowflake Documentation

SPCS provides the container with an OAuth token to connect to Snowflake anyway - using this managed token instead of static secrets is both easy and secure. The modified ./load_jira_to_snowflake.py and the dlt pipeline inside would then contain something like this:

...

def get_login_token():
  with open('/snowflake/session/token', 'r') as f:
    return f.read()

...

    pipeline = dlt.pipeline(
        pipeline_name="jira_pipeline",
        destination=dlt.destinations.snowflake(
            credentials={
                "username": "DLT",
                "database": "RAW",
                "schema": "JIRA",
                "authenticator": "oauth",
                "token": get_login_token(),
                "warehouse": "<warehouse>",
                "role": "<role>",
            }
        ),
        dataset_name="jira",
        # dev_mode=True,           # use a separate timestamped schema
        # refresh="drop_sources"   # drop tables and reset state for full reload
    )

...

However, this only works inside the SPCS container and is not testable locally. Hence, I only mention this alternative approach here, but will continue using my static secrets from secrets.toml anyway 😎

Create and push the container

Once the dlt pipeline runs locally (you definitely want to finish engineering it before pushing it to SPCS!), it contains everything it needs to run virtually anywhere a containerized version of it can run. So let's containerize it!

To learn how to do this with Snowflake I found this intro (particularly the second half on the "Temperature Conversion REST API") useful:

Intro to Snowpark Container Services
Through this quickstart guide, you will explore Snowpark Container Services

Also, the smart people over at SELECT have an informative post on SPCS and how it works generally:

A Beginner’s Guide to Snowpark Container Services: Understanding the Building Blocks and Pricing
Jeff explains the concepts of Compute Pools, Services, And Image Repositories which are the building blocks of Snowpark Container Services

Create container components locally

To get the container into SPCS I need two more files. First, the Dockerfile tells Docker what the container should look like and what it should do:

FROM python:3.11

# Set Python to run in unbuffered mode so logs appear immediately
ENV PYTHONUNBUFFERED=1

# Copy the project files (pipeline script, requirements.txt, .dlt/) into the image
WORKDIR /app
COPY ./ /app/

# Run the install using the packages manifest file
RUN pip install --no-cache-dir -r requirements.txt

# When the container launches run the pipeline script
CMD ["python", "-u", "load_jira_to_snowflake.py"]

So all my container will do is install the requirements from requirements.txt and execute the load_jira_to_snowflake.py.

Second, Snowflake needs to know what to run and where to find the image, so I create a load-jira.yaml service specification, which will later be uploaded to a stage. If you don't have such a stage yet, I'll get to that in a bit:

spec:
  containers:
    - name: load-jira
      image: <account identifier>.registry.snowflakecomputing.com/<catalog>/<schema>/<image_repo>/load_jira:latest
This .yaml allows many more configs for SPCS (like resource or monitor/log settings), but I leave those out here, too, to keep things simple.

Create the Docker image

Now we're back in the VSCode terminal, with Docker Desktop up and running in the background. First, I build an image from my app according to the Dockerfile (mse is just a replaceable shorthand for my name):

docker build --platform=linux/amd64 -t mse/load_jira:latest .
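Optionally, the freshly built image can be smoke-tested right away - note that this actually executes the pipeline, using the secrets baked into the image:

docker run --rm --platform=linux/amd64 mse/load_jira:latest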

Next, I tag the image so it matches the .yaml config (cf. above):

docker tag mse/load_jira:latest <account identifier>.registry.snowflakecomputing.com/<catalog>/<schema>/<image_repo>/load_jira:latest

But before I can push the image to Snowflake, I first have to configure a few things over there...

Set up Snowflake

I need a few things for SPCS containers to work (cf. both the Snowflake intro and the SELECT post mentioned earlier):

First, I need an image repository to hold the Docker image:

use role accountadmin;

create image repository meta.load.image_repo;

show image repositories in schema meta.load;

grant read, write on image repository meta.load.image_repo to role sysadmin;
The repo's location was already used twice: in the .yaml specification and Docker tag - this is where the <catalog>/<schema>/<image_repo> path comes from

Second, I need an (internal) stage to hold my service specification .yaml - this is technically optional, but I find it more intuitive to create this .yaml once and reference it in the service definition later than to embed the specification in the service definition itself. But I guess that's a matter of personal preference.

use role sysadmin;

create stage if not exists meta.load.s_specs
directory = ( 
    enable = true
    auto_refresh = true
)
;

Third, I need a compute pool to execute the container on:

use role accountadmin;

create compute pool if not exists cp_elt
min_nodes = 1
max_nodes = 1
instance_family = cpu_x64_xs
auto_resume = true
auto_suspend_secs = 60
;

grant usage, monitor, operate on compute pool cp_elt to role sysadmin;
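Once the pool exists, a quick way to check that it spins up and suspends again as configured (the state column shows e.g. IDLE, ACTIVE or SUSPENDED):

show compute pools like 'cp_elt';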

And finally, though this is optional and depends on your individual Snowflake setup, I need an external access integration for my pipeline to access both the Jira endpoint and my own Snowflake account. It's kind of weird to have to explicitly allow access to Snowflake from a container running inside Snowflake, but since dlt doesn't use Snowflake's internal protocols, but rather the public endpoints, this is a necessary step.

The Jira endpoint is fairly easy:

use role sysadmin;

create or replace network rule meta.integration.nr_jira_egress
mode = 'EGRESS'
type = 'HOST_PORT'
value_list = ('<my subdomain again>.atlassian.net:443')
comment = 'Network Rule to access atlassian.net'
;

The Snowflake endpoints can be identified using SYSTEM$ALLOWLIST():

use role sysadmin;

select listagg(
    concat(
        '''', 
        value:host::string, 
        ':', 
        value:port::number, 
        ''''
    ), 
    ', '
)
from table(
    flatten(
        input => parse_json(system$allowlist())
    )
)
;

create or replace network rule meta.integration.nr_snowflake
mode = 'EGRESS'
type = 'HOST_PORT'
value_list = (<output of the previous query here>)
comment = 'Network Rule to access Snowflake via https'
;

The third endpoint is for the anonymous telemetry data sent to dltHub, as found in GitHub:

use role sysadmin;

create or replace network rule meta.integration.nr_dlt
mode = 'EGRESS'
type = 'HOST_PORT'
value_list = ('telemetry.scalevector.ai')
comment = 'Network Rule to access dltHub resources'
;

And with all 3 egress network rules in place, the external access integration (EAI) itself:

use role accountadmin;

create or replace external access integration i_jira_dlt
allowed_network_rules = (meta.integration.nr_jira_egress, meta.integration.nr_snowflake, meta.integration.nr_dlt)
enabled = true
comment = 'Integration to access atlassian.net + Snowflake + dlt telemetry'
;

grant usage on integration i_jira_dlt to role sysadmin;

Push the button

The next step is to upload the load-jira.yaml to the specification stage created above. I simply do this using Snowsight, but using the CLI works just as well:
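For example with the Snowflake CLI (assuming a configured connection named sysadmin, as also used below):

snow stage copy load-jira.yaml @meta.load.s_specs --connection sysadmin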

To then push the Docker image into the image repository, I sign in to Snowflake from the VSCode terminal using the Snowflake CLI, with a role that is permitted to write to the repo:

snow spcs image-registry login --connection sysadmin

And finally push the Docker image to the repo:

docker push <account identifier>.registry.snowflakecomputing.com/<catalog>/<schema>/<image_repo>/load_jira:latest
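A quick sanity check that the image actually arrived in the repository:

show images in image repository meta.load.image_repo;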

Run the container in Snowflake

Now it finally is time to run the dlt pipeline in Snowflake. There are essentially two ways to run a container in SPCS: either as a (long-running) service or as a job service.

A service is meant to run 24/7 unless explicitly suspended, while a job service is supposed to run and automatically terminate when finished. Sounds a lot like we should use a job service for the pipeline.

A job service is created and triggered explicitly every time it should run:

execute job service
in compute pool cp_elt
name = raw.jira.load_jira
async = true
external_access_integrations = (i_jira_dlt) -- the EAI created above (Jira + Snowflake + dlt telemetry)
from @meta.load.s_specs 
specification_file = 'load-jira.yaml'
;
An important note on this: do not (like I totally did at first) ignore the usage notes in the Snowflake documentation for execute job service - the order of parameters matters. Start with the compute pool, then all other parameters (like name, async and the EAI), and add the specification last.
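Since the job runs with async = true, it helps to be able to check on it afterwards. The standard SPCS helpers do the trick - the container name ('load-jira') is the one from the service specification .yaml:

-- Current state of the job service (e.g. PENDING, RUNNING, DONE, FAILED)
select system$get_service_status('raw.jira.load_jira');

-- Container logs, i.e. dlt's output including the printed load_info
select system$get_service_logs('raw.jira.load_jira', '0', 'load-jira', 100);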

With this approach I have a working solution I can trigger multiple times per day asynchronously. To be 100% confident the data is as fresh as possible whenever I want to further process it, I would add a stream on the issues table and trigger the next task only if the stream has data. A little more complex than just a chain of tasks, but the savings in credit consumption totally make up for that πŸ€‘

create stream if not exists raw.jira.str_issues
on table raw.jira.issues
;

create or alter task consume.jira.ta_do_something
user_task_managed_initial_warehouse_size = 'XSMALL'
serverless_task_max_statement_size = 'XSMALL'
allow_overlapping_execution = false
target_completion_interval = '30 MINUTES'
user_task_timeout_ms = 1800000 -- 30 minutes
task_auto_retry_attempts = 0
when system$stream_has_data('RAW.JIRA.STR_ISSUES')
as
begin
    -- consume the stream and advance its offset
    create or replace temporary table raw.jira.tmp_str_issues as
    select * from raw.jira.str_issues;
    drop table if exists raw.jira.tmp_str_issues;

    -- do something
end;
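And to close the loop on the claim from the very beginning that the pipeline can be triggered by Snowflake tasks without any external infrastructure: here is a sketch of how such a trigger task could look. The name and cron schedule are my own, and I'm assuming that execute job service and drop service can be wrapped in a scripting block like this - job services stick around as objects after they finish, so any previous run has to be dropped before the same name can be executed again:

create or alter task meta.load.ta_load_jira
user_task_managed_initial_warehouse_size = 'XSMALL'
schedule = 'USING CRON 0 */4 * * * UTC'
as
begin
    -- remove the previous job service (if any) so the name can be reused
    drop service if exists raw.jira.load_jira;

    -- same call as above, just without async so the task reflects success or failure
    execute job service
    in compute pool cp_elt
    name = raw.jira.load_jira
    external_access_integrations = (i_jira_dlt)
    from @meta.load.s_specs
    specification_file = 'load-jira.yaml';
end;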