Refresh Tableau from inside Snowflake

We don't need no orchestration!

TL;DR: We don't need no orchestration!


The challenge

In many cases I use an orchestrator for my data pipelines (mostly Azure Data Factory). However, there are also many cases where a Snowflake-external orchestrator is not required, or even useful. When a data pipeline is independent from everything else, it might just as well run as a Task in Snowflake. As an example: I stream tracking data into Snowflake (using Snowplow and Decodable), and once per day I want to run a procedure that transforms the latest raw data and adds it to the data model. Afterwards, the data from this model should be extracted into a Tableau data source (or workbooks, if you prefer not to use data sources) in my Tableau Cloud - or any other visualization tool for that matter; presumably this translates directly to whatever tool you want to use. This is not because Snowflake couldn’t handle Tableau connecting to the data model directly, but because it’s cheaper to duplicate and extract the data once per day than to use a Snowflake warehouse every time someone looks at a dashboard in Tableau.

There are two ways to trigger a refresh of a Tableau data source: either the extract is scheduled within Tableau Cloud, or it is triggered via Tableau’s REST API.

Triggering an extract refresh via the API can be done directly from inside Snowflake using a dedicated procedure. This will be the preferred way especially once the soon-to-be-available flexible serverless tasks arrive, as it will be difficult to determine when exactly those run. And that’s the whole point: it doesn’t matter when a task runs. As soon as the data is prepared, the extract is triggered.
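
As a sketch (the task name and body here are placeholders, not part of the setup below): a task becomes serverless simply by omitting the warehouse parameter - Snowflake then manages the compute, and you can optionally hint at the initial size.

create or replace task meta.load.ta_serverless_example
schedule = '60 minute'
user_task_managed_initial_warehouse_size = 'XSMALL'
comment = 'Placeholder to illustrate a serverless task'
as
select 1;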

Preparation

Sending a request to Tableau’s API requires authentication - of course. Hence, as a first step, we’ll have to get an access token. These tokens are valid for about 4 hours, so it’s not necessary to get a new token every time a request is made. Instead, we’ll cache previously fetched tokens in a table in Snowflake:

create or replace table meta.load.t_token (
  system varchar primary key rely comment 'The system or API this token can be used with'
  , token varchar comment 'The token to authenticate with'
  , inserted timestamp_ntz(9) default current_timestamp() comment 'When was this token inserted into this table?'
  , valid_from timestamp_ntz(9) default current_timestamp() comment 'Since when can this token be used?'
  , valid_to timestamp_ntz(9) comment 'Until when can this token be used before it needs to be refreshed?'
)
comment = 'Table to temporarily save API auth tokens';

Next, we need to store the personal access token (PAT) for Tableau in Snowflake:

create or replace secret meta.integration.se_tableau_pat
type = password
username = '[your_Tableau_PAT_name]'
password = '[your_Tableau_PAT_secret]'
comment = 'Tableau PAT';

Also, to enable sending requests to endpoints outside of Snowflake, we’ll need an external access integration:

-- Replace the hostname here with your Tableau host
create or replace network rule meta.integration.nr_tableau
mode = 'EGRESS'
type = 'HOST_PORT'
value_list = ('[your_Tableau_pod].online.tableau.com')
comment = 'Network Rule to access online.tableau.com';

use role accountadmin;

create or replace external access integration i_tableau
allowed_network_rules = (meta.integration.nr_tableau)
allowed_authentication_secrets = (meta.integration.se_tableau_pat)
enabled = true
comment = 'Integration to access online.tableau.com';

grant usage on network rule meta.integration.nr_tableau to role sysadmin;
grant usage on integration i_tableau to role sysadmin;

use role sysadmin;

Grant usage on both the network rule and the integration to whichever role you use to create (and own) the procedures below.
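
If that role is not sysadmin, the grants might look like this (the role name is just a placeholder) - note that the owning role also needs read access on the secret:

-- "data_engineer" is a placeholder; use whichever role will own the procedures
grant usage on network rule meta.integration.nr_tableau to role data_engineer;
grant usage on integration i_tableau to role data_engineer;
grant read on secret meta.integration.se_tableau_pat to role data_engineer;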

Get a new token

Whenever the stored token expires, we’ll have to get a new one. This is done by the first of two procedures:

create or replace procedure meta.load.p_get_token_tableau()
returns string
language python
runtime_version = '3.11'
packages = ('snowflake-snowpark-python', 'requests')
handler = 'main'
comment = 'Refresh Tableau API token'
external_access_integrations = (i_tableau)
secrets = ('pat' = meta.integration.se_tableau_pat)
as
$$
import snowflake.snowpark as snowpark
import requests
from datetime import datetime, timedelta
import _snowflake

def get_token():
  # Read the Tableau PAT from the secret attached to this procedure
  pat = _snowflake.get_username_password('pat')
  pat_name = pat.username
  pat_secret = pat.password

  # Signin endpoint URL
  signin_url = 'https://[your_Tableau_pod].online.tableau.com/api/3.25/auth/signin'
  
  # Define headers
  headers = {
    'Content-Type': 'application/json',
    'Accept': 'application/json'
  }

  # Prepare the request payload
  payload = {
    "credentials": {
      "personalAccessTokenName": pat_name,
      "personalAccessTokenSecret": pat_secret,
      "site": {
        "contentUrl": "[your_Tableau_site_name]"
      }
    }
  }

  # Make the POST request to get the token
  response = requests.post(signin_url, headers=headers, json=payload)

  # Check for successful response
  if response.status_code == 200:
    token_json = response.json()
    access_token = token_json['credentials']['token']
    return access_token
  else:
    raise Exception(f"Failed to obtain token: {response.status_code} - {response.text}")

def main(session: snowpark.Session):
  # Retrieve the token
  token = get_token()

  # Delete previous token from table
  query_delete = """
    DELETE FROM META.LOAD.T_TOKEN
    WHERE SYSTEM = 'tableau'
  """
  session.sql(query_delete).collect()

  # Insert new token
  system_value = 'tableau'
  valid_to_value = datetime.now() + timedelta(hours=2)
  query_insert = f"""
    INSERT INTO META.LOAD.T_TOKEN (SYSTEM, TOKEN, VALID_TO)
    VALUES ('{system_value}', '{token}', '{valid_to_value.strftime('%Y-%m-%d %H:%M:%S')}')
  """
  session.sql(query_insert).collect()

  return f"Tableau token refreshed"
$$;

This procedure uses your individual Tableau site name and will store the refreshed token in the previously created table. 

I limit the validity to 2 hours here: Tableau says a token should be valid for about 4 hours, but with more than 2 I kept getting errors.
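
A quick sanity check: call the procedure once and inspect the cached token.

call meta.load.p_get_token_tableau();
select system, valid_to from meta.load.t_token where system = 'tableau';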

From there, the token can be picked up by the second procedure.

Refresh data sources

To identify which data sources should be refreshed when a data model in Snowflake is ready, a little bit of metadata is required. I use tags, as they can easily be searched for in the API. For instance, all data sources related to my “tracking_data” model carry such a tag.

create or replace procedure meta.load.p_tableau_refresh_datasource(I_TAG varchar)
returns string
language python
runtime_version = '3.11'
packages = ('snowflake-snowpark-python', 'requests')
handler = 'main'
comment = 'Refresh Tableau data source'
external_access_integrations = (i_tableau)
as
$$
import snowflake.snowpark as snowpark
import requests

def get_token(session: snowpark.Session):
  # Select from META.LOAD.T_TOKEN
  df = session.sql("SELECT * FROM META.LOAD.T_TOKEN WHERE SYSTEM = 'tableau' LIMIT 1").collect()
  
  # Check whether a cached token exists and is still valid; refresh if not
  current_ts = session.sql("SELECT CURRENT_TIMESTAMP() AS TS").collect()[0]['TS'].replace(tzinfo=None)
  if len(df) == 0 or df[0]['VALID_TO'] <= current_ts:
    # Call procedure to get new token
    session.sql("CALL META.LOAD.P_GET_TOKEN_TABLEAU()").collect()
    # Select again
    df = session.sql("SELECT * FROM META.LOAD.T_TOKEN WHERE SYSTEM = 'tableau' LIMIT 1").collect()

  # Return the token value; fail instead of returning a bogus token
  if len(df) > 0:
    return df[0]['TOKEN']
  else:
    raise Exception("No token found")

def main(session: snowpark.Session, I_TAG):
  # Retrieve the token
  token = get_token(session)

  # Get datasources to be refreshed
  datasources_url = f'https://[your_Tableau_pod].online.tableau.com/api/3.25/sites/[your_Tableau_site_id]/datasources?filter=tags:eq:{I_TAG},hasExtracts:eq:true&fields=id'
  datasources_headers = {
    'X-Tableau-Auth': token,
    'Accept': 'application/json'
  }
  datasources_response = requests.get(datasources_url, headers=datasources_headers)

  # Check for successful response
  if datasources_response.status_code == 200:
    datasources_json = datasources_response.json()
  else:
    raise Exception(f"Failed to get datasources: {datasources_response.status_code} - {datasources_response.text}")

  # For each datasource (the 'datasource' key is absent when nothing matches the filter)
  for datasource in datasources_json['datasources'].get('datasource', []):
    datasource_id = datasource['id']
    
    refresh_url = f'https://[your_Tableau_pod].online.tableau.com/api/3.25/sites/[your_Tableau_site_id]/datasources/{datasource_id}/refresh'
    refresh_headers = {
      'X-Tableau-Auth': token,
      'Accept': 'application/json',
      'Content-Type': 'application/xml'
    }
    refresh_payload = '<tsRequest></tsRequest>'
      
    # Send the request
    refresh_response = requests.post(refresh_url, headers=refresh_headers, data=refresh_payload)

    # Check for successful response (202 Accepted - refreshes run asynchronously)
    if refresh_response.status_code != 202:
      raise Exception(f"Failed to refresh datasources: {refresh_response.status_code} - {refresh_response.text}")

  return 'Tableau datasources refreshed'
$$;

This procedure doesn’t use the Tableau site name, but the site ID (which, by the way, the signin response returns under credentials.site.id).

Why use only one identifier if you can have two?

First, it checks whether there is a valid token in the previously defined table (if not, it gets a new one). Then it finds all data sources that carry a particular tag and use extracts (if a data source doesn’t use extracts, there’s nothing to refresh):

filter=tags:eq:{I_TAG},hasExtracts:eq:true

Afterwards, it loops through all the data sources found and refreshes each of them.

This presumes that no more than 100 data sources are found with a particular tag - but if you have more than 100 data sources to refresh, pagination is probably not your most important problem.
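
Before wiring the procedure into a task, you can also test it in isolation (assuming at least one extract-based data source carries the tag):

call meta.load.p_tableau_refresh_datasource('tracking_data');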

Using the procedures

You can call the second procedure from a task or from another procedure, e.g.:

create or replace task meta.load.ta_refresh_tracking_data_model
schedule = 'USING CRON 5 0 * * * Europe/Zurich'
comment = 'Update data model every day at 00:05'
as
begin
  call meta.load.p_refresh_tracking_data_model();
  call meta.load.p_tableau_refresh_datasource('tracking_data');
end;
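
One last thing to keep in mind: tasks are created in a suspended state, so the schedule only takes effect once you resume the task:

alter task meta.load.ta_refresh_tracking_data_model resume;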