Spark joins, avoiding headaches
Joining two datasets based on a single key
Spark is an amazingly powerful framework for big data processing. But as soon as we start coding real tasks, we start running into a lot of OOM (java.lang.OutOfMemoryError) messages. There are also a lot of weird concepts, like query plans, that come up time and again.
And probably, the stuff we really care about is just joining two datasets based on a single key. Let’s focus on this issue…
Understanding join performance in Spark
Relational database engines use tree-based indexes to perform joins, which lets them avoid loading and scanning whole tables in search of the matching rows.
Spark lacks index definitions (whether it should or not could be discussed in another post). And even though Spark has an optimization engine that tries to improve resource allocation, datasets need to be prepared to get efficient performance. But first, let’s analyze the basic join scenario by interpreting its optimization plan:
You have probably seen similar execution plans when working with SQL engines. If we analyze it, Spark first attempts to resolve the join by sorting both datasets, which avoids n*m (cartesian product) iterations.
Before sorting, Spark’s engine tries to discard data that will not be used in the join, such as nulls and unused columns.
But sorting involves exchanging data between partitions by the key column, which turns out to be expensive due to network latency and disk I/O. So, to improve performance, we need to favor data locality as much as possible, as shown later in this post.
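To make the reasoning concrete, here is a minimal sketch in plain Python (not Spark code) of the idea behind a sort-based join: drop the null keys, sort both sides by key, and walk them in a single pass instead of comparing every pair of rows. The function name `sort_merge_join` and the list-of-tuples layout are assumptions made for illustration. Spark's real implementation also has to exchange rows between partitions, which this sketch leaves out.

```python
def sort_merge_join(left, right):
    """Inner-join two lists of (key, value) pairs by sorting both sides first."""
    # Rows with a null key can never match, so discard them before sorting.
    left = sorted((kv for kv in left if kv[0] is not None), key=lambda kv: kv[0])
    right = sorted((kv for kv in right if kv[0] is not None), key=lambda kv: kv[0])

    joined, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        left_key, right_key = left[i][0], right[j][0]
        if left_key < right_key:
            i += 1
        elif left_key > right_key:
            j += 1
        else:
            # Find the run of rows on the right that share this key.
            j_end = j
            while j_end < len(right) and right[j_end][0] == left_key:
                j_end += 1
            # Pair every left row with this key against that run.
            while i < len(left) and left[i][0] == left_key:
                for _, right_value in right[j:j_end]:
                    joined.append((left_key, left[i][1], right_value))
                i += 1
            j = j_end
    return joined
```

After the two sorts, each side is scanned once, so the matching step itself does not need n*m comparisons.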
Size does matter!!!
Join performance depends on the strategy we use to tackle each scenario. But first, let’s define sizes (rule of thumb):
Small: the entire dataset fits in memory.
Medium: the entire dataset does not fit in memory, but the dataset of its keys does.
Large: neither the dataset nor the dataset of its keys fits in memory.
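The rule of thumb above can be written down as a tiny helper. This is only a sketch: the function name `classify_dataset` and its three byte-count parameters are hypothetical, and estimating those numbers for a real dataset is a separate problem that is not covered here.

```python
def classify_dataset(dataset_bytes, keys_bytes, memory_bytes):
    """Classify a dataset as small, medium or large for join planning.

    All three arguments are assumed to be estimates supplied by the caller.
    """
    if dataset_bytes <= memory_bytes:
        return "small"   # the whole dataset fits in memory
    if keys_bytes <= memory_bytes:
        return "medium"  # only the keys fit in memory
    return "large"       # not even the keys fit in memory
```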
Small vs small
Nobody cares about this case, but it would probably be just a single Spark SQL join.
Small vs (medium, large)
If we are joining a small dataset with a large one, it could be a good idea to broadcast the small dataset to every node instead of sorting and exchanging the data all over the cluster. Each node can then access the small dataset locally.
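As a rough model of what broadcasting buys us, the following plain Python sketch (not Spark code) treats the large dataset as a list of partitions and gives every partition its own lookup table built from the small dataset. The name `broadcast_join` and the data layout are assumptions for illustration, and the sketch assumes keys are unique in the small dataset. In Spark itself, the `broadcast` function in `pyspark.sql.functions` is the hint that asks for this kind of join.

```python
def broadcast_join(large_partitions, small_rows):
    """Join each partition of the large dataset against a local copy of the small one."""
    # The "broadcast" copy: a hash map from key to value.
    # Assumes keys are unique in the small dataset (a later duplicate would win).
    lookup = dict(small_rows)

    joined_partitions = []
    for partition in large_partitions:
        # Rows of the large dataset never leave their partition;
        # only the small lookup table is shared.
        joined_partitions.append(
            [(key, value, lookup[key]) for key, value in partition if key in lookup]
        )
    return joined_partitions
```

The large dataset is neither sorted nor exchanged here, which is where the savings come from.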
Note that this approach works well for small datasets, but lookups in the broadcast data could suffer performance penalties. These could be overcome by using binary search.
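A minimal sketch of the binary search idea, in plain Python rather than Spark code: the small dataset is sorted by key once, and every row of the large dataset then finds its match with a binary search over the sorted keys. The helper names (`build_sorted_index`, `lookup`, `join_partition`) are hypothetical, and unique keys in the small dataset are assumed.

```python
from bisect import bisect_left


def build_sorted_index(small_rows):
    """Sort the small dataset by key and split it into parallel key/value lists."""
    rows = sorted(small_rows, key=lambda kv: kv[0])
    keys = [key for key, _ in rows]
    values = [value for _, value in rows]
    return keys, values


def lookup(keys, values, key):
    """Binary-search the sorted keys; return the matching value or None."""
    pos = bisect_left(keys, key)
    if pos < len(keys) and keys[pos] == key:
        return values[pos]
    return None


def join_partition(partition, keys, values):
    """Join one partition of the large dataset against the sorted index."""
    joined = []
    for key, value in partition:
        match = lookup(keys, values, key)
        if match is not None:
            joined.append((key, value, match))
    return joined
```

Because `lookup` returns a single value per key, a `None` value in the small dataset would be indistinguishable from a miss in this sketch.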
Note: this approach is only valid for 1:1 scenarios. For 1:n it could be adapted with some minor changes.
Medium vs large
Sometimes the medium dataset is too large to fit in memory. In that case we could still discard useless rows from the large dataset, filtering it by the keys of the medium dataset with an approach similar to the one described before.
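The filtering step can be modeled in plain Python (not Spark code) as follows. Only the keys of the medium dataset are kept in memory, and every partition of the large dataset drops the rows whose key is not among them. The function name `filter_by_keys` and the data layout are assumptions for illustration. The reduced dataset still has to be joined afterwards with one of the other strategies.

```python
def filter_by_keys(large_partitions, medium_keys):
    """Keep only the rows of the large dataset whose key appears in the medium dataset."""
    # The key set is what would be shared with every node; it is assumed to fit in memory.
    keys = frozenset(medium_keys)
    return [
        [row for row in partition if row[0] in keys]
        for partition in large_partitions
    ]
```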
Removing useless entries
Large vs large
There is no silver bullet. You could use the previously described approaches, mix them, try to simulate complex indexes, or repartition your datasets more carefully to take advantage of data locality. You could also check the following articles that have always been quite useful:
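As an illustration of the repartitioning option mentioned above, this plain Python sketch (not Spark code) hash-partitions both datasets by key with the same number of partitions, so matching keys always land in the same partition and each pair of partitions can be joined on its own. The names `partition_by_key` and `join_copartitioned` are hypothetical. In Spark, `repartition` on the key column plays the role of the first function.

```python
def partition_by_key(rows, num_partitions):
    """Assign every (key, value) row to a partition chosen by hashing its key."""
    partitions = [[] for _ in range(num_partitions)]
    for key, value in rows:
        partitions[hash(key) % num_partitions].append((key, value))
    return partitions


def join_copartitioned(left_partitions, right_partitions):
    """Join two datasets partition by partition.

    Both sides must have been partitioned with the same function and
    the same number of partitions.
    """
    joined = []
    for left, right in zip(left_partitions, right_partitions):
        # Build a local lookup for this partition only.
        lookup = {}
        for key, value in right:
            lookup.setdefault(key, []).append(value)
        for key, value in left:
            for other in lookup.get(key, []):
                joined.append((key, value, other))
    return joined
```

No partition ever needs rows from a partition with a different index, which is the locality the post has been aiming for.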