Salesforce to Snowflake without an ELT Tool
Let's cut out the middle-tool.
TL;DR: It is possible to sync (duplicate) data from Salesforce CRM into Snowflake without any tool in the middle.
The use case
Traditionally, to duplicate data from one database (Salesforce CRM) into another (Snowflake), one would use some kind of tool to do the job, either in batches or streamed: Airbyte, Fivetran, Portable, Azure Data Factory … the options are virtually endless.
However, if you don't necessarily have to rely on many of such a tool's features - e.g. automatic (delayed) retries, collision management, error handling, logging or lineage graphs - or are fine with implementing those yourself where needed, you can just plug into the same Salesforce API such tools use and load the data directly into Snowflake without any of those tools.
How?
Well: Snowpark of course!
Why would you do this?
Removing tools (and the dependency on them) if they don't add significant value is usually a good idea anyway. In this case, you also remove a lot of the overhead those tools carry for all their features. When I moved from Azure Data Factory to Snowpark for loading Salesforce data, I reduced the runtime of the daily delta-load pipeline to 20% of its previous value. And runtime (at least for ADF) means cost.
In this post I'll provide some Snowflake procedures you can use as a starting point. They only sync the data from one Salesforce org (if you also need data from your staging/sandbox org, you'll particularly have to modify the token procedure), but once you understand what they do, you can surely extend and improve them (e.g. parameterize the many aspects I hard-coded, or switch to the Salesforce Bulk API v2). If you do, please let me know (and share your improvements)! But first…
Some necessary setup
In my Snowflake environment, I have a catalog for raw data called raw, containing a schema for all Salesforce data called salesforce. Add to that a landing zone called stage.salesforce and an archive of previously observed rows called history.salesforce. The procedures populating these schemata live in a separate catalog called meta.load, and some integration stuff lives in meta.integration:
create database if not exists raw
comment = 'Raw data'
;
create schema if not exists raw.salesforce
comment = 'Data from Salesforce'
;
create transient database if not exists stage -- no fail-safe required here
comment = 'Staged (temporarily) raw data'
;
create schema if not exists stage.salesforce
comment = 'Data from Salesforce'
;
create database if not exists history
comment = 'History of raw data'
;
create schema if not exists history.salesforce
comment = 'Data from Salesforce'
;
create database if not exists meta
comment = 'Meta information'
;
create schema if not exists meta.load
comment = 'Control loading data'
;
create schema if not exists meta.integration
comment = 'Parking lot for integration stuff (e.g. API integrations and secrets)'
;
The history catalog was previously referenced in an earlier post.
Also, to connect to our Salesforce instance, we'll need to safely store a Salesforce (service) user's credentials and an external access integration:
create or replace network rule meta.integration.nr_salesforce
mode = 'EGRESS'
type = 'HOST_PORT'
value_list = ('*.sandbox.my.salesforce.com:443', '*.my.salesforce.com:443', '*.salesforce.com:443')
comment = 'Network Rule to access .salesforce.com'
;
create or replace secret meta.integration.se_salesforce
type = generic_string
secret_string = '{
"client_id": "<your Salesforce client id here>",
"client_secret": "<your Salesforce client secret here>",
"username": "<your Salesforce username here>",
"password": "<your Salesforce password here>",
"security_token": "<your Salesforce security token here>"
}'
comment = 'Salesforce Access'
;
This secret is just a plain string, as five elements can't (currently) be stored in a single secret of any other type. Since the secret will be used in (Python) procedures, it is easy enough to parse there to get the required elements.
use role accountadmin
;
create or replace external access integration i_salesforce
allowed_network_rules = (meta.integration.nr_salesforce)
allowed_authentication_secrets = (meta.integration.se_salesforce)
enabled = true
comment = 'Integration to access .salesforce.com'
;
grant usage on network rule meta.integration.nr_salesforce to role sysadmin;
grant usage on integration i_salesforce to role sysadmin;
To control which Salesforce objects to sync, another table in meta.load will be used:
create or replace table meta.load.t_salesforce_objects
(
name varchar
, label varchar
, sync boolean
, date_field varchar(50)
, api varchar(10)
)
cluster by (sync)
comment = 'Salesforce Objects to load';
insert into meta.load.t_salesforce_objects
values
('Account', 'Account', true, 'SystemModStamp', 'query')
, ('Contact' , 'Contact', true, 'SystemModStamp', 'query')
, ('ContactHistory', 'Contact History', true, 'CreatedDate', 'query')
;
Here, to get you started, I just added three objects to the sync. Add whichever objects you want to sync.
Note the different date_field for the ContactHistory object: change-log objects (those with the "History" suffix) in Salesforce don't have a SystemModStamp. There are also a couple of other objects where LastModifiedDate is the best available option (e.g. ContentNote).
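For example, if you also wanted to sync notes, you could register the ContentNote object like this (the label text is purely descriptive, and whether the object is queryable depends on your org and permissions):
insert into meta.load.t_salesforce_objects
values
('ContentNote', 'Content Note', true, 'LastModifiedDate', 'query')
;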
Next, we need a table to map Salesforce data types to Snowflake data types:
create or replace table meta.load.t_salesforce_types
(
salesforce_type varchar
, snowflake_type varchar
, use_length boolean
, use_precisionandscale boolean
)
cluster by (salesforce_type)
comment = 'Salesforce Field Types to lookup';
insert into meta.load.t_salesforce_types
values
('ADDRESS','n/a',false, false) -- available in object description REST API, but not query/bulk REST API for some reason
, ('ANYTYPE', 'VARCHAR', false, false)
, ('BASE64', 'n/a', false, false) -- binary could be exported if required
, ('BOOLEAN', 'BOOLEAN', false, false)
, ('COMBOBOX', 'VARCHAR', false, false)
, ('CURRENCY', 'NUMBER', false, true)
, ('DATACATEGORYGROUPREFERENCE', 'VARCHAR', false, false)
, ('DATE', 'DATE', false, false)
, ('DATETIME', 'TIMESTAMP_NTZ(9)', false, false)
, ('DOUBLE', 'NUMBER', false, true)
, ('EMAIL', 'VARCHAR', true, false)
, ('ENCRYPTEDSTRING', 'VARCHAR', true, false)
, ('ID', 'VARCHAR', true, false)
, ('INT', 'NUMBER', false, false)
, ('LOCATION', 'n/a', false, false) -- available in object description REST API, but not query/bulk REST API for some reason
, ('MULTIPICKLIST', 'VARCHAR', false, false)
, ('PERCENT', 'NUMBER', false, true)
, ('PHONE', 'VARCHAR', true, false)
, ('PICKLIST', 'VARCHAR', true, false)
, ('REFERENCE', 'VARCHAR', true, false)
, ('STRING', 'VARCHAR', true, false)
, ('TEXTAREA', 'VARCHAR', true, false)
, ('TIME', 'TIMESTAMP_NTZ(9)', false, false)
, ('URL', 'VARCHAR', true, false)
;
And we'll create a table to store each object's fields, to make creating tables easier later:
create or replace table meta.load.t_salesforce_fields
(
object varchar not null
, name varchar not null
, label varchar
, type varchar
, length number
, scale number
, precision number
, custom boolean
, externalid boolean
, ordinal_position number
, created timestamp_ntz(9) default current_timestamp()
)
comment = 'Salesforce Fields of Objects'
;
Finally, to exclude fields from loading for whatever reason (e.g. because you don't want to load sensitive data, or simply to reduce the number of fields to sync and improve performance), we can manually store those in a table, too:
create or replace table meta.load.t_salesforce_fields_exclude
(
object varchar not null
, name varchar not null
, comment varchar
, created timestamp_ntz(9) default current_timestamp()
)
comment = 'Salesforce Fields of Objects to be ignored'
;
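For example, to keep a (purely hypothetical) sensitive custom field out of the sync entirely:
insert into meta.load.t_salesforce_fields_exclude (object, name, comment)
values
('Contact', 'Some_Sensitive_Field__c', 'Sensitive data - do not load') -- hypothetical custom field
;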
Get a token
To read data from Salesforce, we'll first generate and store an access token.
This token-storing approach was also previously referenced in "Refresh Tableau from inside Snowflake" (how to create a Snowflake procedure to refresh Tableau data sources as soon as the referenced data is ready for export).
create or replace table meta.load.t_token
(
system varchar primary key rely comment 'The system or API this token can be used with'
, token varchar comment 'The token to authenticate with'
, inserted timestamp_tz(9) default current_timestamp() comment 'When was this token inserted into this table?'
, valid_from timestamp_tz(9) default current_timestamp() comment 'Since when can this token be used?'
, valid_to timestamp_tz(9) comment 'Until when can this token be used before it needs to be refreshed?'
)
comment = 'Table to temporarily save API auth tokens'
;
In this table a token can temporarily be stored:
create or replace procedure meta.load.p_get_token_salesforce()
returns string
language python
runtime_version = '3.11'
packages = ('snowflake-snowpark-python', 'requests')
handler = 'main'
comment = 'Refresh Salesforce API token'
external_access_integrations = (i_salesforce)
secrets = ('sf' = meta.integration.se_salesforce)
as
$$
import _snowflake
import snowflake.snowpark as snowpark
import json
import requests
from datetime import datetime, timedelta
def get_new_token():
secret_string = _snowflake.get_generic_secret_string('sf')
secret_dict = json.loads(secret_string)
# Sign-in endpoint URL (note: Salesforce expects the password concatenated with the security token)
url = 'https://login.salesforce.com/services/oauth2/token?grant_type=password&client_id=' + secret_dict['client_id'] + '&client_secret=' + secret_dict['client_secret'] + '&username=' + secret_dict['username'] + '&password=' + secret_dict['password'] + secret_dict['security_token']
# Define headers
headers = {
'Content-Type': 'application/json',
'Accept': 'application/json'
}
# Prepare an empty request payload
payload = {}
# Make the POST request to get the token
response = requests.post(url, headers=headers, json=payload)
# Check for successful response
if response.status_code == 200:
token_json = response.json()
access_token = token_json['access_token']
return access_token
else:
raise Exception(f"Failed to obtain token: {response.status_code} - {response.text}")
def main(session: snowpark.Session):
# Select from META.LOAD.T_TOKEN
df = session.sql("SELECT * FROM META.LOAD.T_TOKEN WHERE SYSTEM = 'salesforce' LIMIT 1").collect()
# Check if there's a row
if len(df) == 0 or df[0]['VALID_TO'] < session.sql("SELECT CURRENT_TIMESTAMP() AS CURRENT_TIMESTAMP").collect()[0]['CURRENT_TIMESTAMP']:
# Retrieve a new token
new_token = get_new_token()
# Delete previous token from table
query_delete = f"""
DELETE FROM META.LOAD.T_TOKEN
WHERE SYSTEM = 'salesforce'
"""
session.sql(query_delete).collect()
# Insert new token
valid_to_value = datetime.now() + timedelta(hours=2)
query_insert = f"""
INSERT INTO META.LOAD.T_TOKEN (SYSTEM, TOKEN, VALID_TO)
VALUES ('salesforce', '{new_token}', '{valid_to_value.strftime('%Y-%m-%d %H:%M:%S')}')
"""
session.sql(query_insert).collect()
# Select again
df = session.sql("SELECT * FROM META.LOAD.T_TOKEN WHERE SYSTEM = 'salesforce' LIMIT 1").collect()
# Return the TOKEN value
if len(df) > 0:
return df[0]['TOKEN']
else:
return "No token found"
$$
;
With this token we can then do three things:
- Check for a changed schema by comparing an object's description in Salesforce with its corresponding table in Snowflake
- Create/recreate a table in Snowflake according to an object's description in Salesforce
- Load data from Salesforce into a table in Snowflake
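By the way, the token procedure can also be called on its own, which is handy to verify that the external access integration and the stored credentials work:
call meta.load.p_get_token_salesforce();
select * from meta.load.t_token where system = 'salesforce';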
Check for schema change
In case a schema changed in Salesforce (or the table doesn't exist in Snowflake yet), the corresponding table's schema in Snowflake should be adjusted accordingly:
create or replace procedure meta.load.p_salesforce_check_for_schema_change(
I_OBJECT_NAME string
, I_TOKEN string default null
)
returns string
language python
runtime_version = '3.11'
packages = ('snowflake-snowpark-python', 'requests', 'pandas')
handler = 'main'
comment = 'Check if the schema in Snowflake RAW changed for the provided Salesforce object'
external_access_integrations = (i_salesforce)
as
$$
import _snowflake
import snowflake.snowpark as snowpark
import requests
import pandas as pd
import json
def main(session: snowpark.Session, I_OBJECT_NAME: str, I_TOKEN: str):
# Get a token
if I_TOKEN is None:
token = session.sql("CALL META.LOAD.P_GET_TOKEN_SALESFORCE()").collect()[0]["P_GET_TOKEN_SALESFORCE"]
else:
token = I_TOKEN
# Define URL to query Salesforce
url = f'https://<your Salesforce org name here>.my.salesforce.com/services/data/v63.0/sobjects/{I_OBJECT_NAME}/describe/'
# Define headers
headers = {
'Content-Type': 'application/json',
'Accept': 'application/json',
'Authorization': f'Bearer ' + token
}
# Get object description from Salesforce
response = requests.get(url, headers=headers)
object_description = response.json()
# Convert to Pandas DataFrame
pandas_df = pd.DataFrame(object_description['fields'])
pandas_df = pandas_df[~pandas_df['deprecatedAndHidden']]
pandas_df = pandas_df[['name', 'type', 'length', 'precision', 'scale']]
# Convert Pandas DataFrame to JSON
filtered_json = pandas_df.to_dict(orient='records')
# Stringify the JSON
stringified_json = json.dumps(filtered_json)
# Run the diffcheck
# diffcheck_result = session.sql(f"select * from table(meta.load.f_salesforce_check_for_schema_change('{I_OBJECT_NAME}', '{stringified_json}'))").collect()
diffcheck_result = session.sql(f"""with cte_parse as (
select parse_json('{stringified_json}') as fields
)
, cte_flat as (
select f.value as fields
from cte_parse p
, lateral flatten(input => p.fields) f
)
, cte_salesforce as (
select
upper(fl.fields:name::varchar) as name
, upper(fl.fields:type::varchar) as type
, fl.fields:length::number(38,0) as length
, fl.fields:precision::number(38,0) as precision
, fl.fields:scale::number(38,0) as scale
from cte_flat fl
left join meta.load.t_salesforce_fields_exclude ex
on upper(ex.object) = upper('{I_OBJECT_NAME}')
and upper(ex.name) = upper(fl.fields:name::varchar)
where upper(fl.fields:type::varchar) not in ('ADDRESS', 'LOCATION', 'BASE64') -- exclude types known to not be importable via CSV staging
and ex.name is null -- exclude fields known to not be importable for some reason
)
, cte_snowflake as (
select
column_name as name
-- , data_type as type
, character_maximum_length as length
, numeric_precision as precision
, numeric_scale as scale
-- , ordinal_position as position
from raw.information_schema.columns
where table_catalog = 'RAW'
and table_schema = 'SALESFORCE'
and table_name = upper('{I_OBJECT_NAME}')
)
select
count(*) as diff
, listagg(sn.name, '|') as fields_not_found_in_salesforce -- these fields are not present or different in Salesforce
, listagg(sa.name, '|') as fields_not_found_in_snowflake -- these fields are not present or different in Snowflake
, case
when count(*) > 0
then true
else false
end as schema_changed
from cte_salesforce sa
full outer join cte_snowflake sn
on sn.name = sa.name
and case
when sn.length is not null -- string/text/varchar field in Snowflake
then sn.length >= sa.length
when sa.type = 'INT' -- integers are stored as number(38,0) in Snowflake
then (sn.precision = 38 and sn.scale = 0)
else (ifnull(sn.precision, 0) >= sa.precision and ifnull(sn.scale, 0) >= sa.scale)
end
where sa.name is null or sn.name is null""").collect()
# Convert the result to a dictionary
diffeck_dict = diffcheck_result[0].as_dict()
# Convert the dictionary to a stringified JSON
diffcheck_json = json.dumps(diffeck_dict)
# Return
return str(diffcheck_json)
$$
;
This procedure compares the arrays of columns in Salesforce and Snowflake and returns a stringified JSON indicating whether they differ. If they do, the next procedure will (re-)create the object's tables.
Why do I use Pandas here instead of Snowpark DataFrames? Because converting a list of fields to a Pandas DataFrame and back is just so easy.
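To try the check for a single object, call it directly and inspect the returned JSON; the keys correspond to the column aliases in the SQL above (the output shown here is just an illustration):
call meta.load.p_salesforce_check_for_schema_change('Account');
-- returns something like: {"DIFF": 0, "FIELDS_NOT_FOUND_IN_SALESFORCE": null, "FIELDS_NOT_FOUND_IN_SNOWFLAKE": null, "SCHEMA_CHANGED": false}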
Create table
In this step we'll create tables not yet present or with a changed schema in raw.salesforce, stage.salesforce and history.salesforce:
create or replace procedure meta.load.p_salesforce_createtables(
I_OBJECT_NAME string
, I_TOKEN string default null
)
returns string
language python
runtime_version = '3.11'
packages = ('snowflake-snowpark-python', 'requests', 'pandas')
handler = 'main'
comment = 'Create Tables based on collected field information'
external_access_integrations = (i_salesforce)
as
$$
import _snowflake
import snowflake.snowpark as snowpark
import requests
import pandas as pd
from datetime import datetime
def main(session: snowpark.Session, I_OBJECT_NAME: str, I_TOKEN: str):
# Get a token
if I_TOKEN is None:
token = session.sql("CALL META.LOAD.P_GET_TOKEN_SALESFORCE()").collect()[0]["P_GET_TOKEN_SALESFORCE"]
else:
token = I_TOKEN
# Define URL to query Salesforce
url = f'https://<your Salesforce org name here>.my.salesforce.com/services/data/v63.0/sobjects/{I_OBJECT_NAME}/describe/'
# Define headers
headers = {
'Content-Type': 'application/json',
'Accept': 'application/json',
'Authorization': f'Bearer ' + token
}
# Get object description from Salesforce
response = requests.get(url, headers=headers)
object_description = response.json()
# Convert to Pandas DataFrame
pandas_df = pd.DataFrame(object_description['fields'])
pandas_df = pandas_df[~pandas_df['deprecatedAndHidden']]
pandas_df = pandas_df[['name', 'label', 'type', 'length', 'precision', 'scale', 'custom', 'externalId']]
# Remove unused data type
pandas_df = pandas_df[~pandas_df['type'].isin(['address', 'location', 'base64'])]
# Remove excluded field names
fields_to_be_excluded_tbl = session.sql(f"SELECT NAME FROM META.LOAD.T_SALESFORCE_FIELDS_EXCLUDE WHERE UPPER(OBJECT) = UPPER('{I_OBJECT_NAME}')").collect()
fields_to_be_excluded_arr = [row['NAME'] for row in fields_to_be_excluded_tbl]
fields_to_be_excluded_arr_valid = [col for col in fields_to_be_excluded_arr if col in pandas_df.columns]
pandas_df = pandas_df.drop(columns=fields_to_be_excluded_arr_valid)
# Add an incremental number column
pandas_df['ordinal_position'] = range(1, len(pandas_df) + 1)
# Add the current object name
pandas_df['object'] = f'{I_OBJECT_NAME}'
# Add current timestamp
pandas_df['created'] = pd.to_datetime(datetime.now())
pandas_df['created'] = pandas_df['created'].dt.tz_localize('Europe/Zurich')
# Convert DataFrame column names to uppercase
pandas_df.columns = pandas_df.columns.str.upper()
# Get the table schema
table_schema = session.sql("describe table meta.load.t_salesforce_fields").collect()
# Extract column names from the schema
table_columns = [row['name'] for row in table_schema]
# Reorder the DataFrame columns to match the table schema
pandas_df = pandas_df[table_columns]
# Delete rows from meta.load.t_salesforce_fields
session.sql(f"delete from meta.load.t_salesforce_fields where upper(object) = upper('{I_OBJECT_NAME}')").collect()
# Insert Pandas DataFrame into meta.load.t_salesforce_fields
snowpark_df = session.write_pandas(
df=pandas_df,
database="META",
schema="LOAD",
table_name="T_SALESFORCE_FIELDS",
use_logical_type=True,
auto_create_table=False
)
# Get fields of tables to be created
concatenated_fields = session.sql(f"""with cte_fields as (
select
fi.name
, case
when ty.use_length and fi.length > 0 then concat(ty.snowflake_type, '(', fi.length, ')')
when ty.use_precisionandscale then concat(ty.snowflake_type, '(', fi.precision, ',', fi.scale, ')')
else ty.snowflake_type end as type
, fi.ordinal_position
from meta.load.t_salesforce_fields fi
left join meta.load.t_salesforce_types ty
on ty.salesforce_type = upper(fi.type)
left join meta.load.t_salesforce_fields_exclude fe
on fe.object = fi.object
and fe.name = fi.name
where fi.object = '{I_OBJECT_NAME}'
and fe.name is null
)
select listagg (concat(name, ' ', type), ', ') within group (order by ordinal_position asc) as fields
from cte_fields""").collect()[0]['FIELDS']
# Create table in stage
session.sql(f"""create or replace table stage.salesforce.{I_OBJECT_NAME}
( {concatenated_fields} )
comment = 'Salesforce Object: {I_OBJECT_NAME} - created by META.LOAD.P_SALESFORCE_CREATETABLES on {datetime.now()}'""").collect()
# Create table in raw
session.sql(f"""create or replace table raw.salesforce.{I_OBJECT_NAME}
( {concatenated_fields} )
change_tracking = true
comment = 'Salesforce Object: {I_OBJECT_NAME} - created by META.LOAD.P_SALESFORCE_CREATETABLES on {datetime.now()}'""").collect()
# Create table in history
session.sql(f"""create or replace table history.salesforce.{I_OBJECT_NAME}
( {concatenated_fields} )
comment = 'Salesforce Object: {I_OBJECT_NAME} - created by META.LOAD.P_SALESFORCE_CREATETABLES on {datetime.now()}'""").collect()
# Return
return 'Tables created'
$$
;
Again, why Pandas? Because the columnar manipulations of the DataFrame are more flexible than with a Snowpark DataFrame.
Note: In case you want your Salesforce data available in Iceberg tables, you could simply create the tables in raw.salesforce as Iceberg tables here.
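As a rough sketch of that idea (not something the procedure above generates): the raw table could be created as a Snowflake-managed Iceberg table instead, assuming an external volume is already configured. The volume name, base location and column list below are placeholders, and some data types (e.g. timestamp precision) may need adjusting:
create or replace iceberg table raw.salesforce.account
(
id varchar
, name varchar
, systemmodstamp timestamp_ntz(6) -- Iceberg timestamps use microsecond precision
)
catalog = 'SNOWFLAKE' -- Snowflake-managed Iceberg catalog
external_volume = 'my_external_volume' -- placeholder: your configured external volume
base_location = 'salesforce/account/'
comment = 'Salesforce Object: Account (Iceberg)'
;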
Load one object's data
Now it's finally time to move some data! This procedure uses the queryAll endpoint of Salesforce so that deleted records are included as well. If deleted records weren't flagged as deleted in Snowflake, you couldn't really do delta loads. However, if you don't plan to do delta loads anyway, you could simply use the query endpoint instead and ignore deleted records altogether.
Note: the object CampaignMember doesn't use the Salesforce recycle bin for some reason, so deleted records from this object will just disappear. At least for this object you'll have to do a full load every now and then to remove deleted records from Snowflake, too. The frequency depends on what you plan to do with CampaignMember; I do this once per week.
create or replace procedure meta.load.p_salesforce_load_object_data(
I_OBJECT_NAME string
, I_DATE_FIELD string default 'SystemModStamp'
, I_FULL_LOAD boolean default false
, I_TOKEN string default null
)
returns string
language python
runtime_version = '3.11'
packages = ('snowflake-snowpark-python', 'requests', 'pandas')
handler = 'main'
comment = 'Load object data from Salesforce to Snowflake'
external_access_integrations = (i_salesforce)
as
$$
import _snowflake
import snowflake.snowpark as snowpark
import requests
import pandas as pd
def main(session: snowpark.Session, I_OBJECT_NAME: str, I_DATE_FIELD: str, I_FULL_LOAD: bool, I_TOKEN: str):
# Get a token
if I_TOKEN is None:
token = session.sql("CALL META.LOAD.P_GET_TOKEN_SALESFORCE()").collect()[0]["P_GET_TOKEN"]
else:
token = I_TOKEN
# Get fields
fields_result = session.sql(f"""select
listagg(column_name, ',') within group (order by ordinal_position asc) as fields
from raw.information_schema.columns
where table_catalog = 'RAW'
and table_schema = 'SALESFORCE'
and table_name = upper('{I_OBJECT_NAME}')""").collect()
if not fields_result or fields_result[0]["FIELDS"] is None:
# Table does not exist
return f'No fields found for object {I_OBJECT_NAME}'
else:
fields = fields_result[0]["FIELDS"]
# Truncate stage
session.sql(f"truncate table stage.salesforce.{I_OBJECT_NAME}").collect()
# Get the table schema
table_schema = session.sql(f"describe table stage.salesforce.{I_OBJECT_NAME}").collect()
# Extract column names from the schema
table_columns = [row['name'] for row in table_schema]
# Set patterns to later identify datetime columns
pattern_datetime= r'\d{4}-\d{2}-\d{2}.*\d{2}:\d{2}:\d{2}'
pattern_date = r'\d{4}-\d{2}-\d{2}'
# Set headers for all requests
headers = {
'Content-Type': 'application/json',
'Accept': 'application/json',
'Authorization': f'Bearer {token}'
}
# Set base URL
base_url = 'https://<your Salesforce org name here>.my.salesforce.com'
# Set query for initial request
if I_FULL_LOAD:
query = f'SELECT+{fields}+FROM+{I_OBJECT_NAME}'
else:
watermark_result = session.sql(f"""select to_varchar(convert_timezone('UTC', max({I_DATE_FIELD})), 'YYYY-MM-DDTHH24:MI:SSZ') as watermark
from raw.salesforce.{I_OBJECT_NAME}""").collect()
if not watermark_result or watermark_result[0]["WATERMARK"] is None:
query = f'SELECT+{fields}+FROM+{I_OBJECT_NAME}'
else:
watermark = watermark_result[0]["WATERMARK"]
query = f'SELECT+{fields}+FROM+{I_OBJECT_NAME}+WHERE+{I_DATE_FIELD}+>+{watermark}+OR+IsDeleted+=+True'
# Set URL for initial request
url = f'{base_url}/services/data/v63.0/queryAll/?q={query}'
# Initialize session with Salesforce API
salesforce_session = requests.Session()
# Fetch all records
while True:
response = salesforce_session.get(url, headers=headers)
# Check if the request was successful
if response.status_code == 200:
response_json = response.json()
records = response_json['records']
if len(records) > 0:
# Convert to Pandas DataFrame
# Why Pandas and not Snowpark? => We're not doing any row-processing, just on columns - Pandas is faster in this case
pandas_df = pd.DataFrame(records)
# Convert DataFrame column names to uppercase
pandas_df.columns = pandas_df.columns.str.upper()
# Reorder the DataFrame columns to match the table schema
pandas_df = pandas_df[table_columns]
# Identify datetime columns using regex
for col in pandas_df.columns:
# Check if the column contains datetime values
if pandas_df[col].astype(str).str.contains(pattern_datetime).any():
pandas_df[col] = pd.to_datetime(pandas_df[col], errors='coerce')
# Check if the column contains date values
elif pandas_df[col].astype(str).str.match(pattern_date).any():
pandas_df[col] = pd.to_datetime(pandas_df[col], errors='coerce')
if pd.api.types.is_datetime64_any_dtype(pandas_df[col]):
pandas_df[col] = pandas_df[col].dt.date
else:
pandas_df[col] = pandas_df[col].apply(lambda x: pd.NA if x is None else x)
# Check if a column has mixed types and convert to strings
elif pandas_df[col].apply(type).nunique() > 1:
pandas_df[col] = pandas_df[col].apply(lambda x: pd.NA if x is None else str(x))
# If it doesn't match either pattern, leave it as is
else:
pandas_df[col] = pandas_df[col].apply(lambda x: pd.NA if x is None else x)
# Insert Pandas DataFrame to STAGE
snowpark_df = session.write_pandas(
df=pandas_df,
database="STAGE",
schema="SALESFORCE",
table_name=f"{I_OBJECT_NAME.upper()}",
use_logical_type=True,
auto_create_table=False
)
# Check if there are more records
if response_json['done']:
break
# Update URL to the next page
next_url = response_json['nextRecordsUrl']
url = f'{base_url}{next_url}'
else:
break
# Upsert from STAGE to RAW
if I_FULL_LOAD:
session.sql(f"call stage.public.p_replace('SALESFORCE', '{I_OBJECT_NAME}', 'ID', '{I_DATE_FIELD}')").collect();
else:
session.sql(f"call stage.public.p_upsert('SALESFORCE', '{I_OBJECT_NAME}', 'ID', '{I_DATE_FIELD}')").collect();
# Return
return f'Data for object {I_OBJECT_NAME} updated'
$$
;
And once again, why Pandas? Pandas is faster at converting the Salesforce API's JSON response into a DataFrame than Snowpark would be, and we again mostly do columnar manipulation of the DataFrame. Hence, Pandas is just fine in this case.
Architectural note 1/2: This procedure is not supposed to be called directly but rather by the wrapper procedure further below. Hence it doesn't check for schema changes itself.
Architectural note 2/2: Why parse and materialize each page returned by Salesforce here - isn't that a lot of overhead? Indeed, it would be more performant to first collect all pages into one large list and only then parse and insert everything into the stage.salesforce table at once. However, depending on the size of the object (particularly on a full load) and the degree of parallelism, your warehouse might run out of memory. Parsing and inserting one page (up to 2,000 rows) at a time takes a bit longer but runs fine on an XS warehouse. If you want to improve this, balance the trade-off differently by buffering more rows (say, 100k) before parsing and inserting.
Behavioral note: When doing a delta load (filtering on the watermark date), also include a filter for all deleted records. For some reason, filtering deleted records by SystemModStamp alone doesn't necessarily return all of them, even when their SystemModStamp falls inside the filter.
And we'll need two more helper procedures to upsert the data from stage.salesforce into raw.salesforce and history.salesforce. I use these two helpers for many other data sources, too, so I put them into stage.public:
create or replace procedure stage.public.p_replace (
i_schema varchar
, i_table varchar
, i_keycolumn varchar
, i_distinctifycolumn varchar
)
returns string
language javascript
comment = 'Truncate and replace Table from STAGE to RAW'
as
$$
snowflake.execute({sqlText: `
truncate table raw.` + i_schema + `.` + i_table + `
`});
snowflake.execute({sqlText: `
insert into raw.` + i_schema + `.` + i_table + `
select distinct
tbl.*
from stage.` + i_schema + `.` + i_table + ` as tbl
join (
select
` + i_keycolumn + `
, ` + i_distinctifycolumn + `
, row_number() over (partition by ` + i_keycolumn + ` order by ` + i_distinctifycolumn + ` desc) as rowno
from stage.` + i_schema + `.` + i_table + `
) filter
on filter.` + i_keycolumn + ` = tbl.` + i_keycolumn + `
and equal_null(filter.` + i_distinctifycolumn + `, tbl.` + i_distinctifycolumn + `)
and filter.rowno = 1
order by tbl.` + i_keycolumn + ` asc
`});
snowflake.execute({sqlText: `
insert into history.` + i_schema + `.` + i_table + `
select distinct
tbl.*
from stage.` + i_schema + `.` + i_table + ` as tbl
left join history.` + i_schema + `.` + i_table + ` as filter
on filter.` + i_keycolumn + ` = tbl.` + i_keycolumn + `
and equal_null(filter.` + i_distinctifycolumn + `, tbl.` + i_distinctifycolumn + `)
where filter.` + i_keycolumn + ` is null
order by tbl.` + i_keycolumn + ` asc
`});
return 'Table replaced from STAGE to RAW';
$$
;
create or replace procedure stage.public.p_upsert (
i_schema varchar
, i_table varchar
, i_keycolumn varchar
, i_distinctifycolumn varchar
)
returns string
language javascript
comment = 'Upsert Table from STAGE to RAW'
as
$$
snowflake.execute({sqlText: `
delete from raw.` + i_schema + `.` + i_table + ` rawtbl
using (
select distinct ` + i_keycolumn + `
from stage.` + i_schema + `.` + i_table + `
) statbl
where rawtbl.` + i_keycolumn + ` = statbl.` + i_keycolumn + `
`});
snowflake.execute({sqlText: `
insert into raw.` + i_schema + `.` + i_table + `
select distinct
tbl.*
from stage.` + i_schema + `.` + i_table + ` as tbl
join (
select
` + i_keycolumn + `
, ` + i_distinctifycolumn + `
, row_number() over (partition by ` + i_keycolumn + ` order by ` + i_distinctifycolumn + ` desc) as rowno
from stage.` + i_schema + `.` + i_table + `
) filter
on filter.` + i_keycolumn + ` = tbl.` + i_keycolumn + `
and equal_null(filter.` + i_distinctifycolumn + `, tbl.` + i_distinctifycolumn + `)
and filter.rowno = 1
order by tbl.` + i_keycolumn + ` asc
`});
snowflake.execute({sqlText: `
insert into history.` + i_schema + `.` + i_table + `
select distinct
tbl.*
from stage.` + i_schema + `.` + i_table + ` as tbl
left join history.` + i_schema + `.` + i_table + ` as filter
on filter.` + i_keycolumn + ` = tbl.` + i_keycolumn + `
and equal_null(filter.` + i_distinctifycolumn + `, tbl.` + i_distinctifycolumn + `)
where filter.` + i_keycolumn + ` is null
order by tbl.` + i_keycolumn + ` asc
`});
return 'Table upserted from STAGE to RAW';
$$
;
Why do I suddenly use JavaScript here? Because I find string manipulation more intuitive in JavaScript, and JS procedures generally carry less overhead in Snowflake than Python ones. But this could very well be done with a SQL procedure, too.
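If you want to exercise the helpers on their own (e.g. after manually filling a stage table), the parameters are schema, table, key column and the column used to de-duplicate, exactly as the load procedure passes them above:
call stage.public.p_upsert('SALESFORCE', 'ACCOUNT', 'ID', 'SYSTEMMODSTAMP');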
Now that we can load the data of one object, how about loading multiple/all objects?
Load all the data

This last procedure combines everything we built: getting a token, checking for schema changes, creating tables and loading data. It also adds one more feature: if a schema changed, you presumably don't want to lose all the data in history.salesforce. Hence, before recreating the tables, this procedure renames the previous history table and inserts the previous data back in after the tables are recreated.
create or replace procedure meta.load.p_salesforce_load_object_data_all (
I_OBJECT_NAME string default null
, I_FULL_LOAD boolean default false
, I_TOKEN string default null
)
returns varchar
language sql
as
$$
declare
objects cursor for
select
ob.name
, ob.date_field
from meta.load.t_salesforce_objects ob
left join raw.information_schema.tables ta
on ta.table_catalog = 'RAW'
and ta.table_schema = 'SALESFORCE'
and ta.table_name = upper(ob.name)
where ob.sync = true
and ob.api = 'query'
order by ta.bytes desc;
token string;
object_name string;
object_date_field string;
schema_change_string string;
schema_change_json variant;
schema_changed boolean;
common_columns string;
sql_text string;
error_message string;
begin
-- Get token
if (:I_TOKEN is null) then
call meta.load.p_get_token_salesforce() into token;
else
select :I_TOKEN into token;
end if;
if (:I_OBJECT_NAME is null) then
-- Open cursor
open objects;
-- Loop through each object
for record in objects do
object_name := record.name;
object_date_field := record.date_field;
-- Check for changed schema
call meta.load.p_salesforce_check_for_schema_change(:object_name, :token) into schema_change_string;
schema_change_json := parse_json(schema_change_string);
schema_changed := schema_change_json:SCHEMA_CHANGED::boolean;
if (schema_changed) then
-- Rename HISTORY table
sql_text := 'alter table if exists history.salesforce.' || object_name || ' rename to history.salesforce.' || object_name || '_swap';
execute immediate sql_text;
-- Create tables
call meta.load.p_salesforce_createtables(:object_name, :token);
-- Swap data: get common columns
select listagg(new.column_name, ', ') into common_columns
from history.information_schema.columns new
join history.information_schema.columns swap
on swap.table_catalog = new.table_catalog
and swap.table_schema = new.table_schema
and swap.table_name = upper(concat(:object_name, '_swap'))
and swap.column_name = new.column_name
where new.table_catalog = 'HISTORY'
and new.table_schema = 'SALESFORCE'
and new.table_name = upper(:object_name);
-- Swap data: insert
sql_text := 'insert into history.salesforce.' || object_name || ' (' || common_columns || ')
select distinct ' || common_columns || '
from history.salesforce.' || object_name || '_swap';
execute immediate sql_text;
-- Swap data: drop swap table
sql_text := 'drop table if exists history.salesforce.' || object_name || '_swap';
execute immediate sql_text;
end if;
-- Load data
async(call meta.load.p_salesforce_load_object_data(:object_name, :object_date_field, :I_FULL_LOAD, :token));
end for;
-- Close cursor
close objects;
-- Await all
await all;
return 'Data for all objects updated';
else
-- Check for changed schema
call meta.load.p_salesforce_check_for_schema_change(:I_OBJECT_NAME, :token) into schema_change_string;
schema_change_json := parse_json(schema_change_string);
schema_changed := schema_change_json:SCHEMA_CHANGED::boolean;
if (schema_changed) then
-- Rename HISTORY table
sql_text := 'alter table if exists history.salesforce.' || I_OBJECT_NAME || ' rename to history.salesforce.' || I_OBJECT_NAME || '_swap';
execute immediate sql_text;
-- Create tables
call meta.load.p_salesforce_createtables(:I_OBJECT_NAME, :token);
-- Swap data: get common columns
select listagg(new.column_name, ', ') into common_columns
from history.information_schema.columns new
join history.information_schema.columns swap
on swap.table_catalog = new.table_catalog
and swap.table_schema = new.table_schema
and swap.table_name = upper(concat(:I_OBJECT_NAME, '_swap'))
and swap.column_name = new.column_name
where new.table_catalog = 'HISTORY'
and new.table_schema = 'SALESFORCE'
and new.table_name = upper(:I_OBJECT_NAME);
-- Swap data: insert
sql_text := 'insert into history.salesforce.' || I_OBJECT_NAME || ' (' || common_columns || ')
select distinct ' || common_columns || '
from history.salesforce.' || I_OBJECT_NAME || '_swap';
execute immediate sql_text;
-- Swap data: drop swap table
sql_text := 'drop table if exists history.salesforce.' || I_OBJECT_NAME || '_swap';
execute immediate sql_text;
end if;
-- Identify date field
select
ob.date_field into object_date_field
from meta.load.t_salesforce_objects ob
left join raw.information_schema.tables ta
on ta.table_catalog = 'RAW'
and ta.table_schema = 'SALESFORCE'
and ta.table_name = upper(ob.name)
where ob.sync = true
and ob.api = 'query'
and upper(ob.name) = upper(:I_OBJECT_NAME);
-- Load data
call meta.load.p_salesforce_load_object_data(:I_OBJECT_NAME, :object_date_field, :I_FULL_LOAD, :token);
return 'Data for object ' || I_OBJECT_NAME || ' updated';
end if;
exception
when statement_error then
error_message := 'Error Code: ' || sqlcode || '\nError Message: ' || sqlerrm;
return :error_message;
end;
$$
;
Calling this procedure frequently will keep your Salesforce data up to date in Snowflake. You can either update a single object or all of them at once.
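A few ad-hoc examples (a full load of CampaignMember only works if that object is registered in the control table, of course):
-- delta-load a single object
call meta.load.p_salesforce_load_object_data_all('Account', false);
-- full-load a single object, e.g. to purge CampaignMember records deleted without the recycle bin
call meta.load.p_salesforce_load_object_data_all('CampaignMember', true);
-- delta-load all objects flagged for sync
call meta.load.p_salesforce_load_object_data_all(null, false);
For the regular refresh, I schedule the delta load of all objects with a serverless task: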
create or replace task raw.salesforce.ta_load_delta_weekday
user_task_managed_initial_warehouse_size = 'XSMALL'
schedule = 'USING CRON 20 0 * * 1-5 Europe/Zurich'
allow_overlapping_execution = false
user_task_timeout_ms = 7200000 -- 2 hours
suspend_task_after_num_failures = 10
task_auto_retry_attempts = 0
target_completion_interval = '2 HOURS'
serverless_task_max_statement_size = 'MEDIUM'
comment = 'Update Salesforce data every weekday at 00:20'
as
call meta.load.p_salesforce_load_object_data_all(null, false);
;
alter task if exists raw.salesforce.ta_load_delta_weekday resume
;
By setting a target_completion_interval here, you can ensure the refresh is done by a specific time, even when using a serverless task without child-task relations. In my case I know that I (or rather, other pipelines) can start working with the fresh data at 02:30.
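And if you follow the weekly full-load recommendation for CampaignMember from above, a second task can take care of that - a sketch, assuming CampaignMember is registered in the control table; name and schedule are up to you:
create or replace task raw.salesforce.ta_load_full_campaignmember_weekly
user_task_managed_initial_warehouse_size = 'XSMALL'
schedule = 'USING CRON 20 4 * * 6 Europe/Zurich' -- Saturdays at 04:20
allow_overlapping_execution = false
comment = 'Weekly full load of CampaignMember to remove records deleted without the recycle bin'
as
call meta.load.p_salesforce_load_object_data_all('CampaignMember', true);
alter task if exists raw.salesforce.ta_load_full_campaignmember_weekly resume
;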
