Asynchronous Child Jobs in Snowflake

Wait, what? Child labor?

TL;DR: Running stuff in parallel is faster than running the same stuff sequentially.


Last week Snowflake made asynchronous child jobs GA:

docs.snowflake.com/en/developer-guide/snowflake-scripting/asynchronous-child-jobs

Why would you care? Because you might have a few use cases where this little tweak comes in really handy. I know I had one - and it improved the runtime of my procedure from about 27 minutes down to about 3 minutes. Let me show you how:

The use case

Salesforce can track some field history by itself. It’s very useful, but it has its limitations (in particular, the number of fields you can track per object is capped). You can pay for more fields, of course, but it’s unlikely you will get a full history of all fields of all objects this way.

Now, you could set up CDC on your Salesforce objects and capture the full history of changes that way. But if you are like me, you like batch loads: they are efficient and relatively cheap. However, you won’t catch every single update of a record this way, as each batch load only sees the latest version of a record at that point in time.

This is the biggest drawback of this approach - every record change since the previous batch load is attributed to the most recent timestamp only. Far from perfect, but in many cases good enough. And for critical fields, you can still use the Salesforce-native field history.

So let’s assume we batch-load Salesforce records into Snowflake for whatever reason. While importing them, you probably first stage the recently updated records somewhere before merging them into your persisted raw data table. A very cheap way to also build a history of each record in a separate history table while doing this would be to …

insert into history.salesforce.[current_object_name]
select distinct
  tbl.*
from stage.salesforce.[current_object_name] as tbl
  left join history.salesforce.[current_object_name] as filter
    on filter.id = tbl.id
      and filter.systemmodstamp = tbl.systemmodstamp
where filter.id is null

… find all records with an updated timestamp column and insert every record with an update - any update, in whatever field - into the history. (Since not all Salesforce objects have a SystemModStamp, you might use LastModifiedDate for some… hence, in reality this is done using a more dynamic process, but you get the idea 😉.) It would be possible to scan for changed fields and their values right here, but that would slow down the batch-load pipeline, as we would have to compare each column of each staged row against the previously loaded and persisted raw data.

I personally would rather keep the batch load lean and do the search for changed fields later.
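
As an aside, the “more dynamic process” mentioned above could look roughly like the sketch below. This is not my actual pipeline - the procedure name, its location, and the fallback logic are illustrative assumptions - but it shows the idea of picking the timestamp column per object:

-- Sketch of a dynamic staging step (illustrative names): pick the timestamp column
-- per object, then run the anti-join insert from above with that column spliced in.
create or replace procedure stage.salesforce.p_stage_to_history (I_OBJECT varchar)
returns varchar
language sql
as
$$
declare
  ts_column varchar;
begin
  -- Prefer SYSTEMMODSTAMP; fall back to LASTMODIFIEDDATE for objects that lack it
  select iff(count_if(column_name = 'SYSTEMMODSTAMP') > 0, 'SYSTEMMODSTAMP', 'LASTMODIFIEDDATE')
    into :ts_column
  from stage.information_schema.columns
  where table_schema = 'SALESFORCE'
    and table_name = upper(:I_OBJECT);

  -- Same anti-join as above, just with the chosen timestamp column
  execute immediate '
    insert into history.salesforce.' || I_OBJECT || '
    select distinct
      tbl.*
    from stage.salesforce.' || I_OBJECT || ' as tbl
      left join history.salesforce.' || I_OBJECT || ' as filter
        on filter.id = tbl.id
          and filter.' || ts_column || ' = tbl.' || ts_column || '
    where filter.id is null
  ';

  return 'Staged ' || I_OBJECT || ' into history using ' || ts_column;
end;
$$
;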

Searching changed fields

After inserting records into the history table, visually scanning for changed values from one row to the next, while certainly possible, is not very efficient. I would rather have a log table containing only the changed fields, so I can immediately spot what changed and when - just like the vanilla Salesforce field history does:

create or replace table history.salesforce_log.[current_object_name] (
  id varchar(18) comment 'Salesforce Record ID'
  , modified timestamp_ntz(9) comment 'When was this field value likely modified? Note: Due to batch loads, the change happened any time between MODIFIED and MODIFIED_LAG'
  , modified_lag timestamp_ntz(9) comment 'When was this record modified last before?'
  , modified_by varchar(18) comment 'Salesforce User ID who performed the last change'
  , field string comment 'API name of the Salesforce field changed'
  , value_old string comment 'Old value, cast to string'
  , value_new string comment 'New value, cast to string'
  , inserted timestamp_ntz(9) default current_timestamp() comment 'When was this change log row inserted to the table?'
)
comment = 'Converted Record History to Change Log'
;

If we then use a procedure to loop through all columns of the history table and compare each row to its predecessor, identifying changed values is straightforward:

create or replace procedure history.salesforce_log.p_convert_record_history_to_changelog (I_TABLE varchar)
returns varchar
language sql
AS
$$
declare
  columns cursor for
    select column_name
    from history.information_schema.columns
    where table_catalog = 'HISTORY'
      and table_schema = 'SALESFORCE'
      and table_name = upper(?);
  column_name varchar;
  sql_text varchar;
begin
  -- Truncate change log
  execute immediate 'truncate table history.salesforce_log.' || I_TABLE;

  -- Open cursor for this table
  open columns using (I_TABLE);

  -- Loop through each column
  for record in columns do
    column_name := record.column_name;
    -- Ignore system fields
    if (column_name != 'ID' 
      and column_name != 'LASTMODIFIEDBYID'
      and column_name != 'LASTSTAGECHANGEDATE' 
      and column_name != 'LASTMODIFIEDDATE'
      and column_name != 'SYSTEMMODSTAMP' 
      and column_name != 'LASTREFERENCEDDATE'
      and column_name != 'LASTVIEWEDDATE') then
      sql_text := '
        insert into history.salesforce_log.' || I_TABLE || '
        select
          id
          , systemmodstamp as modified
          , lag(systemmodstamp) over (partition by id order by systemmodstamp asc) as modified_lag
          , lastmodifiedbyid as modified_by
          , ''' || column_name || ''' as field
          , cast(lag(' || column_name || ') over (partition by id order by systemmodstamp asc) as string) as value_old
          , cast(' || column_name || ' as string) as value_new
          , current_timestamp() as inserted
        from history.salesforce.' || I_TABLE || '
        qualify lag(systemmodstamp) over (partition by id order by systemmodstamp asc) is not null
          and equal_null(lag(' || column_name || ') over (partition by id order by systemmodstamp asc), ' || column_name || ') = false
      ';
      execute immediate sql_text;
    end if;
  end for;

  -- Close cursor
  close columns;

  return 'Converted Record History to Change Log';
end;
$$
;

Since this SQL procedure involves using a cursor, one might be tempted to use a JavaScript or Python procedure instead. But we want to run the critical part of this procedure asynchronously in the next step, so it’s definitely worth keeping it in SQL, as async and await are currently only available in Snowflake Scripting (SQL).

Side note: this procedure scans the whole history on every run, as it doesn’t automatically know which rows were inserted into the history last and, hence, can’t compare just those with their previous versions. This is why the procedure initially took about 27 minutes in my case (for a total of roughly 25 million rows over 600-something columns across 3 tables). This could be improved by simply adding an insert timestamp to the history table and delta-processing only the most recently inserted versions of records, but there is another option for improvement:

async and await

Each insert in the for-loop of the procedure above can be considered a child job of the procedure. If we don’t run them sequentially, but rather queue all of them and let multiple inserts be processed in parallel, we utilize the Snowflake warehouse much more efficiently. And the change in the procedure is remarkably tiny:

create or replace procedure history.salesforce_log.p_convert_record_history_to_changelog (I_TABLE varchar)
returns varchar
language sql
AS
$$
declare
  [...]
begin
  [...]

  -- Loop through each column
  for record in columns do
    [...]
    if [...]
      sql_text := '
        [...]
      ';

      -- !!! here is the 1st of 2 changes:
      async (execute immediate sql_text);
      -- !!! that's it!

    end if;
  end for;

  -- Close cursor
  close columns;

  -- !!! and here is the 2nd of 2 changes:
  await all;
  -- !!! and done

  return 'Converted Record History to Change Log';
end;
$$
;

Really, all we have to do is call the inserts asynchronously and wait for them all to finish. This reduces - with everything else remaining equal - the runtime of the procedure to about 3 minutes, or about 12% of the original value.
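
For completeness, running the conversion is then just a plain CALL per object (ACCOUNT below is only a placeholder object name):

-- Example invocation; 'ACCOUNT' stands in for whatever Salesforce object you track
call history.salesforce_log.p_convert_record_history_to_changelog('ACCOUNT');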

Should I still improve the procedure to process new history records in delta loads? Probably, but for the purposes of this article, I’ll call it a day - I’m very happy with an improvement that required a total of 20 additional characters in my procedure.
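
If I ever do, the core of it could look something like the sketch below. Note the assumptions: a hypothetical INSERTED timestamp column on the history table (recording when each version row was loaded), a single example object (ACCOUNT) and column (INDUSTRY), and the change log being appended to instead of truncated and rebuilt:

-- Rough sketch of the delta idea, not what the procedure above currently does.
-- Assumes a hypothetical INSERTED column on history.salesforce.account.

-- How far does the change log already reach?
set last_processed = (
  select coalesce(max(inserted), '1970-01-01'::timestamp_ntz)
  from history.salesforce_log.account
);

insert into history.salesforce_log.account
select
  h.id
  , h.systemmodstamp as modified
  , lag(h.systemmodstamp) over (partition by h.id order by h.systemmodstamp asc) as modified_lag
  , h.lastmodifiedbyid as modified_by
  , 'INDUSTRY' as field
  , cast(lag(h.industry) over (partition by h.id order by h.systemmodstamp asc) as string) as value_old
  , cast(h.industry as string) as value_new
  , current_timestamp() as inserted
from history.salesforce.account as h
-- only scan records that actually received a new version since the last run ...
where h.id in (
    select id
    from history.salesforce.account
    where inserted > $last_processed
  )
-- ... and only log the versions that arrived since then
qualify h.inserted > $last_processed
  and lag(h.systemmodstamp) over (partition by h.id order by h.systemmodstamp asc) is not null
  and equal_null(lag(h.industry) over (partition by h.id order by h.systemmodstamp asc), h.industry) = false
;

Combined with the asynchronous inserts, that should shrink the runtime even further.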