How to Apply SQL Analytics and Windowing functions to Apache Spark Data Processing
Introduction – The beauty of being truly native
The purpose of this post is to share my latest experience with Talend in the field, which was also the first time I got to see Talend’s capacity to run SQL queries inside any Talend Big Data Batch job using the Spark framework. In doing so, I want to show you how to apply SQL analytics and windowing functions to process data inside Spark!
Depending on how familiar you are with the Talend platform, you may or may not know that our Big Data integration solution gives developers and power users the ability to generate code that is natively executable on a Hadoop cluster, whether it’s MapReduce, Spark, Spark Streaming or Storm.
Technically, Talend does not require an agent to be installed on your Hadoop cluster; a connection to YARN is the only prerequisite. This is the beauty of using a solution based on open source and standards that has always taken a no-vendor-lock-in approach.
Because Talend’s framework is open, users will be able to inject code inside their integration jobs. Most of the time they will reuse a Java routine when they can’t easily achieve a processing step on the data they are working with. In today’s blog, I want to focus on how to leverage Spark SQL code within a Big Data Spark job, and why it matters A LOT!
The Need for Speed!
Everything started with what looked like a simple use case for a recent Big Data POC. It was a classic banking use case where we needed to show how to calculate a running balance from transaction data. Basically, we had two data sources:
- The list of transactions of the day to process (3M+ records)
- The historic table of end-of-the-day balance for all the accounts in the bank (8M+ records)
While this scenario looked simple on the whiteboard, the rubber met the road when it came time to implement the use case in Talend Studio and make it performant!
Being kind of a brute-force approach guy, I thought:
“Get the transactions table sorted, add a sequence number, get the end-of-day balance value you need from that 8M+ row historic table, then put that in cache (thanks to a nice tCacheOutput component). Then you’ll need to do a join between your cached transactions table and the original transactions table, and BOOM, calculation, computation, magic, BOOM result.”
Of course, that sounds like a bit of overkill, as it would have required a lot of memory and compared a lot of rows against each other just to get the previous amount for each row. This approach wouldn’t have worked anyway, given what the prospect told us: “Oh, by the way, your competition did that in 3 minutes.”
All right, it was time to think about a smarter way. Brute force has its limitations after all!
SQL Analytics Function Baby!
When it comes to smart stuff in big data, let the truth be told, my peers don’t look at me first; they look at our internal Black Belt Bullet Proof Certified Fellows here at Talend. I did the same and got this Jedi-like response: “Little Padawan, the way of tSQLRow, take you must”.
My Jedi Data Master got quite excited when the challenge came up, so he helped build a first draft of the job using a tSQLRow component.
The beauty of tSQLRow in a Spark job is that you can use SQL queries inside your job, and that query will apply ON THE JOB DATA! Yeah, yeah, not a database or any other hybrid stuff, the actual data that is gathered inside your job! So all the WHERE, ORDER BY, GROUP BY, SUM(), COUNT() and other funny operations can be done through that SQL API inside a job. Yes, that’s very cool!
Window (analytic) functions have been available in Spark SQL since version 1.4, and Talend already supports Spark 2.1, so they are usable there.
Thinking about it, the use case came down to:
- Getting the latest value for an end-of-day balance
- Summing up transaction amount row by row inside each account
Not to mention the need to have some temporary variable to deal with the other constraints of the use case that are not part of the explanation here (e.g. dealing with FUTURE transactions values, generating a row number, etc.).
And this is where I discovered the existence of ANALYTICS AND WINDOWING FUNCTIONS in SQL; probably not a surprise for some of you reading this article, but a TOTALLY NEW discovery for me!
This is where I started getting my hands dirty, and I must say, I just couldn’t get enough!
Partition it, sort it, window it, compute it, filter it, shuffle it…
First, let’s have a look at the data used for my local test with the local Spark engine in my Talend Studio (btw, I’m using Talend Studio 6.3.1, Big Data Platform edition).
EOD Balance sample data:
100|12345|2016-06-08 00:00:00|1.02
100|12345|2016-06-07 00:00:00|0.02
102|20006|2016-06-07 00:00:00|5.02
102|20006|2016-06-08 00:00:00|6.02
Transactions sample data:
103|20007|2016-06-09 02:00:00|105508585836|2016-06-10 00:00:00|F|6.90|D|20160609
100|12345|2016-06-09 06:00:00|111454018830|2016-06-12 00:00:00|C|0.6|D|20160609
102|20006|2016-06-09 01:00:00|125508585836|2016-06-09 00:00:00|F|5.50|D|20160609
100|12345|2016-06-09 02:00:00|33042764824|2016-06-08 00:00:00|B|0.05|D|20160609
101|22222|2016-06-09 02:00:00|121554018830|2016-06-09 00:00:00|C|0.5|D|20160609
100|12345|2016-06-09 02:00:00|33042764825|2016-06-08 00:00:00|B|0.08|D|20160609
100|12345|2016-06-09 03:00:00|33042764830|2016-06-09 00:00:00|C|1.06|D|20160609
100|12345|2016-06-09 05:00:00|110451035129|2016-06-11 00:00:00|C|0.21|D|20160609
100|12345|2016-06-09 07:00:00|185508585836|2016-06-13 00:00:00|F|0.38|D|20160609
100|12345|2016-06-09 04:00:00|33042766082|2016-06-10 00:00:00|C|4.51|D|20160609
101|22222|2016-06-09 01:00:00|101554018830|2016-06-08 00:00:00|C|0.8|C|20160609
See below the job design I used to test my logic on dummy data. The first two components are tFixedFlowInput components, and I used the above sample data as my test data to validate that the job worked correctly.
The initial steps mainly filter the data, sort it correctly (by account number and transaction date), and retrieve the latest EOD balance for each account when it exists (otherwise the account is new and this is the first time there has been a transaction in it). They also create a unique transaction id for each transaction row using a sequence function (available in the Numeric library in Talend). This is not just for fun: one of the main headaches I experienced was understanding the behavior of analytic functions such as SUM() or LAST_VALUE(). In some cases, having a unique identifier to sort the data inside the partition is mandatory to get the result you want. This was the case for the SUM() function.
The first tSQLRow component happens right after the join of the data is done. Here is the content of it:
"select posting_transit, posting_acct_num, business_date, system_time, posting_date, business_date_indi, txn_amt, dr_cr_ind, proc_dt, end_of_day_bal, isknownaccnt, ROW_NUMBER() OVER(PARTITION BY posting_transit, posting_acct_num ORDER BY business_date, posting_date ASC) as rowNum, sum(txn_amt) OVER(PARTITION BY posting_transit, posting_acct_num ORDER BY business_date, posting_date,seq ASC) as balance from out1"
The focus here is on the two functions and the partitioning of the data:
- ROW_NUMBER(): returns the row number in order of appearance inside the partition (so it depends on the ORDER BY clause).
- SUM(txn_amt): sums the txn_amt value row by row AS LONG AS THE ORDER BY CRITERIA ARE UNIQUE. That’s a critical point for the running balance calculation. When a window has an ORDER BY and no explicit frame, the default frame is RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, which includes every row that ties with the current row on the ORDER BY columns. If the ORDER BY criteria of the partition (here business_date, posting_date, seq ASC) were not unique (uniqueness is achieved thanks to the seq variable), we would end up with the sum of all the txn_amt values that share the same business_date and posting_date, which is not what we want.
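To see why the unique seq column matters, here is a minimal sketch that reproduces the behavior outside of Talend. It is an illustration only: it uses Python's built-in sqlite3 module (assuming the bundled SQLite is 3.25 or later, which added window functions) instead of Spark, since both apply the same standard default frame when a window has an ORDER BY. The table layout, the acct and txn_cents column names, and the amounts expressed in whole cents are assumptions made to keep the example small and the arithmetic exact.

```python
import sqlite3

# In-memory stand-in for the "out1" flow; NOT the real job schema.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE out1 (acct TEXT, business_date TEXT, seq INTEGER, txn_cents INTEGER)"
)
conn.executemany(
    "INSERT INTO out1 VALUES (?, ?, ?, ?)",
    [
        ("12345", "2016-06-09 02:00:00", 1, 5),    # 0.05
        ("12345", "2016-06-09 02:00:00", 2, 8),    # 0.08, same business_date as seq 1
        ("12345", "2016-06-09 03:00:00", 3, 106),  # 1.06
    ],
)

query = """
SELECT seq,
       -- ORDER BY is not unique: rows tied on business_date are summed together
       SUM(txn_cents) OVER (PARTITION BY acct ORDER BY business_date)      AS tied_sum,
       -- seq breaks the tie, so the sum grows one row at a time
       SUM(txn_cents) OVER (PARTITION BY acct ORDER BY business_date, seq) AS running_sum
FROM out1
ORDER BY seq
"""
rows = conn.execute(query).fetchall()

for seq, tied_sum, running_sum in rows:
    print(seq, tied_sum, running_sum)
```

The first two rows share a business_date, so tied_sum jumps straight to 13 for both of them, while running_sum moves from 5 to 13 row by row.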
This is pretty well explained here with sample data:.
When it Comes to Size, Partition Matters!
The OVER(PARTITION BY a,b ORDER BY c,d) instruction is key in our job. Because Spark is an in-memory framework that can run in parallel on a grid, and especially on a Hadoop cluster, partitioning the data makes a lot of sense for performance!
Without going into detail, the job of PARTITION BY a,b is to logically select a group of rows so that a function is applied only to that group.
In my use case, I wanted to partition the data by account number (the combination of transit_number and acct_num), so I only apply my ORDER BY instruction and my analytic function, SUM() or ROW_NUMBER(), to that particular chunk of data. So, from 3M+ transactions in one group, I now have X transactions in Y partitions. Here, I would estimate the number of partitions to be around 600,000, with an average of 5 transactions per partition.
So instead of one huge pass over all the data, I now have very quick, parallel, in-memory processing of numerous, very small groups of data.
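As a rough mental model of what PARTITION BY does logically, the sketch below groups the sample transactions by (transit, account) and sorts each group on its own. This is plain Python and only a conceptual picture, not how Spark actually distributes the work; the tuple layout (transit, account, business_date, amount) is an assumption made for the example.

```python
from collections import defaultdict

# (posting_transit, posting_acct_num, business_date, txn_amt) from the sample data
transactions = [
    ("103", "20007", "2016-06-09 02:00:00", "6.90"),
    ("100", "12345", "2016-06-09 06:00:00", "0.6"),
    ("102", "20006", "2016-06-09 01:00:00", "5.50"),
    ("100", "12345", "2016-06-09 02:00:00", "0.05"),
    ("101", "22222", "2016-06-09 02:00:00", "0.5"),
    ("100", "12345", "2016-06-09 02:00:00", "0.08"),
    ("100", "12345", "2016-06-09 03:00:00", "1.06"),
    ("100", "12345", "2016-06-09 05:00:00", "0.21"),
    ("100", "12345", "2016-06-09 07:00:00", "0.38"),
    ("100", "12345", "2016-06-09 04:00:00", "4.51"),
    ("101", "22222", "2016-06-09 01:00:00", "0.8"),
]

# PARTITION BY posting_transit, posting_acct_num: one bucket per account
partitions = defaultdict(list)
for row in transactions:
    partitions[(row[0], row[1])].append(row)

# ORDER BY business_date, applied inside each bucket independently
for rows_in_partition in partitions.values():
    rows_in_partition.sort(key=lambda r: r[2])

sizes = {key: len(rows_in_partition) for key, rows_in_partition in partitions.items()}
print(sizes)
```

Each bucket can then be processed without looking at any other account, which is what makes the work easy to spread across a cluster.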
Yeah, I know, that’s cool ;) And that should also lead to a pretty good performance result! (well that depends on a lot of things, but this partitioning definitely helps!).
After that first tSQLRow, my data looks like this:
The SUM(txn_amt) mechanism:
The data is sorted and grouped, and the Double value you see (with a lot of digits after the decimal point) is the running balance calculated without the EOD balance taken into consideration at this point. This incremental row-by-row behavior is really due to the fact that we ordered the partition with a unique identifier for each row (yes, I insist on that, as it took me half a day to understand it and create that sequence seq). Without it you’ll end up, like me at first, with a real SUM of ALL THE txn_amt values that share the same business_date and posting_date.
Look at the first 7 rows; they are part of the same group (account: 100 12345).
1.02 (row 1 EOD balance) - 0.05 (row 1 txn_amt) = 0.97 (row 1 balance)
0.97 (row 1 balance, previously calculated) - 0.08 (row 2 txn_amt) = 0.89 (row 2 balance)
0.89 (row 2 balance, previously calculated) - 1.06 (row 3 txn_amt) = -0.17 (row 3 balance)
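The three lines of arithmetic above can be checked with a few lines of Python. This is only a sketch of the calculation, not the job's implementation: it uses Decimal instead of the Double type seen in the job so the results print exactly, and it assumes all three transactions are debits (dr_cr_ind = D), as in the sample data.

```python
from decimal import Decimal

eod_balance = Decimal("1.02")  # latest EOD balance for account 100 12345
debits = [Decimal("0.05"), Decimal("0.08"), Decimal("1.06")]  # rows 1 to 3, in order

balances = []
balance = eod_balance
for amount in debits:
    balance -= amount          # each row starts from the previous row's balance
    balances.append(balance)

print(balances)
```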
The ROW_NUMBER() mechanism:
You can easily see the result of the ROW_NUMBER() function applied by partition, as it resets to 1 for each new transit_number + acct_num.
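The reset can be reproduced with a small, hedged sketch. As an assumption for illustration, it runs the window function in Python's built-in sqlite3 module (SQLite 3.25 or later) rather than in Spark, on a cut-down table that keeps only three columns and a handful of rows from the sample data.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Reduced stand-in for the job flow: only the columns the window needs.
conn.execute(
    "CREATE TABLE out1 (posting_transit TEXT, posting_acct_num TEXT, business_date TEXT)"
)
conn.executemany(
    "INSERT INTO out1 VALUES (?, ?, ?)",
    [
        ("101", "22222", "2016-06-09 02:00:00"),
        ("100", "12345", "2016-06-09 05:00:00"),
        ("100", "12345", "2016-06-09 03:00:00"),
        ("101", "22222", "2016-06-09 01:00:00"),
        ("100", "12345", "2016-06-09 04:00:00"),
    ],
)

rows = conn.execute(
    """
    SELECT posting_transit, posting_acct_num, business_date,
           ROW_NUMBER() OVER (PARTITION BY posting_transit, posting_acct_num
                              ORDER BY business_date) AS rowNum
    FROM out1
    ORDER BY posting_transit, posting_acct_num, rowNum
    """
).fetchall()

row_numbers = [r[3] for r in rows]
print(row_numbers)  # numbering restarts when the account changes
```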
LAST, But Not Least!
The final step of my job was to deal with the FUTURE transactions. Unlike the others (the CURRENT and BACKDATED transactions), these transactions don’t need a running balance to be calculated. A future transaction (indicated with an “F” in the business_date_indi field) needs its running balance set to the previous end-of-day balance, to “null”, or to the LAST value calculated before it.
Let’s say I already have 3 CURRENT or BACKDATED transactions for my account 10012345 with an already calculated running balance. If I then have one or many FUTURE transactions for the same account 10012345, I want to set their running balance to the LAST calculated value of the running balance (yeah, I know… it sounds logical on your bank account report or your credit card report).
That’s where I looked at the function called LAST_VALUE(), and I used it in the last tSQLRow component of my job.
See the content below:
"select posting_transit, posting_acct_num, business_date, system_time, posting_date, business_date_indi, txn_amt, dr_cr_ind, proc_dt, end_of_day_bal, rowNum , balance, last_value(balance , true) OVER(PARTITION BY posting_transit, posting_acct_num ) as last_balance, max(F_tx_only) OVER(PARTITION BY posting_transit, posting_acct_num) as F_tx_only, isknownaccnt from bal ORDER BY posting_transit, posting_acct_num, business_date, system_time "
Let’s focus on that particular piece: last_value(balance , true) OVER(PARTITION BY posting_transit, posting_acct_num ) as last_balance
So now you get the partition piece of the story. You’ll notice the absence of ORDER BY instruction. This is on purpose as using another ORDER BY here would result in an error. I think it’s likely because we previously ORDERed BY the same partition, but honestly, there might be an explanation I don’t get (comments appreciated).
So last_value(balance) will return the last value of the balance calculated inside that partition. In the case of a future transaction, however, we don’t have any running balance calculated.
So one of the tricks I used was to set the “balance” to null in a previous step when it was a FUTURE transaction. But last_value(balance) would still return that NULL value by default.
TO AVOID THAT, this makes the difference: last_value(balance , true). I hope you read up to this point, because it took me five hours of Googling to get it right, and when I found it, that’s when I decided I’d write my first blog article.
So this “,true” parameter tells the last_value() function to skip NULL values in its computation. So now you understand why I set those future transaction balance values to null before. It was to skip them later!
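To make the effect of that second argument concrete, here is a small pure-Python emulation. It is not Spark code: the two helper functions are hypothetical names written for this illustration, and the sketch assumes the rows of the partition arrive in the order produced by the earlier steps, with FUTURE rows already carrying a null (None) balance.

```python
def last_value_keep_nulls(values):
    """Emulates last_value(col): the last entry, even if it is None."""
    return values[-1] if values else None


def last_value_ignore_nulls(values):
    """Emulates last_value(col, true): the last entry that is not None."""
    for value in reversed(values):
        if value is not None:
            return value
    return None


# Balances for one account partition; the two trailing None values stand for
# FUTURE transactions whose balance was set to null in a previous step.
partition_balances = [0.97, 0.89, -0.17, None, None]

without_flag = last_value_keep_nulls(partition_balances)
with_flag = last_value_ignore_nulls(partition_balances)

# With no ORDER BY in the window, the frame is the whole partition,
# so every row of the partition receives the same last_balance.
last_balance_column = [with_flag] * len(partition_balances)

print(without_flag, with_flag, last_balance_column)
```

Without the flag, the FUTURE rows would drag the result down to null; with it, every row of the account gets the last balance that was actually calculated.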
(<private joke> For French readers: I tried the so-called “Rémy OffTheWood” trick and replaced “true” by “trou”… well, it doesn’t work, don’t try it at home. </private joke>)
So, how did all this end up in terms of performance against our competition? Well, our first run of the job, with no tuning at all, took something like six minutes to compute. Of course we wanted to do better, so we applied the Spark configuration tuning capabilities inside Talend jobs.
And after one round of tuning, we got the execution time down to 2 minutes 30 seconds: the most performant result overall!
Next, our prospect tested against more volume and increased both transactions table volume and EOD Balance table (70 million EOD_balance and 38 million for transactions).
With no change to the job (not even tuning properties), we ran in 10 minutes!
So basically, multiplying the volume of both the lookup and the main data by 10x took just 4x the time to process, without ANY CHANGE. This is what I call native scalability. Oh, and the competition wasn’t even close.
I hope this quick read will save you the hours of Googling I spent looking for answers to a common integration scenario. This capability inside the Spark framework is great, and combined with Talend Studio’s capacity, it’s just awesome!
Ease-of-use wise, it’s just great to be able to reuse SQL skills and apply them to the Big Data world that easily! Again, this proves the advantage of Talend in the Big Data integration world and how leveraging standards and being native on Hadoop is the best thing Talend could have done.
Some of our competitors talk about being “native”, but really it’s just a bullet on a PowerPoint presentation. If you want true native power, you can only get it with Talend; try it for yourself.
Here is the result of my job.
Note: The job with the sample data inside is attached to this article.