Incremental loads in SSIS using the Lookup component and FNV1a hash in a synchronous script component
All code samples are available here:
There are several treatments of the incremental load pattern in SSIS (also called a delta load or an upsert), as well as some tools that handle it. TableDifference from SQLBI.eu, now owned by CozyRoc, is one example of a tool, and Andy Leonard has a great blog post describing the general pattern.
In short though, the approaches taken can be summarized as follows:
- You never update, only add (really?)
- Filter at source (doesn’t solve the update issue, but does mean you don’t need to check for existence)
- Use the SCD component to do it one row at a time (SLOOOOW)
- Write to a Staging table, then do a SQL MERGE (common)
- Write the matching keys to a new table, then DELETE from current table, and re-insert (typically only used if an entire partition is to be rewritten)
- Use a third party tool
- Split the INSERT / UPDATEs in the data flow, either:
  - Using a MERGE JOIN and a Conditional Split
  - Using a Lookup component
In the last pattern, SSIS is used to identify which rows are new (and these are then sent to a data flow destination), and the rows that need to be updated are sent to a staging table, which is then joined to the destination table to do the update. The advantage of this approach is that only the rows to be updated are written to disk twice, and in a DW scenario, most rows in a fact table should be new.
I’ve always been a fan of the Lookup component, with Full Cache, as it is pretty fast. The big problem with a lookup component with full cache is that it all sits in RAM, so you need enough RAM for it all to sit there!
If you are only returning an integer key from your table, that is 32 bits, so 4 bytes per row: 268,435,456 rows per GB of RAM, or about 4 GB of RAM per billion rows. You can also tweak this by filtering the lookup for the date range you are pulling from source, so in today’s age of plentiful RAM, you might as well make use of it. In the approach we’re using, it’s 128 bits per row (two 64-bit hashes), so 16 bytes instead of 4. This means 67,108,864 rows per GB of RAM, or around 16 GB per billion rows. (In theory, this means my 32 GB laptop will demolish the 1.4 billion rows that the data generator produces if it’s unfiltered. I will test and blog soon.)
NB: This really makes sense if SSIS sits on a different box from SQL Server. Of course, if you have fewer than a billion rows, go ahead and don’t worry 😉
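The cache sizing above is simple arithmetic. A quick sketch, using binary gigabytes and counting payload bytes only (any per-row overhead SSIS adds to the cache is ignored, which is why the per-billion figures come out slightly under the rounded numbers in the text):

```python
GB = 1024 ** 3  # bytes per (binary) gigabyte

def cache_gb(bytes_per_row: int, rows: int) -> float:
    """Rough full-cache footprint in GB: payload only, no SSIS overhead."""
    return bytes_per_row * rows / GB

# A single 32-bit integer key: 4 bytes per row.
rows_per_gb_int = GB // 4                   # 268,435,456 rows fit in 1 GB
gb_per_billion_int = cache_gb(4, 10**9)     # ~3.7 GB per billion rows

# Two 64-bit hashes (BusinessKeyHash + DeltaHash): 16 bytes per row.
rows_per_gb_hash = GB // 16                 # 67,108,864 rows fit in 1 GB
gb_per_billion_hash = cache_gb(16, 10**9)   # ~14.9 GB per billion rows
```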
The problem comes in when you have a non-integer key, or when you would like to check whether a row has changed before updating it (especially if changes are seldom and you have no way of filtering at source, there’s no sense in persisting all of those rows and then updating data that hasn’t changed).
I like to use a hashing function: a hash simply takes a string value and converts it to a number. In this example, I use a 64-bit hash, the hash function FNV-1a (here is a good discussion comparing hash functions).

NB: There is such a thing as a hash collision: multiple strings can output the same hash. In the case of the update, no biggie, as you will simply update a few records that haven’t changed. In the case of the insert check, be careful not to throw away a record just because it matches! Note that a clash like this will be inserted into the staging table, so it can be dealt with. The code samples do this by firstly including the business key as well as the hash in the join, and secondly doing a clash data flow to insert the missed records.
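For reference, FNV-1a is only a few lines. The actual script component in the samples is C#; this Python sketch of the 64-bit variant just shows the algorithm, using the standard FNV-1a offset basis and prime. The UTF-8 encoding and the signed-BIGINT reinterpretation are assumptions here and must match whatever the component actually does:

```python
FNV64_OFFSET_BASIS = 14695981039346656037  # standard 64-bit FNV offset basis
FNV64_PRIME = 1099511628211                # standard 64-bit FNV prime

def fnv1a_64(text: str) -> int:
    """64-bit FNV-1a over the UTF-8 bytes of a string: XOR each byte in,
    then multiply by the prime, wrapping to 64 bits."""
    h = FNV64_OFFSET_BASIS
    for byte in text.encode("utf-8"):
        h ^= byte
        h = (h * FNV64_PRIME) & 0xFFFFFFFFFFFFFFFF
    return h

def to_bigint(h: int) -> int:
    """Reinterpret the unsigned 64-bit hash as a signed value, since the
    BusinessKeyHash / DeltaHash columns are BIGINT in SQL Server."""
    return h - (1 << 64) if h >= (1 << 63) else h
```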
For these examples, you will need the AdventureWorks database, and then the _Setup package creates and populates the rest of the databases and tables – data and log file locations are hardcoded, so you will need to adjust that. Thanks to Adam Machanic, as my data creation is based on his bigProduct and bigTransaction generator (“Make big data”).
_ResetDestination.dtsx will clear down the destination and run _Initialload.dtsx to get some data in the destination fact tables.
There is also a test harness package and a Merge join package for completeness.
NB: You will need the additional fields BusinessKeyHash and DeltaHash on any destination and staging table you use; they are BIGINT. If you have a single integer key, feel free to use that instead of the BusinessKeyHash anywhere I’ve used it.
OK, now on to the “IncrementalLoadWithLookupHash.dtsx”.
There is a Sequence Container before and after to clear the staging tables – before to be safe, and after because there’s no point in using up space unnecessarily!
The primary sequence container is in the middle, and consists of 3 tasks for Product and 3 for Transaction, as shown in Figure 1 below. Let’s start with product.
The first task, as shown in Figure 2 above, is a data flow from source: it starts with a data flow source, then a derived column task. In this task, we are doing 2 things. Firstly, creating a “BusinessKeyHashSrc” column in which we’ll store the business key as a string to send to the hash component. Secondly, concatenating all the fields that may possibly change (i.e. that we’d want to update) into a second column, also to be hashed, to check for deltas. You can see this task in Figure 3 below.
Note that the key for Product is both ProductNumber and SellStartDate: this is a slowly changing dimension, so the product number alone is not unique.
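One note on building those hash-source strings: concatenate with a delimiter, otherwise (“ab”, “c”) and (“a”, “bc”) produce the same string and therefore the same hash. In SSIS this lives in the derived column expression; the sketch below shows the idea in Python, with hypothetical column values:

```python
def hash_source(values, sep="|"):
    """Join column values with a delimiter so adjacent fields can't blur
    together; NULLs become empty strings so they hash consistently."""
    return sep.join("" if v is None else str(v) for v in values)

# Product's business key is ProductNumber + SellStartDate (an SCD, so the
# product number alone is not unique). Values here are made up.
business_key_src = hash_source(["BK-R93R-62", "2008-07-01"])
# All the columns we would want to update, concatenated for the delta hash.
delta_src = hash_source(["Black", 1431.50, 868.63, None, "R"])
```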
So then, the script component. I won’t go into detail on the internals, as you will be able to copy and paste it to reuse it: you will just need to edit the script itself, then build and save after connecting it up. Make sure you tick all the columns on the Input Columns screen as well.
There are 5 items on the Inputs and Outputs screen you will need to check. They are marked in Figure 4 below, but the first 2 are the HashSrc fields you created in the Derived Column transformation, and the second 2 are the output fields: note that naming is vital!
The fifth item is the Synchronous Output setting. It will be a dropdown with one value, make sure it’s selected or the component won’t work (Script components can be built asynchronously, as Jamie Thomson talks about here, but this one isn’t, as this is a more performant layout).
Next, the lookup component. Make sure to only select the BusinessKeyHash and DeltaHash to limit the size of the Lookup Cache (or your Key and the DeltaHash if you have gone that route). This would also be the point to limit the dataset by date if you need to. Then, in the mappings, join the Lookup using the BusinessKeyHash, and output the DeltaHash as DeltaHashExisting, as shown in Figure 5 below.
Finally, set the lookup to redirect rows to no match output on the general screen, then create a data flow destination and insert into it.
Your next step is to create a Conditional Split. Really simple: does the DeltaHash equal the DeltaHashExisting? If it does not, redirect to a data flow destination in staging. The conditional split is shown in Figure 6.
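Putting the Lookup and the Conditional Split together, every source row is routed one of three ways. A sketch of that routing logic, where a plain dict stands in for the full-cache lookup of BusinessKeyHash → DeltaHash:

```python
def route_row(business_key_hash: int, delta_hash: int, cache: dict) -> str:
    """Mimic the Lookup (no match -> insert) plus the Conditional Split
    (DeltaHash != DeltaHashExisting -> stage for update, else discard)."""
    existing = cache.get(business_key_hash)
    if existing is None:
        return "insert"        # new row: straight into the destination
    if existing != delta_hash:
        return "stage_update"  # changed row: to staging for the UPDATE
    return "discard"           # unchanged row: never persisted at all

cache = {101: 555}
route_row(202, 999, cache)  # "insert"
route_row(101, 777, cache)  # "stage_update"
route_row(101, 555, cache)  # "discard"
```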
This then handles the new records, and we have the records to be updated in Staging. The next task is an update, which looks like this:
UPDATE bp
SET Color = bpSrc.Color,
    ListPrice = bpSrc.ListPrice,
    StandardCost = bpSrc.StandardCost,
    SellEndDate = bpSrc.SellEndDate,
    Name = bpSrc.Name,
    ProductLine = bpSrc.ProductLine,
    DeltaHash = bpSrc.DeltaHash
FROM [ETL_Incremental_DestCompressed].dbo.bigProduct bp
INNER JOIN dbo.bigProduct bpSrc
    ON bp.BusinessKeyHash = bpSrc.BusinessKeyHash
    AND bp.ProductNumber = bpSrc.ProductNumber
    AND bp.SellStartDate = bpSrc.SellStartDate
Here you join from staging to prod on both the new hash and the existing business keys, to cater for hash collisions.
In the final task for product, we have a data flow source which uses a similar query to identify the collisions, and then inserts them in the destination.
The code is below:
SELECT * FROM dbo.bigProduct bp
WHERE NOT EXISTS
    (SELECT 1
     FROM [ETL_Incremental_DestCompressed].dbo.bigProduct bpInner
     WHERE bp.BusinessKeyHash = bpInner.BusinessKeyHash
     AND bp.ProductNumber = bpInner.ProductNumber
     AND bp.SellStartDate = bpInner.SellStartDate)
The data flows for both product and transaction are below: as you can see, Product had no collisions, but Transaction did! 13 collisions resulting in 25 missed rows out of a recordset of 15 million, which is about typical.
The performance benefits of not writing all the matching rows to the update staging table are apparent in the figure below:
A quarter of the rows are discarded and never need to be persisted!
So what is the performance impact of this? Well, let’s compare it to doing a DFT into staging and then a MERGE statement (a very common scenario). In the test package, with the data I’m using, the DFT for the transaction table takes 6:06 and the MERGE 5:26, a total of 11:32.
As opposed to the lookup? Well, the whole sequence, including product and transaction and including clash collision, takes 6:25.
A dramatic improvement. The takeaway: IO matters. These tests were done on a machine with 3 SSDs, 1 for data files, 1 for logs, and another for tempdb, all OCZ Vertex IV or OCZ Vector (so FAST), but still: IO matters. The speed improvement comes from 2 places:
- Almost 14 million records that were unchanged and hence didn’t need to be persisted at all (if they’d been caught when pulling from source, speed would have been even better).
- Just over 16 million records that were inserted straight into prod and never went to staging.
As your percentage of records that need to be updated increases against the new records and unchanged records, this difference would decrease – but realistically, in a DW environment, for most people, the majority of fact records won’t change.
Copyright © 2013 Aphelion Software, All rights reserved