Salesforce to Snowflake without ELT-Tool

Let's cut out the middle-tool 😜

TL;DR: It is possible to sync (duplicate) data from Salesforce CRM into Snowflake without any tool in the middle.


The use case

Traditionally, to duplicate data from one database (Salesforce CRM) to another database (Snowflake), one would use some kind of tool to do the job - either in batches or streamed: Airbyte, Fivetran, Portable, Azure Data Factory, … the options are virtually endless.

If you have all your Salesforce data in Salesforce Data Cloud, you don’t need to duplicate data to Snowflake. But if you use Salesforce Data Cloud, optimizing ELT cost is probably not your highest priority anyway 😏

However, if you don’t have to rely on too many of such a tool’s features - e.g. automatic (delayed) retries, collision management, error handling, logging or lineage graphs - or are fine with implementing those yourself where needed, you can just plug into the same Salesforce API those tools use and load the data directly into Snowflake without any of them.

How?

Well: Snowpark of course!

Why would you do this?

Removing (the dependency on) tools that don’t add significant value is usually a good idea anyway. In this case, you also remove a lot of the overhead those tools require for all their features. When I moved from Azure Data Factory to Snowpark for loading Salesforce data, the runtime of the daily delta-load pipeline dropped to 20% of its previous value. And runtime (at least for ADF) means cost 😁

In this post I’ll provide some Snowflake procedures you can use as a starting point. They really only sync data from one Salesforce org (if you also need data from your staging org, you’ll particularly have to modify the token procedure), but once you understand what they do, you can surely extend and improve them (e.g. there are many aspects I hard-coded that you could parameterize, or you could switch to the Salesforce Bulk API v2). If you do, please let me know (so I can copy your improvements 😉)! But first…

Some necessary setup

In my Snowflake environment, I have a catalog for raw data called raw, containing a schema for all Salesforce data called salesforce. Add to that a landing zone called stage.salesforce and an archive of previously observed rows called history.salesforce. The procedures populating these schemata live in meta.load, a schema in a separate catalog called meta, and some integration stuff lives in meta.integration:

create database if not exists raw 
comment = 'Raw data'
;

create schema if not exists raw.salesforce 
comment = 'Data from Salesforce'
;

create transient database if not exists stage -- no fail-safe required here
comment = 'Staged (temporarily) raw data'
;

create schema if not exists stage.salesforce
comment = 'Data from Salesforce'
;

create database if not exists history 
comment = 'History of raw data'
;

create schema if not exists history.salesforce 
comment = 'Data from Salesforce'
;

create database if not exists meta 
comment = 'Meta information'
;

create schema if not exists meta.load
comment = 'Control loading data'
;

create schema if not exists meta.integration
comment = 'Parking lot for integration stuff (e.g. API integrations and secrets)'
;

The history catalog was previously referenced here:

Asynchronous Child Jobs in Snowflake
Wait, what? Child labor?

Also, to connect to our Salesforce instance, we’ll need to safely store a Salesforce (service-) user’s credentials and an external access integration:

create or replace network rule meta.integration.nr_salesforce
mode = 'EGRESS'
type = 'HOST_PORT'
value_list = ('*.sandbox.my.salesforce.com:443', '*.my.salesforce.com:443', '*.salesforce.com:443')
comment = 'Network Rule to access .salesforce.com'
;

create or replace secret meta.integration.se_salesforce
type = generic_string
secret_string = '{
    "client_id": "<your Salesforce client id here>",
    "client_secret": "<your Salesforce client secret here>",
    "username": "<your Salesforce username here>",
    "password": "<your Salesforce password here>",
    "security_token": "<your Salesforce security token here>"
}'
comment = 'Salesforce Access'
;

This secret is just a generic string, as its 5 elements can’t (currently) be stored together in a single instance of any other secret type. Since the secret will be used in (Python) procedures, it’s easy enough to parse it there to get the required elements.

use role accountadmin
;

create or replace external access integration i_salesforce
allowed_network_rules = (meta.integration.nr_salesforce)
allowed_authentication_secrets = (meta.integration.se_salesforce)
enabled = true
comment = 'Integration to access .salesforce.com'
;

grant usage on network rule meta.integration.nr_salesforce to role sysadmin;
grant usage on integration i_salesforce to role sysadmin;

To control which Salesforce objects to sync, another table in meta.load will be used:

create or replace table meta.load.t_salesforce_objects
(
    name varchar
    , label varchar
    , sync boolean
    , date_field varchar(50)
    , api varchar(10)
)
cluster by (sync)
comment = 'Salesforce Objects to load';

insert into meta.load.t_salesforce_objects
values
    ('Account', 'Account', true, 'SystemModStamp', 'query')
    , ('Contact' , 'Contact', true, 'SystemModStamp', 'query')
    , ('ContactHistory', 'Contact History', true, 'CreatedDate', 'query')
;

To get you started, I just added 3 objects to the sync. Add whichever objects you want to sync.

Note the different date_field for the ContactHistory object: Change logs (objects with the “History” suffix) in Salesforce don’t have a SystemModStamp. There are also a couple of other objects where LastModifiedDate is the best available option (e.g. ContentNote).
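
For example, extending the configuration with ContentNote could look like this (the label value is just illustrative - adjust it to your org):

insert into meta.load.t_salesforce_objects
values
    ('ContentNote', 'Content Note', true, 'LastModifiedDate', 'query')
;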

Next, we need a table to map Salesforce data types to Snowflake data types:

create or replace table meta.load.t_salesforce_types
(
    salesforce_type varchar
    , snowflake_type varchar
    , use_length boolean
    , use_precisionandscale boolean
)
cluster by (salesforce_type)
comment = 'Salesforce Field Types to lookup';

insert into meta.load.t_salesforce_types
values
    ('ADDRESS','n/a',false, false) -- available in object description REST API, but not query/bulk REST API for some reason
    , ('ANYTYPE', 'VARCHAR', false, false)
    , ('BASE64', 'n/a', false, false) -- binary could be exported if required
    , ('BOOLEAN', 'BOOLEAN', false, false)
    , ('COMBOBOX', 'VARCHAR', false, false)
    , ('CURRENCY', 'NUMBER', false, true)
    , ('DATACATEGORYGROUPREFERENCE', 'VARCHAR', false, false)
    , ('DATE', 'DATE', false, false)
    , ('DATETIME', 'TIMESTAMP_NTZ(9)', false, false)
    , ('DOUBLE', 'NUMBER', false, true)
    , ('EMAIL', 'VARCHAR', true, false)
    , ('ENCRYPTEDSTRING', 'VARCHAR', true, false)
    , ('ID', 'VARCHAR', true, false)
    , ('INT', 'NUMBER', false, false)
    , ('LOCATION', 'n/a', false, false) -- available in object description REST API, but not query/bulk REST API for some reason
    , ('MULTIPICKLIST', 'VARCHAR', false, false)
    , ('PERCENT', 'NUMBER', false, true)
    , ('PHONE', 'VARCHAR', true, false)
    , ('PICKLIST', 'VARCHAR', true, false)
    , ('REFERENCE', 'VARCHAR', true, false)
    , ('STRING', 'VARCHAR', true, false)
    , ('TEXTAREA', 'VARCHAR', true, false)
    , ('TIME', 'TIMESTAMP_NTZ(9)', false, false)
    , ('URL', 'VARCHAR', true, false)
;

And we’ll create a table to store each object’s fields, which will make creating the corresponding tables easier later:

create or replace table meta.load.t_salesforce_fields
(
    object varchar not null 
    , name varchar not null 
    , label varchar
    , type varchar
    , length number
    , scale number
    , precision number
    , custom boolean
    , externalid boolean
    , ordinal_position number
    , created timestamp_ntz(9) default current_timestamp()
)
comment = 'Salesforce Fields of Objects'
;

Finally, to exclude fields from loading for whatever reason (e.g. because you don’t want to load sensitive data, or simply to reduce the number of fields to sync and improve performance), we can manually store those in a table, too:

create or replace table meta.load.t_salesforce_fields_exclude
(
    object varchar not null 
    , name varchar not null 
    , comment varchar
    , created timestamp_ntz(9) default current_timestamp()
)
comment = 'Salesforce Fields of Objects to be ignored'
;
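
For illustration, excluding a (hypothetical) sensitive custom field could look like this - the object and field names are just examples, use whatever applies to your org:

insert into meta.load.t_salesforce_fields_exclude (object, name, comment)
values
    ('Contact', 'SSN__c', 'Sensitive data - do not load into Snowflake')  -- hypothetical custom field
;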

Get a token

To read data from Salesforce, we’ll first generate and store an access token.

If you prefer to re-authenticate with each request, you can also use the Python package “simple-salesforce“, which is available in Snowflake… But I prefer to recycle the token to reduce the overall number of requests sent.

This token-storing approach was also previously referenced:

Refresh Tableau from inside Snowflake
How to create a Snowflake procedure to refresh Tableau data sources as soon as the referenced data is ready for export

create or replace table meta.load.t_token
(
    system varchar primary key rely comment 'The system or API this token can be used with'
    , token varchar comment 'The token to authenticate with'
    , inserted timestamp_tz(9) default current_timestamp() comment 'When was this token inserted into this table?'
    , valid_from timestamp_tz(9) default current_timestamp() comment 'Since when can this token be used?'
    , valid_to timestamp_tz(9) comment 'Until when can this token be used before it needs to be refreshed?'
)
comment = 'Table to temporarily save API auth tokens'
;

The following procedure temporarily stores a token in this table and refreshes it once it has expired:

create or replace procedure meta.load.p_get_token_salesforce()
returns string
language python
runtime_version = '3.11'
packages = ('snowflake-snowpark-python', 'requests')
handler = 'main'
comment = 'Refresh Salesforce API token'
external_access_integrations = (i_salesforce)
secrets = ('sf' = meta.integration.se_salesforce)
as
$$
import _snowflake
import snowflake.snowpark as snowpark
import json
import requests
from datetime import datetime, timedelta

def get_new_token():
    secret_string = _snowflake.get_generic_secret_string('sf')
    secret_dict = json.loads(secret_string)

    # Token endpoint URL (password grant; note that the security token is appended to the password)
    url = (
        'https://login.salesforce.com/services/oauth2/token'
        '?grant_type=password'
        f"&client_id={secret_dict['client_id']}"
        f"&client_secret={secret_dict['client_secret']}"
        f"&username={secret_dict['username']}"
        f"&password={secret_dict['password']}{secret_dict['security_token']}"
    )

    # Define headers
    headers = {
        'Content-Type': 'application/json',
        'Accept': 'application/json'
    }

    # Prepare an empty request payload
    payload = {}

    # Make the POST request to get the token
    response = requests.post(url, headers=headers, json=payload)

    # Check for successful response
    if response.status_code == 200:
        token_json = response.json()
        access_token = token_json['access_token']
        return access_token
    else:
        raise Exception(f"Failed to obtain token: {response.status_code} - {response.text}")

def main(session: snowpark.Session):
    # Select from META.LOAD.T_TOKEN
    df = session.sql("SELECT * FROM META.LOAD.T_TOKEN WHERE SYSTEM = 'salesforce' LIMIT 1").collect()
    
    # Check if there's a row
    if len(df) == 0 or df[0]['VALID_TO'] < session.sql("SELECT CURRENT_TIMESTAMP() AS CURRENT_TIMESTAMP").collect()[0]['CURRENT_TIMESTAMP']:
        
        # Retrieve a new token
        new_token = get_new_token()
        
        # Delete previous token from table
        query_delete = f"""
            DELETE FROM META.LOAD.T_TOKEN
            WHERE SYSTEM = 'salesforce'
        """
        session.sql(query_delete).collect()
    
        # Insert new token
        valid_to_value = datetime.now() + timedelta(hours=2)
        query_insert = f"""
            INSERT INTO META.LOAD.T_TOKEN (SYSTEM, TOKEN, VALID_TO)
            VALUES ('salesforce', '{new_token}', '{valid_to_value.strftime('%Y-%m-%d %H:%M:%S')}')
        """
        session.sql(query_insert).collect()
        
        # Select again
        df = session.sql("SELECT * FROM META.LOAD.T_TOKEN WHERE SYSTEM = 'salesforce' LIMIT 1").collect()

    # Return the TOKEN value
    if len(df) > 0:
        return df[0]['TOKEN']
    else:
        return "No token found"
    
$$
;
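
To verify the setup, you can call the procedure directly and inspect the stored token’s validity (the token itself is both returned by the call and persisted in meta.load.t_token):

call meta.load.p_get_token_salesforce();

select system, inserted, valid_from, valid_to
from meta.load.t_token
where system = 'salesforce'
;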

With this token we can then do 3 things:

  1. Check for a changed schema by comparing an object’s description in Salesforce with its corresponding table in Snowflake
  2. Create/recreate a table in Snowflake according to an object’s description in Salesforce
  3. Load data from Salesforce into a table in Snowflake

Check for schema change

In case a schema has changed in Salesforce (or the table doesn’t exist in Snowflake yet), the corresponding table’s schema in Snowflake should be adjusted accordingly:

create or replace procedure meta.load.p_salesforce_check_for_schema_change(
    I_OBJECT_NAME string
    , I_TOKEN string default null
)
returns string
language python
runtime_version = '3.11'
packages = ('snowflake-snowpark-python', 'requests', 'pandas')
handler = 'main'
comment = 'Check if the schema in Snowflake RAW changed for the provided Salesforce object'
external_access_integrations = (i_salesforce)
as 
$$
import _snowflake
import snowflake.snowpark as snowpark
import requests
import pandas as pd
import json

def main(session: snowpark.Session, I_OBJECT_NAME: str, I_TOKEN: str):
    # Get a token
    if I_TOKEN is None:
        token = session.sql("CALL META.LOAD.P_GET_TOKEN_SALESFORCE()").collect()[0]["P_GET_TOKEN_SALESFORCE"]
    else:
        token = I_TOKEN

    # Define URL to query Salesforce
    url = f'https://<your Salesforce org name here>.my.salesforce.com/services/data/v63.0/sobjects/{I_OBJECT_NAME}/describe/'
        
    # Define headers
    headers = {
        'Content-Type': 'application/json',
        'Accept': 'application/json',
        'Authorization': f'Bearer {token}'
    }
        
    # Get object description from Salesforce
    response = requests.get(url, headers=headers)
    object_description = response.json()
        
    # Convert to Pandas DataFrame
    pandas_df = pd.DataFrame(object_description['fields'])
    pandas_df = pandas_df[~pandas_df['deprecatedAndHidden']]
    pandas_df = pandas_df[['name', 'type', 'length', 'precision', 'scale']]
        
    # Convert Pandas DataFrame to JSON
    filtered_json = pandas_df.to_dict(orient='records')
        
    # Stringify the JSON
    stringified_json = json.dumps(filtered_json)
    
    # Run the diffcheck
    # diffcheck_result = session.sql(f"select * from table(meta.load.f_salesforce_check_for_schema_change('{I_OBJECT_NAME}', '{stringified_json}'))").collect()
    diffcheck_result = session.sql(f"""with cte_parse as (
        select parse_json('{stringified_json}') as fields
        )
        , cte_flat as (
        select f.value as fields
        from cte_parse p
            , lateral flatten(input => p.fields) f
        )
        , cte_salesforce as (
        select
            upper(fl.fields:name::varchar)      as name
            , upper(fl.fields:type::varchar)    as type
            , fl.fields:length::number(38,0)    as length
            , fl.fields:precision::number(38,0) as precision
            , fl.fields:scale::number(38,0)     as scale
        from cte_flat fl
            left join meta.load.t_salesforce_fields_exclude ex
                on  upper(ex.object) = upper('{I_OBJECT_NAME}')
                and upper(ex.name) = upper(fl.fields:name::varchar)
        where upper(fl.fields:type::varchar) not in ('ADDRESS', 'LOCATION', 'BASE64')   -- exclude types known to not be importable via CSV staging
            and ex.name is null                                                         -- exclude fields known to not be importable for some reason
        )
        , cte_snowflake as (
        select
            column_name                 as name
        --  , data_type                 as type
            , character_maximum_length  as length
            , numeric_precision         as precision
            , numeric_scale             as scale
        --  , ordinal_position          as position
        from raw.information_schema.columns
        where   table_catalog = 'RAW'
            and table_schema = 'SALESFORCE'
            and table_name = upper('{I_OBJECT_NAME}')
        )
        select
            count(*)                as diff
            , listagg(sn.name, '|') as fields_not_found_in_salesforce   -- these fields are not present or different in Salesforce
            , listagg(sa.name, '|') as fields_not_found_in_snowflake    -- these fields are not present or different in Snowflake
            , case
                when count(*) > 0
                then true
                else false
            end                     as schema_changed
        from cte_salesforce sa
            full outer join cte_snowflake sn
                on  sn.name = sa.name
                and case 
                    when sn.length is not null                          -- string/text/varchar field in Snowflake
                        then sn.length >= sa.length
                    when sa.type = 'INT'                                -- integers are stored as number(38,0) in Snowflake
                        then (sn.precision = 38 and sn.scale = 0)
                    else (ifnull(sn.precision, 0) >= sa.precision and ifnull(sn.scale, 0) >= sa.scale)
                    end
        where sa.name is null or sn.name is null""").collect()
    
    # Convert the result to a dictionary
    diffcheck_dict = diffcheck_result[0].as_dict()
    
    # Convert the dictionary to a stringified JSON
    diffcheck_json = json.dumps(diffcheck_dict)

    # Return
    return str(diffcheck_json)
$$
;

This procedure compares the arrays of columns in Salesforce and Snowflake and returns a stringified JSON indicating whether they differ. If they do, the next procedure will (re-)create the object’s tables.

Why do I use pandas here instead of Snowpark data frames? Because converting an array (or in Python: list) of fields to a Pandas data frame and vice versa is just so easy.
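
Calling the check for one of the configured objects returns the stringified JSON described above. The optional second argument lets you reuse an already fetched token; without it, the procedure fetches one itself:

call meta.load.p_salesforce_check_for_schema_change('Account');
-- returns something like: {"DIFF": 0, "FIELDS_NOT_FOUND_IN_SALESFORCE": null, "FIELDS_NOT_FOUND_IN_SNOWFLAKE": null, "SCHEMA_CHANGED": false}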

Create table

In this step we’ll create tables that are not yet present, or whose schema has changed, in raw.salesforce, stage.salesforce and history.salesforce:

create or replace procedure meta.load.p_salesforce_createtables(
    I_OBJECT_NAME string
    , I_TOKEN string default null
)
returns string
language python
runtime_version = '3.11'
packages = ('snowflake-snowpark-python', 'requests', 'pandas')
handler = 'main'
comment = 'Create Tables based on collected field information'
external_access_integrations = (i_salesforce)
as 
$$
import _snowflake
import snowflake.snowpark as snowpark
import requests
import pandas as pd
from datetime import datetime

def main(session: snowpark.Session, I_OBJECT_NAME: str, I_TOKEN: str):
    # Get a token
    if I_TOKEN is None:
        token = session.sql("CALL META.LOAD.P_GET_TOKEN_SALESFORCE()").collect()[0]["P_GET_TOKEN_SALESFORCE"]
    else:
        token = I_TOKEN

    # Define URL to query Salesforce
    url = f'https://<your Salesforce org name here>.my.salesforce.com/services/data/v63.0/sobjects/{I_OBJECT_NAME}/describe/'
        
    # Define headers
    headers = {
        'Content-Type': 'application/json',
        'Accept': 'application/json',
        'Authorization': f'Bearer {token}'
    }
        
    # Get object description from Salesforce
    response = requests.get(url, headers=headers)
    object_description = response.json()
        
    # Convert to Pandas DataFrame
    pandas_df = pd.DataFrame(object_description['fields'])
    pandas_df = pandas_df[~pandas_df['deprecatedAndHidden']]
    pandas_df = pandas_df[['name', 'label', 'type', 'length', 'precision', 'scale', 'custom', 'externalId']]
    
    # Remove unused data type
    pandas_df = pandas_df[~pandas_df['type'].isin(['address', 'location', 'base64'])]

    # Remove excluded field names (rows whose 'name' is listed in the exclusion table)
    fields_to_be_excluded_tbl = session.sql(f"SELECT NAME FROM META.LOAD.T_SALESFORCE_FIELDS_EXCLUDE WHERE UPPER(OBJECT) = UPPER('{I_OBJECT_NAME}')").collect()
    fields_to_be_excluded_arr = [row['NAME'].upper() for row in fields_to_be_excluded_tbl]
    pandas_df = pandas_df[~pandas_df['name'].str.upper().isin(fields_to_be_excluded_arr)]
    
    # Add an incremental number column
    pandas_df['ordinal_position'] = range(1, len(pandas_df) + 1)
    
    # Add the current object name
    pandas_df['object'] = f'{I_OBJECT_NAME}'
    
    # Add the current timestamp
    pandas_df['created'] = pd.to_datetime(datetime.now())
    pandas_df['created'] = pandas_df['created'].dt.tz_localize('Europe/Zurich')

    # Convert DataFrame column names to uppercase
    pandas_df.columns = pandas_df.columns.str.upper()
    
    # Get the table schema
    table_schema = session.sql("describe table meta.load.t_salesforce_fields").collect()
    
    # Extract column names from the schema
    table_columns = [row['name'] for row in table_schema]
    
    # Reorder the DataFrame columns to match the table schema
    pandas_df = pandas_df[table_columns]

    # Delete rows from meta.load.t_salesforce_fields
    session.sql(f"delete from meta.load.t_salesforce_fields where upper(object) = upper('{I_OBJECT_NAME}')").collect()

    # Insert Pandas DataFrame into meta.load.t_salesforce_fields
    snowpark_df = session.write_pandas(
        df=pandas_df,
        database="META",
        schema="LOAD",
        table_name="T_SALESFORCE_FIELDS",
        use_logical_type=True,
        auto_create_table=False
    )

    # Get fields of tables to be created
    concatenated_fields = session.sql(f"""with cte_fields as (
        select
            fi.name
            , case
                when ty.use_length and fi.length > 0 then concat(ty.snowflake_type, '(', fi.length, ')')
                when ty.use_precisionandscale then concat(ty.snowflake_type, '(', fi.precision, ',', fi.scale, ')')
                else ty.snowflake_type end                                                                              as type 
            , fi.ordinal_position 
        from meta.load.t_salesforce_fields fi
            left join meta.load.t_salesforce_types ty
                on  ty.salesforce_type = upper(fi.type)
            left join meta.load.t_salesforce_fields_exclude fe
                on  fe.object = fi.object
                and fe.name = fi.name
        where fi.object = '{I_OBJECT_NAME}'
            and fe.name is null
        )
        select listagg (concat(name, ' ', type), ', ') within group (order by ordinal_position asc) as fields
        from cte_fields""").collect()[0]['FIELDS']

    # Create table in stage
    session.sql(f"""create or replace table stage.salesforce.{I_OBJECT_NAME}
        ( {concatenated_fields} )
        comment = 'Salesforce Object: {I_OBJECT_NAME} - created by META.LOAD.P_SALESFORCE_CREATETABLES on {datetime.now()}'""").collect()

    # Create table in raw
    session.sql(f"""create or replace table raw.salesforce.{I_OBJECT_NAME}
        ( {concatenated_fields} )
        change_tracking = true
        comment = 'Salesforce Object: {I_OBJECT_NAME} - created by META.LOAD.P_SALESFORCE_CREATETABLES on {datetime.now()}'""").collect()

    # Create table in history
    session.sql(f"""create or replace table history.salesforce.{I_OBJECT_NAME}
        ( {concatenated_fields} )
        comment = 'Salesforce Object: {I_OBJECT_NAME} - created by META.LOAD.P_SALESFORCE_CREATETABLES on {datetime.now()}'""").collect()

    # Return
    return 'Tables created'
$$
;

Again, why Pandas? Because the columnar manipulations of the data frame are more flexible than with a Snowpark data frame.

Note: In case you wanted to have your Salesforce data available in Iceberg tables, you could simply create the tables in raw.salesforce as Iceberg tables here 😉
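
If the check reports a change (or the tables don’t exist yet), (re-)creating them for a single object and inspecting the collected field metadata looks like this:

call meta.load.p_salesforce_createtables('Account');

select name, type, length, precision, scale, ordinal_position
from meta.load.t_salesforce_fields
where object = 'Account'
order by ordinal_position
;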

Load one object’s data

Now it’s finally time to move some data! This procedure utilizes Salesforce’s queryAll endpoint to also include deleted records - without flagging deleted records as deleted in Snowflake, you couldn’t really do delta-loads (deletions would never propagate). However, if you don’t plan to do delta-loads anyway, you could simply use the query endpoint instead and ignore deleted records altogether.

Note: The object CampaignMember doesn’t use the Salesforce recycle bin for some reason, so its deleted records will just disappear. At least for this object you’ll have to do a full load every now and then to remove deleted records from Snowflake, too. The frequency depends on what you plan to do with CampaignMembers; I do this once per week.

create or replace procedure meta.load.p_salesforce_load_object_data(
    I_OBJECT_NAME string
    , I_DATE_FIELD string default 'SystemModStamp'
    , I_FULL_LOAD boolean default false
    , I_TOKEN string default null
)
returns string
language python
runtime_version = '3.11'
packages = ('snowflake-snowpark-python', 'requests', 'pandas')
handler = 'main'
comment = 'Load object data from Salesforce to Snowflake'
external_access_integrations = (i_salesforce)
as 
$$
import _snowflake
import snowflake.snowpark as snowpark
import requests
import pandas as pd

def main(session: snowpark.Session, I_OBJECT_NAME: str, I_DATE_FIELD: str, I_FULL_LOAD: bool, I_TOKEN: str):
    # Get a token
    if I_TOKEN is None:
        token = session.sql("CALL META.LOAD.P_GET_TOKEN_SALESFORCE()").collect()[0]["P_GET_TOKEN_SALESFORCE"]
    else:
        token = I_TOKEN

    # Get fields
    fields_result = session.sql(f"""select
            listagg(column_name, ',') within group (order by ordinal_position asc)  as fields
        from raw.information_schema.columns
        where table_catalog = 'RAW'
            and table_schema = 'SALESFORCE'
            and table_name = upper('{I_OBJECT_NAME}')""").collect()
    if not fields_result or fields_result[0]["FIELDS"] is None:
        # Table does not exist
        return f'No fields found for object {I_OBJECT_NAME}'
    else:
        fields = fields_result[0]["FIELDS"]

    # Truncate stage
    session.sql(f"truncate table stage.salesforce.{I_OBJECT_NAME}").collect()

    # Get the table schema
    table_schema = session.sql(f"describe table stage.salesforce.{I_OBJECT_NAME}").collect()

    # Extract column names from the schema
    table_columns = [row['name'] for row in table_schema]

    # Set patterns to later identify datetime columns
    pattern_datetime = r'\d{4}-\d{2}-\d{2}.*\d{2}:\d{2}:\d{2}'
    pattern_date = r'\d{4}-\d{2}-\d{2}'
        
    # Set headers for all requests
    headers = {
        'Content-Type': 'application/json',
        'Accept': 'application/json',
        'Authorization': f'Bearer {token}'
    }

    # Set base URL
    base_url = 'https://<your Salesforce org name here>.my.salesforce.com'
    
    # Set query for initial request
    if I_FULL_LOAD:
        query = f'SELECT+{fields}+FROM+{I_OBJECT_NAME}'
    else:
        watermark_result = session.sql(f"""select to_varchar(convert_timezone('UTC', max({I_DATE_FIELD})), 'YYYY-MM-DDTHH24:MI:SSZ') as watermark
            from raw.salesforce.{I_OBJECT_NAME}""").collect()
        if not watermark_result or watermark_result[0]["WATERMARK"] is None:
            query = f'SELECT+{fields}+FROM+{I_OBJECT_NAME}'
        else:
            watermark = watermark_result[0]["WATERMARK"]
            query = f'SELECT+{fields}+FROM+{I_OBJECT_NAME}+WHERE+{I_DATE_FIELD}+>+{watermark}+OR+IsDeleted+=+True'

    # Set URL for initial request
    url = f'{base_url}/services/data/v63.0/queryAll/?q={query}'

    # Initialize session with Salesforce API
    salesforce_session = requests.Session()

    # Fetch all records
    while True:
        response = salesforce_session.get(url, headers=headers)
        
        # Check if the request was successful
        if response.status_code == 200:
            response_json = response.json()
            records = response_json['records']
            if len(records) > 0:
                # Convert to Pandas DataFrame
                # Why Pandas and not Snowpark? => We're not doing any row-processing, just on columns - Pandas is faster in this case
                pandas_df = pd.DataFrame(records)
                    
                # Convert DataFrame column names to uppercase
                pandas_df.columns = pandas_df.columns.str.upper()
                    
                # Reorder the DataFrame columns to match the table schema
                pandas_df = pandas_df[table_columns]
                
                # Identify datetime columns using regex
                for col in pandas_df.columns:
                    # Check if the column contains datetime values
                    if pandas_df[col].astype(str).str.contains(pattern_datetime).any():
                        pandas_df[col] = pd.to_datetime(pandas_df[col], errors='coerce')
                    # Check if the column contains date values
                    elif pandas_df[col].astype(str).str.match(pattern_date).any():
                        pandas_df[col] = pd.to_datetime(pandas_df[col], errors='coerce')
                        if pd.api.types.is_datetime64_any_dtype(pandas_df[col]):
                            pandas_df[col] = pandas_df[col].dt.date
                        else:
                            pandas_df[col] = pandas_df[col].apply(lambda x: pd.NA if x is None else x)
                    # Check if a column has mixed types and convert to strings
                    elif pandas_df[col].apply(type).nunique() > 1:
                        pandas_df[col] = pandas_df[col].apply(lambda x: pd.NA if x is None else str(x))
                    # If it doesn't match either pattern, leave it as is
                    else:
                        pandas_df[col] = pandas_df[col].apply(lambda x: pd.NA if x is None else x)
                
                # Insert Pandas DataFrame to STAGE
                snowpark_df = session.write_pandas(
                    df=pandas_df,
                    database="STAGE",
                    schema="SALESFORCE",
                    table_name=f"{I_OBJECT_NAME.upper()}",
                    use_logical_type=True,
                    auto_create_table=False
                )
            
            # Check if there are more records
            if response_json['done']:
                break
            
            # Update URL to the next page
            next_url = response_json['nextRecordsUrl']
            url = f'{base_url}{next_url}'
        else:
            break

    # Upsert from STAGE to RAW
    if I_FULL_LOAD:
        session.sql(f"call stage.public.p_replace('SALESFORCE', '{I_OBJECT_NAME}', 'ID', '{I_DATE_FIELD}')").collect();
    else:
        session.sql(f"call stage.public.p_upsert('SALESFORCE', '{I_OBJECT_NAME}', 'ID', '{I_DATE_FIELD}')").collect();

    # Return
    return f'Data for object {I_OBJECT_NAME} updated'
$$
;

And once again, why Pandas? Pandas converts the Salesforce API’s JSON response to a data frame faster than Snowpark would, and we again mostly do columnar manipulation of the data frame. Hence, Pandas is just fine in this case.

Architectural note 1/2: This procedure is not supposed to be called directly but rather by a different procedure below. Hence it doesn’t check for schema changes itself.

Architectural note 2/2: Why do I parse and materialize each page returned by Salesforce here - isn’t that a lot of overhead? Indeed, it would be more performant to first load all pages into one large array/list and then parse and insert everything into the stage.salesforce table at once. However, depending on the size of the object (particularly on a full load) and the degree of parallelism, your warehouse might run out of memory. Parsing and inserting one page (up to 2,000 rows) at a time takes a bit longer but runs fine on an XS warehouse. If you want to improve this, balance the trade-off differently by buffering a few more rows (say, 100k at a time) before parsing and inserting.

Behavioral note: When doing a delta-load (filtering by the watermark date), the query also includes a filter for all deleted records. For some reason, filtering deleted records by SystemModStamp alone doesn’t necessarily return all of them, even when their SystemModStamp falls inside the filter 🤷
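
For a quick test, the procedure can also be called on its own (normally the orchestrating procedure further below takes care of this):

-- delta-load since the current watermark in raw.salesforce.account
call meta.load.p_salesforce_load_object_data('Account', 'SystemModStamp', false);

-- full load, replacing the current content of raw.salesforce.account
call meta.load.p_salesforce_load_object_data('Account', 'SystemModStamp', true);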

And we’ll need two more helper procedures to upsert the data from stage.salesforce to raw.salesforce and history.salesforce. I use those two helpers for many other data sources, too, so I put them into stage.public:

create or replace procedure stage.public.p_replace (
    i_schema varchar
    , i_table varchar
    , i_keycolumn varchar
    , i_distinctifycolumn varchar
)
returns string
language javascript
comment = 'Truncate and replace Table from STAGE to RAW'
as
$$
    snowflake.execute({sqlText: `
        truncate table raw.` + i_schema + `.` + i_table + ` 
    `});

    snowflake.execute({sqlText: `
        insert into raw.` + i_schema + `.` + i_table + `
            select distinct
                tbl.*
            from stage.` + i_schema + `.` + i_table + ` as tbl
                join (
                    select
                        ` + i_keycolumn + `
                      , ` + i_distinctifycolumn + `
                      , row_number() over (partition by ` + i_keycolumn + ` order by ` + i_distinctifycolumn + ` desc) as rowno
                    from stage.` + i_schema + `.` + i_table + `
                ) filter
                    on  filter.` + i_keycolumn + ` = tbl.` + i_keycolumn + `
                    and equal_null(filter.` + i_distinctifycolumn + `, tbl.` + i_distinctifycolumn + `)
                    and filter.rowno = 1
            order by tbl.` + i_keycolumn + ` asc
    `});
    
    snowflake.execute({sqlText: `
        insert into history.` + i_schema + `.` + i_table + `
            select distinct
                tbl.*
            from stage.` + i_schema + `.` + i_table + ` as tbl
                left join history.` + i_schema + `.` + i_table + ` as filter
                    on  filter.` + i_keycolumn + ` = tbl.` + i_keycolumn + `
                    and equal_null(filter.` + i_distinctifycolumn + `, tbl.` + i_distinctifycolumn + `)
            where filter.` + i_keycolumn + ` is null 
            order by tbl.` + i_keycolumn + ` asc
    `});

    return 'Table replaced from STAGE to RAW';
$$
;

create or replace procedure stage.public.p_upsert (
    i_schema varchar
    , i_table varchar
    , i_keycolumn varchar
    , i_distinctifycolumn varchar
)
returns string
language javascript
comment = 'Upsert Table from STAGE to RAW'    
as
$$
    snowflake.execute({sqlText: `
        delete from raw.` + i_schema + `.` + i_table + ` rawtbl
            using (
                select distinct ` + i_keycolumn + `
                from stage.` + i_schema + `.` + i_table + `
            ) statbl
            where rawtbl.` + i_keycolumn + ` = statbl.` + i_keycolumn + `
    `});

    snowflake.execute({sqlText: `
        insert into raw.` + i_schema + `.` + i_table + `
            select distinct
                tbl.*
            from stage.` + i_schema + `.` + i_table + ` as tbl
                join (
                    select
                        ` + i_keycolumn + `
                      , ` + i_distinctifycolumn + `
                      , row_number() over (partition by ` + i_keycolumn + ` order by ` + i_distinctifycolumn + ` desc) as rowno
                    from stage.` + i_schema + `.` + i_table + `
                ) filter
                    on  filter.` + i_keycolumn + ` = tbl.` + i_keycolumn + `
                    and equal_null(filter.` + i_distinctifycolumn + `, tbl.` + i_distinctifycolumn + `)
                    and filter.rowno = 1
            order by tbl.` + i_keycolumn + ` asc
    `});
    
    snowflake.execute({sqlText: `
        insert into history.` + i_schema + `.` + i_table + `
            select distinct
                tbl.*
            from stage.` + i_schema + `.` + i_table + ` as tbl
                left join history.` + i_schema + `.` + i_table + ` as filter
                    on  filter.` + i_keycolumn + ` = tbl.` + i_keycolumn + `
                    and equal_null(filter.` + i_distinctifycolumn + `, tbl.` + i_distinctifycolumn + `)
            where filter.` + i_keycolumn + ` is null 
            order by tbl.` + i_keycolumn + ` asc
    `});

    return 'Table upserted from STAGE to RAW';
$$
;

Why do I suddenly use JavaScript here? Because I find string manipulation more intuitive in JavaScript, and JS procedures generally carry less overhead in Snowflake than Python ones. But I guess this could very well be done with a SQL procedure, too.
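
Called on their own (the stage table must already be populated for the corresponding object), the helpers look like this:

-- delta: delete matching keys in RAW, then insert the deduplicated staged rows into RAW and HISTORY
call stage.public.p_upsert('SALESFORCE', 'Account', 'ID', 'SystemModStamp');

-- full load: truncate RAW first, then insert the deduplicated staged rows
call stage.public.p_replace('SALESFORCE', 'Account', 'ID', 'SystemModStamp');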

Now that we can load the data of one object, how about loading multiple/all objects?

Load all the data

This last procedure combines everything we built: getting a token, checking for schema changes, creating tables and loading data. It also adds one more feature: if a schema changed, you presumably don’t want to lose all the data in history.salesforce. Hence, before recreating the tables, this procedure renames the previous history table and inserts its data back once the tables have been recreated.

create or replace procedure meta.load.p_salesforce_load_object_data_all (
    I_OBJECT_NAME string default null
    , I_FULL_LOAD boolean default false
    , I_TOKEN string default null
)
returns varchar
language sql
as
$$
declare
    objects cursor for
        select
            ob.name
            , ob.date_field 
        from meta.load.t_salesforce_objects ob
            left join raw.information_schema.tables ta
                on  ta.table_catalog = 'RAW'
                and ta.table_schema = 'SALESFORCE'
                and ta.table_name = upper(ob.name)
        where ob.sync = true
            and ob.api = 'query'
        order by ta.bytes desc;
        
    token string;
    object_name string;
    object_date_field string;
    schema_change_string string;
    schema_change_json variant;
    schema_changed boolean;
    common_columns string;
    sql_text string;
    error_message string;
begin
    -- Get token
    if (:I_TOKEN is null) then
        call meta.load.p_get_token_salesforce() into token;
    else
        select :I_TOKEN into token;
    end if;

    if (:I_OBJECT_NAME is null) then
        -- Open cursor
        open objects;
    
        -- Loop through each object
        for record in objects do
            object_name := record.name;
            object_date_field := record.date_field;

            -- Check for changed schema
            call meta.load.p_salesforce_check_for_schema_change(:object_name, :token) into schema_change_string;
            schema_change_json := parse_json(schema_change_string);
            schema_changed := schema_change_json:SCHEMA_CHANGED::boolean;
            if (schema_changed) then
                -- Rename HISTORY table
                sql_text := 'alter table if exists history.salesforce.' || object_name || ' rename to history.salesforce.' || object_name || '_swap';
                execute immediate sql_text;
    
                -- Create tables
                call meta.load.p_salesforce_createtables(:object_name, :token);
    
                -- Swap data: get common columns
                select listagg(new.column_name, ', ') into common_columns
                    from history.information_schema.columns new
                        join history.information_schema.columns swap
                            on  swap.table_catalog = new.table_catalog
                            and swap.table_schema = new.table_schema
                            and swap.table_name = upper(concat(:object_name, '_swap'))
                            and swap.column_name = new.column_name
                    where   new.table_catalog = 'HISTORY'
                        and new.table_schema = 'SALESFORCE'
                        and new.table_name = upper(:object_name);
                        
                -- Swap data: insert
                sql_text := 'insert into history.salesforce.' || object_name || ' (' || common_columns || ')
                    select distinct ' || common_columns || '
                    from history.salesforce.' || object_name || '_swap';
                execute immediate sql_text;
                
                -- Swap data: drop swap table
                sql_text := 'drop table if exists history.salesforce.' || object_name || '_swap';
                execute immediate sql_text;
            end if;
    
            -- Load data
            async(call meta.load.p_salesforce_load_object_data(:object_name, :object_date_field, :I_FULL_LOAD, :token));
        end for;
    
        -- Close cursor
        close objects;

        -- Await all
        await all;
    
        return 'Data for all objects updated';
    else 
        -- Check for changed schema
        call meta.load.p_salesforce_check_for_schema_change(:I_OBJECT_NAME, :token) into schema_change_string;
        schema_change_json := parse_json(schema_change_string);
        schema_changed := schema_change_json:SCHEMA_CHANGED::boolean;
        if (schema_changed) then
            -- Rename HISTORY table
            sql_text := 'alter table if exists history.salesforce.' || I_OBJECT_NAME || ' rename to history.salesforce.' || I_OBJECT_NAME || '_swap';
            execute immediate sql_text;

            -- Create tables
            call meta.load.p_salesforce_createtables(:I_OBJECT_NAME, :token);

            -- Swap data: get common columns
            select listagg(new.column_name, ', ') into common_columns
                from history.information_schema.columns new
                    join history.information_schema.columns swap
                        on  swap.table_catalog = new.table_catalog
                        and swap.table_schema = new.table_schema
                        and swap.table_name = upper(concat(:I_OBJECT_NAME, '_swap'))
                        and swap.column_name = new.column_name
                where   new.table_catalog = 'HISTORY'
                    and new.table_schema = 'SALESFORCE'
                    and new.table_name = upper(:I_OBJECT_NAME);
                    
            -- Swap data: insert
            sql_text := 'insert into history.salesforce.' || I_OBJECT_NAME || ' (' || common_columns || ')
                select distinct ' || common_columns || '
                from history.salesforce.' || I_OBJECT_NAME || '_swap';
            execute immediate sql_text;
            
            -- Swap data: drop swap table
            sql_text := 'drop table if exists history.salesforce.' || I_OBJECT_NAME || '_swap';
            execute immediate sql_text;
        end if;

        -- Identify date field
        select
            ob.date_field into object_date_field
        from meta.load.t_salesforce_objects ob
            left join raw.information_schema.tables ta
                on  ta.table_catalog = 'RAW'
                and ta.table_schema = 'SALESFORCE'
                and ta.table_name = upper(ob.name)
        where ob.sync = true
            and ob.api = 'query'
            and upper(ob.name) = upper(:I_OBJECT_NAME);

        -- Load data
        call meta.load.p_salesforce_load_object_data(:I_OBJECT_NAME, :object_date_field, :I_FULL_LOAD, :token);

        return 'Data for object ' || I_OBJECT_NAME || ' updated';
    end if;
exception
    when statement_error then
        error_message := 'Error Code: ' || sqlcode || '\nError Message: ' || sqlerrm;
        return :error_message;
end;
$$
;

Calling this procedure frequently will keep your Salesforce data up to date in Snowflake. You can either update one object or all at once:

create or replace task raw.salesforce.ta_load_delta_weekday
user_task_managed_initial_warehouse_size = 'XSMALL'
schedule = 'USING CRON 20 0 * * 1-5 Europe/Zurich'
allow_overlapping_execution = false
user_task_timeout_ms = 7200000 -- 2 hours
suspend_task_after_num_failures = 10
task_auto_retry_attempts = 0
target_completion_interval = '2 HOURS'
serverless_task_max_statement_size = 'MEDIUM'
comment = 'Update Salesforce data every weekday at 00:20'
as
call meta.load.p_salesforce_load_object_data_all(null, false);

alter task if exists raw.salesforce.ta_load_delta_weekday resume
;

By setting a target_completion_interval here, the serverless task is sized so that the refresh reliably finishes within a predictable window, even without child-task relations. In my case I know that I (or rather other pipelines 😜) can start working with the fresh data at 02:30.
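
And if you ever need to refresh a single object (or force a full load) outside the schedule, the same procedure can be called manually, for example:

-- delta-load of all configured objects
call meta.load.p_salesforce_load_object_data_all(null, false);

-- full load of a single configured object
call meta.load.p_salesforce_load_object_data_all('Account', true);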