Triggering dltHub Pro pipelines from Snowflake

Moved a dlt pipeline off SPCS onto dltHub Pro. Still want Snowflake to push the button.

Share
Triggering dltHub Pro pipelines from Snowflake

TL/DR: dltHub Pro launched this week, I moved a pipeline off ❄️ Snowflake Container Services onto it, but I still want Snowflake to push the button. A small stored procedure over an External Access Integration does the job.


I had a dlt pipeline (Microsoft Graph API ➡️ Snowflake) running quite happily inside SPCS.

Can you run dlt inside Snowflake? (Part 2/2: SPCS)
In my last post I explored whether dlt could run in a Snowflake UDF... and failed. But can it still run inside Snowflake using a Container? TL/DR: yes, it does 😎

It got triggered by a Snowflake task, which is convenient: the warehouse owns the schedule, the warehouse owns the downstream refresh task, and everything lives in one place. When dltHub Pro showed up, I wanted to see how easy is it to migrate from one runtime to another and reduce the Snowflake credit consumption. SPCS is already quite cheap, but if I have capacity in dltHub Pro, I better use it 😅. So the goal of this migration was small but specific: keep Snowflake as the orchestrator of record, even though the pipeline now runs somewhere else entirely.

Side note: dltHub Pro is also available on the ❄️ Marketplace

What changed in the move

The pipeline itself barely changed. Same resources, same incremental keys, same VARIANT columns landing in RAW.AZURE_GRAPH. The interesting differences are around the edges.

Three things needed work:

  • A deployment manifest (__deployment__.py) that declares the job and its cron schedule
  • A proxy wrapper, to route to Snowflake:
dltHub Pro is here. So is my static-egress proxy VM.
How I kept IP‑locked Snowflake and Azure databases happy with a managed runtime that runs “wherever Python runs”
  • A way to trigger a run from Snowflake, on demand

The first two are dltHub housekeeping. The third is the part I wanted to write up.

Why trigger from Snowflake at all?

dltHub has its own cron scheduler and I'm using it in plenty of places. But cron is not the only reason a pipeline runs - sometimes the dlt pipeline is a downstream task of a different Snowflake task, and I want that relationship to stay inside Snowflake.

What I want is a one‑liner I can paste into a Snowflake worksheet or task:

call meta.load.p_trigger_dlthub_pipeline('name');

No browser tab, no CLI and no local API key. Snowflake already has the secret and the network egress.

The dltHub side: a REST endpoint

dltHub exposes a trigger endpoint:

POST https://api.dlthub.com/api/v1/workspaces/{workspace_id}/scripts/trigger

The body wants a list of job_refs. This is where I lost the most time, so it's worth being explicit: the ref must be the full dotted path, jobs.__deployment__.<name>. The bare function name returns 404. The half-qualified __deployment__.<name> also returns 404.

The dlthub job list CLI prints refs without the jobs. prefix. That output is misleading: the REST API always requires it. I went round in circles for a bit before I figured this out.

Authentication is a Bearer token: a user API key, created via the runtime client or UI:

from dlt_runtime.runtime_clients.api.client import AuthenticatedClient
from dlt_runtime.runtime_clients.api.api.api_keys import create_user_api_key
from dlt_runtime.runtime_clients.api.models.create_user_api_key_request import (
    CreateUserApiKeyRequest,
)
client = AuthenticatedClient(base_url="https://api.dlthub.com", token=auth_token)
resp = create_user_api_key.sync(
    client=client,
    body=CreateUserApiKeyRequest(name="snowflake-trigger-test", expires_in_days=365),
)
print("Key:", resp.key) 

The key is shown once. I copied it straight into a Snowflake secret and moved on.

The Snowflake side: EAI + secret + procedure

Snowflake will not call an external API without an External Access Integration in front of it. The setup is four objects:

create or replace network rule meta.integration.nr_dlthub
  mode = egress
  type = host_port
  value_list = ('api.dlthub.com:443');

create or replace secret meta.integration.se_dlthub_api_key
  type = generic_string
  secret_string = '<the user api key>';

create or replace external access integration i_dlthub
  allowed_network_rules = (meta.integration.nr_dlthub)
  allowed_authentication_secrets = (meta.integration.se_dlthub_api_key)
  enabled = true;

Once those exist, a Python stored procedure can call out. The interface I went for: just the bare job function name plus an optional dry_run flag.

create or replace procedure meta.load.p_trigger_dlthub_pipeline(
  job_name string,
  dry_run boolean default false
)
returns variant
language python
runtime_version = '3.12'
packages = ('snowflake-snowpark-python', 'requests')
handler = 'main'
external_access_integrations = (i_dlthub)
secrets = ('api_key' = meta.integration.se_dlthub_api_key)
as
$$
import _snowflake, requests, json

WORKSPACE_ID = '<the dlthub workspace id>'

def main(session, job_name, dry_run):
    job_ref = f'jobs.__deployment__.{job_name}'
    url = f'https://api.dlthub.com/api/v1/workspaces/{WORKSPACE_ID}/scripts/trigger'
    headers = {
        'Authorization': f'Bearer {_snowflake.get_generic_secret_string("api_key")}',
        'Content-Type': 'application/json',
    }
    payload = {'job_refs': [job_ref], 'dry_run': bool(dry_run)}
    r = requests.post(url, headers=headers, data=json.dumps(payload), timeout=30)
    r.raise_for_status()
    return r.json()
$$;

The procedure takes the bare name (for example azure_graph_to_snowflake) and prepends the prefix internally, so the caller never has to remember it. A dry_run=true flag is useful when I just want to confirm the call is wired up without actually creating a run.

-- check the plumbing without launching a run
call meta.load.p_trigger_dlthub_pipeline('azure_graph_to_snowflake', true);

-- actually trigger it
call meta.load.p_trigger_dlthub_pipeline('azure_graph_to_snowflake');

The response is a JSON blob with the new run ID, which is enough to compose a deeplink into the dltHub UI if I want to follow along. The pipeline runs asynchronously in dltHub then - if there is another downstream task, it should be triggered by the dlt job, right after the pipeline is done.

The proxy detour (worth flagging)

There is one piece of the migration that's not about Snowflake but is worth flagging. dltHub jobs run inside containers. Those containers have no default outbound route to my Snowflake account. I solved it with a Squid proxy on a small Elestio VM and a _snowflake_proxy() context manager that sets HTTP_PROXY/HTTPS_PROXY around the dlt run:

dltHub Pro is here. So is my static-egress proxy VM.
How I kept IP‑locked Snowflake and Azure databases happy with a managed runtime that runs “wherever Python runs”

The first run failed with a 403 Forbidden from Azure AD. Turned out my NO_PROXY list was too narrow: login.microsoftonline.com was being shoved through Squid, which (correctly) refused. The fix:

NO_PROXY=.amazonaws.com,.microsoftonline.com,.microsoft.com,127.0.0.1,localhost

.amazonaws.com is in there because dltHub uploads run artifacts to S3 and that traffic must not be proxied either. Once the exclusions were right, the pipeline ran cleanly.

What I kept, what I dropped

The old SPCS task is gone. The downstream task that used to hang off it is now triggered directly by the dlt job via EXECUTE TASK at the end of the dlt run. So the chain is:

dltHub run  →  load into RAW.AZURE_GRAPH  →  EXECUTE TASK TA_DOWNSTREAM

From a Snowflake task (or worksheet), that whole chain can be kicked off with one stored procedure call. The orchestration story stays Snowflake-shaped even though the runtime no longer is 😎