First of all, a Merry Christmas and a happy New Year to all of our clients and partners! 2013 has been a year of change here – a rebranding and a new focus on prebuilt solutions have been the themes of 2013, and leave us in a strong position heading into 2014.

We have also expanded our market base substantially: with key projects happening in Kenya, Ethiopia and the Netherlands, and new opportunities opening in Nigeria, Belgium and Ireland, Aphelion is pushing hard into the EMEA region.

I am personally going to be based out of Europe to focus on growing our software development arm, as well as spending time in East Africa to work with our partner in Kenya to grow the region. I will be flying back to SA to take care of our strategic projects! For our Cape Town clients: nothing changes, you’ll see me about as often as you do now. For our Jhb client base: I will be in SA at least a week a month (much more in the first couple of months) – and I will be available on Skype and Lync of course.

We are focusing on growing resources internally to take over more and more of my role through 2014, but I do promise my ongoing availability for anything that needs my personal attention.

Speaking of software development:
Firstly, our multi-dimensional to tabular converter code will be made available in early 2014 as a pre-alpha release for our newsletter readers to test and give feedback on.

Then:
Our initiative to build an industry-targeted solution set of analytic products, aimed at both SQL Server and the Parallel Data Warehouse, has started to gain steam. With some products ready today and others launching in April, we see ourselves as ready to take advantage of the advent of the Internet of Things – sensor-based analytics is going to be a big part of the future, and whether it is working with projects such as Microsoft’s CityNext, helping mining companies optimise production, or assisting vehicle tracking companies to reduce costs, we are positioning ourselves to be the go-to partner.

This project is called Project Okavango, and it is the base solution set, including rules-based event extraction from activity streams, exception management, and statistical evaluation of activities to determine validity and identify outliers. This direct sensor data is then enhanced with source system data to determine metrics such as efficiency, productivity, utilisation, MTTR and MTBF in our production model, and is combined with both a fixed costs and an inferred costs module to determine profitability and highlight high-cost areas.

We will then work with customers to tailor the solution set to their industry (and prebuild solutions for some specific industries as well).

In order to accelerate our development, we have been working on code-generation tools:
ELT / ETL generator. As we are targeting both PDW and SQL Server Enterprise, we don’t want to build SSIS packages and PDW stored procedures separately for each. What we have started to build is a generator tool in which we define things such as mappings, lookups and target destination type (fact, partitioned fact, SCD 1 dimension, inferred member, orphan handling), and then two sets of code work from the same metadata to generate both optimal SSIS and PDW ELT stored procedures.
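As an illustration only – this is not the generator’s actual schema, and all names are hypothetical – the kind of metadata both generators would read looks something like this:

-- Hypothetical sketch of generator metadata: the SSIS generator and the
-- PDW ELT stored-procedure generator both work from the same rows.
CREATE TABLE dbo.ColumnMapping
(   MappingId       INT         IDENTITY(1,1) NOT NULL,
    SourceTable     SYSNAME     NOT NULL,
    SourceColumn    SYSNAME     NOT NULL,
    TargetTable     SYSNAME     NOT NULL,
    TargetColumn    SYSNAME     NOT NULL,
    TargetType      VARCHAR(30) NOT NULL,  -- e.g. 'Fact', 'PartitionedFact', 'SCD1Dim'
    LookupDimension SYSNAME     NULL,      -- dimension used to resolve surrogate keys
    HandleInferred  BIT         NOT NULL DEFAULT (0),
    HandleOrphans   BIT         NOT NULL DEFAULT (0)
);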

We also use the same set of metadata (with some additional metadata tags) to generate our semantic models, to enable accelerated development.

The final piece of our toolkit is an integration mapping tool. Based on the same ETL/ELT generation code, we have also mapped our report and dashboard fields to database fields, and we can read clients’ existing metadata from their reports (if they are Microsoft-based) to allow BAs to do a report-field-to-report-field mapping. If the client has non-Microsoft reports, a manual process of “this field in this report / this field in this screen comes from this place in the source DB” is needed, but what this lets us do is generate PDW ELT code or SSIS easily and error-free.

This should allow us to complete integration projects much more quickly than without the tool, and to be the fastest implementer when we go head to head with the competition, based on having existing data models, accelerated integration code, and prebuilt cubes and reports.

The other area of BI that is often time-consuming is reconciliation, or balancing. To solve this problem, we are using a test-driven-development approach in which expected results are first identified and captured into a database, and automated tests are then run after each ETL. This allows for quick identification of where data does not match, and especially when the first 80% of ETL logic is generated, shortens debugging time significantly.
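As a minimal sketch of the idea – the table, column names and the expected value below are all hypothetical, not our actual test framework:

-- Expected results are captured up front...
CREATE TABLE dbo.ExpectedResult
(   TestName      VARCHAR(100)   NOT NULL,
    ExpectedValue DECIMAL(18, 2) NOT NULL
);
INSERT INTO dbo.ExpectedResult VALUES ('TotalSales_201306', 1523481.20);  -- illustrative value

-- ...and an automated check runs after each ETL: any row returned is a
-- balancing failure to investigate.
SELECT e.TestName, e.ExpectedValue, a.ActualValue
FROM   dbo.ExpectedResult AS e
JOIN   ( SELECT 'TotalSales_201306' AS TestName,
                SUM(SalesAmount)    AS ActualValue
         FROM   dbo.FactSales ) AS a
       ON a.TestName = e.TestName
WHERE  a.ActualValue <> e.ExpectedValue
   OR  a.ActualValue IS NULL;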

We are currently working on the first implementation using this toolset, for a customer in the telecommunications space, and I personally am exceptionally excited about the prospects.

Our technology architecture for the suite will follow the pattern below; the diagram shows the PDW version of the architecture.

aph-intro-big

Thank you all for your on-going support!

Distribution theory and hashing

One of the early questions often asked when people hear about a quarter rack is “but what benefit will having just 2 boxes give us over having a single big SQL Server? These aren’t high end individual machines, surely I can get better results by using a scale-up machine?”

Importantly, the benefits of a PDW will be obvious even on a 2-node quarter rack, and in fact the techniques being applied are just more advanced versions of the ones that would need to be performed to get good performance out of a single scale-up machine.

If you refer to the first post (Introduction to PDW) you’ll see that we talk about MPP vs SMP – the very heart of an MPP system is being able to split a workload across multiple symmetric nodes. On every node, the workload is partitioned across multiple disk groups by SQL Server partitioning, and each partition is affinitised to a CPU core and a memory space.

This allows IO-to-RAM throughput and CPU speed to be balanced (and is at the heart of why a ‘low-end’ machine – which is in no way low end, just not over-specced on CPU – is used in this stellar appliance. We can thank Intel for a massively overpowered CPU, and now the storage vendors need to catch up!).

The way the PDW is designed, we can split data not just across partitions within a machine, but also across machines.

This is called distribution. If we can serve 100k rows per second from a single partition on a node, then with 8 partitions per node and 10 nodes we can serve 8 million rows a second. And this increase is linear as we add more nodes.

Trivially, you may have thought that we would distribute rows in a round-robin fashion to split them evenly across all the partitions on all the nodes, and if all we ever did was scan an entire table and never join it to another one, then this would be a good technique (from a distribution point of view: there are other considerations that mean batching chunks to a partition would be faster).

But as soon as we want to join two tables – for instance, to sum the sales for a particular region or a particular product – things become more complex.

If you round-robin your rows, the row in the product table for a Trek mountain bike may be on node 1, and its sales may be scattered across all the other nodes, so the join between them won’t be satisfied on a single node. In the PDW world, the black magic of the Data Movement Service (DMS) and dual InfiniBand (and believe me, when you see it working it will look like black magic) can resolve this, but it does mean data gets moved around to do so.

So instead of round-robin, we tell the PDW what we want to distribute on. And this is at the heart of most performance tuning you will do on a PDW outside of building statistics.

Product key
Let’s just think about the distributions we want to achieve. Considering the scenario above, we want to have the sales for a given product on the same node as the dimension values for the product. So, easy enough, we distribute the product and the sales both on the product key, and be done with it. Ahhh. Well. Not so fast. Now, when you query for a single product, you will always be hitting a single distribution, i.e. 1 partition on 1 SQL Server node. In a full rack system, you will be getting 1/80th of the performance for this query.

What we can do is replicate dimension tables: copy them to every node (which does mean they occupy a multiple of the number of nodes in space – not 80× in the above example, but 10×: ten machines, and they don’t need to be copied across all partitions). This lets us solve the problem of mismatched joins between facts and dimensions.
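As a minimal sketch of the DDL (table names are hypothetical, and this assumes the PDW 2012 syntax): the dimension is copied to every node, while the fact is hash-distributed.

CREATE TABLE dbo.DimProduct
(   ProductKey  INT           NOT NULL,
    ProductName NVARCHAR(100) NOT NULL
)
WITH ( DISTRIBUTION = REPLICATE );

CREATE TABLE dbo.FactSales
(   ProductKey  INT            NOT NULL,
    CustomerKey INT            NOT NULL,
    SaleDate    DATE           NOT NULL,
    SalesAmount DECIMAL(18, 2) NOT NULL
)
WITH ( DISTRIBUTION = HASH (CustomerKey) );  -- which column to hash on is discussed below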

Let us assume then that we replicate our product dimension, and distribute the fact table. What should we then distribute the Fact table by?
Product still? Perhaps date?

Date is probably a poor choice. If you use date, then every day you will only be writing to a single distribution. This will hurt performance for writing, and unless your queries span enough distributions, will likely hurt query performance as well.

What you want with a distribution is as even a spread of data as possible; in addition, you want the ability to join across fact tables (for example, sales and sales items) when necessary to satisfy a query predicate.

This leads to some interesting design choices.
The first is that the PDW starts by using a hash (which hash? They wouldn’t tell me…) and, I assume, a modulus of the field you’ve chosen, in order to decide which partition on which node to allocate a row to. We’ve spoken about not using date, and why it is likely a poor choice. A common key, such as customer, would be a good one in a retail scenario – but for a mining company, where the number of customers is small, this no longer holds true.
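Purely as an illustration of the hash-and-modulus idea – the appliance’s internal hash isn’t documented, so CHECKSUM stands in for it here, and the table is the hypothetical one from above:

-- 10 nodes x 8 distributions per node = 80 distributions on a full rack
SELECT ProductKey,
       ABS(CHECKSUM(ProductKey)) % 80 AS illustrative_distribution_id
FROM   dbo.FactSales;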

Keys such as product could be good, again if you are a retailer and the number of products is large. If you are a company such as Apple, product will be truly awful.

Choose a key that is used in as many of your fact tables as possible, but also has a large number of discrete values, where the rowcount per discrete value is relatively even.
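A quick way to sanity-check a candidate column against the source data is a profile along these lines (hypothetical table and column names): you want many distinct values and a maximum rows-per-value that isn’t wildly above the average.

SELECT COUNT(*)            AS distinct_values,
       SUM(rows_per_value) AS total_rows,
       AVG(rows_per_value) AS avg_rows_per_value,
       MAX(rows_per_value) AS max_rows_per_value
FROM ( SELECT CustomerKey, COUNT_BIG(*) AS rows_per_value
       FROM   dbo.FactSales
       GROUP  BY CustomerKey ) AS c;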

Staging tables
Staging tables are a place where many people working with a PDW for the first time tend to just replicate by default. Firstly because in traditional warehouses one does not typically partition a staging table, and secondly because the chosen field you are distributing your production table on typically does not exist in staging yet!

The thinking process could be as follows: we need to do a “WHERE NOT EXISTS” between staging and prod to do inserts, and as we have distributed prod on a customer key that doesn’t exist until later in our processing, we wish to replicate staging so that we can join successfully on each node.

This is true, but the challenge is that you have now written the same staging data to every node, and incurred a multiple of the IO you would otherwise have!

For a dimension, sure, but if this is the staging table for your FactTransaction, ouch! Never mind the additional space requirements.

My advice in a scenario like this is to distribute the table on (for example) client number. No, it won’t match up to the client key on the prod fact, and the PDW will be doing shuffle moves to satisfy the joins – but you more than make up for this by not incurring massive amounts of IO! And shuffle moves actually work pretty well – dual InfiniBand and DMS black magic.
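A minimal sketch of the shape of this (all table and column names are hypothetical, PDW 2012 syntax assumed): staging is distributed on the natural key it actually contains, and the insert still works – the appliance simply shuffles rows to line the distributions up for the joins.

CREATE TABLE dbo.StageFactTransaction
(   ClientNumber    VARCHAR(20)    NOT NULL,
    TransactionDate DATE           NOT NULL,
    Amount          DECIMAL(18, 2) NOT NULL
)
WITH ( DISTRIBUTION = HASH (ClientNumber) );

-- Insert only the rows that don't already exist in the prod fact,
-- resolving the surrogate key via the (replicated) client dimension.
INSERT INTO dbo.FactTransaction (ClientKey, TransactionDate, Amount)
SELECT d.ClientKey, s.TransactionDate, s.Amount
FROM   dbo.StageFactTransaction AS s
JOIN   dbo.DimClient            AS d ON d.ClientNumber = s.ClientNumber
WHERE  NOT EXISTS ( SELECT 1
                    FROM   dbo.FactTransaction AS f
                    WHERE  f.ClientKey       = d.ClientKey
                    AND    f.TransactionDate = s.TransactionDate );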

Skew
The final piece of distribution theory you need to know about is what is called skew. Skew is the scenario we spoke about above: all of the iPad sales have ended up in a single distribution, whereas Samsung’s various incarnations of Galaxy X, Y, Z and Omega (or whatever else they launch next) have at least been reasonably well distributed. But even if there are exactly as many Samsung products as distributions, the Windows Phone version will have a tiny piece of data, and the Galaxy S will have a massive piece of data. This is data skew. If you have distributed on a nullable column, things are worse: nulls will always go to the first distribution on the first node, so if everything else is exactly evenly distributed, that first distribution on the first node will be overloaded in comparison.

A bit of skew is inevitable. Dealing with large skew, however, is a more difficult problem. You could deal with it by selling more Samsung Windows Phones, but at a technical level that isn’t going to be a solution you can take to business.

This will require some thought about how you deal with it: while you have to distribute on the hash of a single column, you can combine columns, for instance concatenating product and customer to get a better distribution. The caveat is that you won’t be able to do this for every table, so you may incur shuffle moves – these are the tradeoffs you need to make.
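One way to do this, sketched with hypothetical names, is to materialise a combined column during the load and distribute on that instead of on the product key alone:

CREATE TABLE dbo.FactSalesByProductCustomer
(   ProductKey      INT            NOT NULL,
    CustomerKey     INT            NOT NULL,
    SalesAmount     DECIMAL(18, 2) NOT NULL,
    DistributionKey VARCHAR(25)    NOT NULL  -- populated during the load, e.g.
                                             -- CAST(ProductKey AS VARCHAR(12)) + '|' + CAST(CustomerKey AS VARCHAR(12))
)
WITH ( DISTRIBUTION = HASH (DistributionKey) );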

You may also still have skew: large volumes of a particular product being sold from a particular store purely because it is running a special is just one example. Be aware of skew, but don’t be frightened of it – it is something you need to monitor. This can be done through DBCC PDW_SHOWSPACEUSED.
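For example, to see row counts and space used per distribution for a single table (table name hypothetical):

DBCC PDW_SHOWSPACEUSED ( "dbo.FactSales" );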

Two distributions – one poorly chosen and one well chosen – are shown below:

aph-distr1
The second is shown in the figure below:

aph-distr2

Finally, a well-chosen distribution, but on a column with nulls, is shown below:

aph-distr3

Yes, it’s a 3D bar chart! But in this case I think it’s needed.
A full rack solution is shown below:

aph-distr4

This has some skew, but shows a good spread that will deliver great results.

Incremental loads in SSIS using the Lookup component and FNV1a hash in a synchronous script component

All code samples are available here:
https://www.dropbox.com/s/8wohor0hroao59s/MarkGStacey.IncrementalLoadWithHash.zip

There are several treatments of the incremental load pattern in SSIS, as well as some tools that handle this. It is also called a delta load or an upsert. TableDifference from SQLBI.eu, now owned by CozyRoc, is one example of a tool, and Andy Leonard has a great blog post describing the general pattern.

In short though, the approaches taken can be summarized as follows:

  • You never update, only add (really?)
  • Filter at source (doesn’t solve the update issue, but does mean you don’t need to check for existence)
  • Use the SCD component to do it one row at a time (SLOOOOW)
  • Write to a Staging table, then do a SQL MERGE (common)
  • Write the matching keys to a new table, then DELETE from current table, and re-insert (typically only used if an entire partition is to be rewritten)
  • Use a third party tool
  • Split the INSERT / UPDATEs in the data flow
    • Using a MERGE JOIN and a Conditional Split
    • Using a Lookup component

In the last pattern, SSIS is used to identify which rows are new (and these are then sent to a data flow destination), and the rows that need to be updated are sent to a staging table, which is then joined to the destination table to do the update. The advantage of this approach is that only the rows to be updated are written to disk twice, and in a DW scenario, most rows in a fact table should be new.

I’ve always been a fan of the Lookup component, with Full Cache, as it is pretty fast. The big problem with a lookup component with full cache is that it all sits in RAM, so you need enough RAM for it all to sit there!

If you are only returning an integer key from your table, that is 32 bits, so 4 bytes per row – that’s 268 435 456 rows per GB of RAM, so a tad over 4 GB of RAM per billion rows. You can also tweak this by filtering the lookup for the date range you are pulling from source, but in today’s age of plentiful RAM, you might as well make use of it. In the approach we’re using, it’s 128 bits per row, so 16 bytes instead of 4. This means 67 108 864 rows per GB of RAM, or around 16 GB per billion rows. (In theory, this means my 32 GB laptop will demolish the 1.4 billion rows that the data generator produces if it’s unfiltered. I will test and blog soon.)

NB: This really makes sense if SSIS sits on a different box to SQL. Of course, if you have less than a billion rows, go ahead and don’t worry ;-)

The problem comes in when you have a non-integer key, or when you would like to check whether a row has changed before updating it (especially if changes are rare and you have no way of filtering at source, there’s no sense in persisting all of those rows and then updating data that hasn’t changed).

I like to use a hashing function: a hash simply takes a string value and converts it to a number – in this example, I use a 64-bit hash, using the hash function FNV-1a (here is a good discussion on comparing hash functions).

NB: There is such a thing as hash collisions – multiple strings can output the same hash. In the case of the update, no biggie, as you will simply update a few records that haven’t changed. In the case of the insert check, be careful not to throw away a record just because it matches! Note that a clash like this will be inserted into the staging table, so it can be dealt with. The code samples do this by firstly including the business key as well as the hash in the join, and secondly doing a clash data flow to insert the missed records.

For these examples, you will need the AdventureWorks database; the _Setup package then creates and populates the rest of the databases and tables – data and log file locations are hardcoded, so you will need to adjust those. Thanks to Adam Machanic, as my data creation is based on his bigProduct and bigTransaction generator (“Make big data”).

_ResetDestination.dtsx will clear down the destination and run _Initialload.dtsx to get some data in the destination fact tables.

There is also a test harness package and a Merge join package for completeness.
NB: You will need the additional fields BusinessKeyHash and DeltaHash on any destination and staging table you use; they are BIGINT. If you have a single integer key, feel free to use that instead of the BusinessKeyHash anywhere I’ve used it.
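If you are retrofitting tables of your own (the table name below is hypothetical), that is just two BIGINT columns, for example:

ALTER TABLE dbo.bigProduct_Staging
    ADD BusinessKeyHash BIGINT NULL,  -- hash of the business key string
        DeltaHash       BIGINT NULL;  -- hash of the concatenated change-tracked columns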

OK, now on to the “IncrementalLoadWithLookupHash.dtsx”.
There is a Sequence Container before and after to clear the staging tables – before to be safe, and after because there’s no point in using up space unnecessarily!

The primary sequence container is in the middle, and consists of 3 tasks for Product and 3 for Transaction, as shown in Figure 1 below. Let’s start with product.

aph-hash1

aph-hash2

The first task, as shown in Figure 2 above, is a data flow from source: it starts with a data flow source, followed by a derived column task. In this task we are doing two things: firstly, creating a “BusinessKeyHashSrc” column in which we’ll store the business key as a string to send to the hash component; secondly, concatenating all the fields that may possibly change (i.e. that we’d want to update) into a second column, also to be hashed to check for deltas. You can see this task in Figure 3 below.

aph-hash3

Note that the key for Product is both ProductNumber and SellStartDate: this is a slowly changing dimension, so product number is not unique.

So then, the script component: I won’t go into detail on the internals, as you will be able to copy and paste it for reuse; you will just need to edit the script itself, then build and save after connecting it up. Make sure you tick all the columns on the Input Columns screen as well.

There are 5 items on the Inputs and Outputs screen you will need to check: they are marked in Figure 4 below. The first two are the HashSrc fields you created in the Derived Column transformation, and the next two are the output fields – note that naming is vital!

The fifth item is the Synchronous Output setting. It will be a dropdown with one value; make sure it’s selected or the component won’t work. (Script components can be built asynchronously, as Jamie Thomson talks about here, but this one isn’t, as this is a more performant layout.)

aph-hash4
Next, the lookup component. Make sure to only select the BusinessKeyHash and DeltaHash to limit the size of the Lookup Cache (or your Key and the DeltaHash if you have gone that route). This would also be the point to limit the dataset by date if you need to. Then, in the mappings, join the Lookup using the BusinessKeyHash, and output the DeltaHash as DeltaHashExisting, as shown in Figure 5 below.

aph-hash5
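On the connection side of the lookup, rather than selecting the whole destination table, a narrow query keeps the cache small. A sketch only – the table and date column here are hypothetical stand-ins for your destination fact:

SELECT BusinessKeyHash, DeltaHash
FROM   dbo.bigTransaction
WHERE  TransactionDate >= '20130101';  -- optional: restrict to the window being loaded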

Finally, set the lookup to redirect rows to no match output on the general screen, then create a data flow destination and insert into it.

Your next step is to create a Conditional Split – really simple: does the DeltaHash equal the DeltaHashExisting? If it does not, redirect to a data flow destination in staging. The conditional split is shown in Figure 6.

aph-hash6

This then handles the new records, and we have the records to be updated in Staging. The next task is an update, which looks like this:
UPDATE [ETL_Incremental_DestCompressed].dbo.bigProduct
SET Color = bpSrc.Color,
    ListPrice = bpSrc.ListPrice,
    StandardCost = bpSrc.StandardCost,
    SellEndDate = bpSrc.SellEndDate,
    Name = bpSrc.Name,
    ProductLine = bpSrc.ProductLine,
    DeltaHash = bpSrc.DeltaHash
FROM [ETL_Incremental_DestCompressed].dbo.bigProduct bp
INNER JOIN dbo.bigProduct bpSrc
    ON bp.BusinessKeyHash = bpSrc.BusinessKeyHash
    AND bp.ProductNumber = bpSrc.ProductNumber
    AND bp.SellStartDate = bpSrc.SellStartDate
Here you join from staging to prod on both the new hash and the existing keys, to cater for hash collisions.

In the final task for product, we have a data flow source which uses a similar query to identify the collisions, and then inserts them in the destination.

The code is below:
SELECT *
FROM dbo.bigProduct bp
WHERE NOT EXISTS
    (SELECT 1
     FROM [ETL_Incremental_DestCompressed].dbo.bigProduct bpInner
     WHERE bp.BusinessKeyHash = bpInner.BusinessKeyHash
     AND bp.ProductNumber = bpInner.ProductNumber
     AND bp.SellStartDate = bpInner.SellStartDate)

The data flows for both product and transaction are below: as you can see, Product had no collisions, but Transaction did! 13 collisions resulting in 25 missed rows out of a recordset of 15 million, which is about typical.

aph-hash7a

aph-hash7b

The performance benefits of not writing all the matching rows to the update staging table are apparent in the figure below:

aph-hash8

A quarter of the rows are discarded and never need to be persisted!

So what is the performance impact of this? Well, let’s compare it to doing a DFT into staging and then a MERGE statement (a very common scenario). In the test package, and with the data I’m using, the DFT for the transactions is 6:06 and the MERGE is 5:26 – a total of 11:32.
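For reference, the staging-plus-MERGE pattern being compared against looks roughly like this – a sketch only, shown with the product table’s columns from the earlier update rather than the exact transaction-table code from the test package, and with the column list abridged:

MERGE [ETL_Incremental_DestCompressed].dbo.bigProduct AS tgt
USING dbo.bigProduct AS src                 -- the staging copy of every source row
    ON  tgt.ProductNumber = src.ProductNumber
    AND tgt.SellStartDate = src.SellStartDate
WHEN MATCHED THEN
    UPDATE SET Color        = src.Color,
               ListPrice    = src.ListPrice,
               StandardCost = src.StandardCost,
               SellEndDate  = src.SellEndDate,
               Name         = src.Name,
               ProductLine  = src.ProductLine
WHEN NOT MATCHED BY TARGET THEN
    INSERT (ProductNumber, SellStartDate, Color, ListPrice, StandardCost, SellEndDate, Name, ProductLine)
    VALUES (src.ProductNumber, src.SellStartDate, src.Color, src.ListPrice, src.StandardCost, src.SellEndDate, src.Name, src.ProductLine);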

As opposed to the lookup? Well, the whole sequence, including product and transaction and including clash collision, takes 6:25.

A dramatic improvement. The takeaway: IO matters. These tests were done on a machine with 3 SSDs – 1 for data files, 1 for logs, and another for tempDB, all OCZ Vertex IV or OCZ Vector (so FAST) – but still: IO matters. The speed improvement comes in 2 places:

  • Almost 14 million records that were unchanged and hence didn’t need to be persisted at all (if they’d been caught when pulling from source, speed would have been even better).
  • Just over 16 million records that were inserted straight into prod and never went to staging.

As your percentage of records that need to be updated increases against the new and unchanged records, this difference will decrease – but realistically, in a DW environment, for most people the majority of fact records won’t change.

 

Copyright © 2013 Aphelion Software, All rights reserved