A Data Engineer's Guide to Spark Performance Tuning
In this article, I am going to touch upon a few Spark configurations and optimizations that I have found to be very useful when designing and writing Spark jobs. While these are low-hanging fruit, they are often overlooked, and the result is suboptimal Spark jobs.
Apache Spark is an open-source unified analytics engine for large-scale data processing. Spark provides an interface for programming clusters with implicit data parallelism and fault tolerance.
First, I would like to address three major misconceptions about Apache Spark that people commonly hold when they start writing Spark applications:
- Misconception: Spark provides default job configurations which will be enough for running any kind of application.
Fact: Although Spark provides default configurations, we need to fine-tune them and adjust some parameters for better performance.
- Misconception: Apache Spark is a low-code engine that allows a developer to create applications with little or no code through a visual drag-and-drop interface.
Fact: We need to write optimized code for better performance of the applications and we do not have a visual drag-and-drop interface in Apache Spark.
- Misconception: Apache Spark has a storage layer for storing input and output data.
Fact: Spark is a data processing and analytics engine; it does not have its own permanent storage layer. Though we can persist or cache data during job execution, everything is cleaned up after the job completes.
Now let's discuss three major Optimization categories for Apache Spark applications:
- Optimizations at the Code Level
- Optimizations at the Storage Level
- Optimizations at the Configuration Level
Optimizations at the Code Level
1. Usage of DataFrame/Dataset instead of RDD
DataFrames and Datasets are built on top of Spark RDDs and provide several benefits over RDDs. The DataFrame/Dataset API also leverages two technologies that help improve performance:
Project Tungsten: A broad set of technology improvements that are designed to make Spark run faster and use resources more efficiently. It includes several components, such as off-heap memory management, bytecode generation, and binary data encoding, which work together to improve the performance of Spark’s data processing engine.
Catalyst Optimizer: A query optimization engine that is used by Spark to generate efficient execution plans for DataFrame and Dataset queries. It uses a variety of techniques, such as predicate pushdown, column pruning, and code generation, to optimize the execution of data processing tasks. It leverages advanced programming language features (e.g. Scala’s pattern matching and quasi quotes) in a novel way to build an extensible query optimizer.
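To make the contrast concrete, here is a minimal PySpark sketch (the names and data are illustrative) of the same aggregation written once against the RDD API and once against the DataFrame API; only the latter benefits from Catalyst and Tungsten:
Python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("df-vs-rdd").getOrCreate()
sc = spark.sparkContext

data = [("sales", 100), ("sales", 200), ("hr", 50)]

# RDD version: Spark sees only opaque lambdas and cannot optimize them
rdd_totals = (sc.parallelize(data)
              .reduceByKey(lambda a, b: a + b)
              .collect())

# DataFrame version: a declarative plan that Catalyst optimizes
# and Tungsten executes on compact binary data
df = spark.createDataFrame(data, ["dept", "amount"])
df_totals = df.groupBy("dept").sum("amount").collect()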
2. Using Cache or Persist
Spark provides two caching mechanisms: persist() and cache().
In Apache Spark, using the cache() method on data objects defaults to storing the data in memory (MEMORY_ONLY storage level), facilitating faster access in subsequent operations. On the other hand, the persist() method offers more flexibility, allowing you to specify different storage levels. For instance, you can choose to store the data entirely in memory (MEMORY_ONLY), on disk (DISK_ONLY), or a combination of both (MEMORY_AND_DISK). This flexibility with persist() enables more efficient use of resources, particularly in scenarios where the dataset is too large to fit entirely in memory or when different storage strategies might yield better performance.
So when do we have to use these caching mechanisms?
Suppose you have written a few transformations to be performed on an RDD. Each time you call an action on that RDD, Spark recomputes the RDD and all its dependencies, which can turn out to be quite expensive.
Python
# sc is the SparkContext, e.g. spark.sparkContext from the earlier snippet
l1 = [1, 2, 3, 4]
rdd1 = sc.parallelize(l1)
rdd2 = rdd1.map(lambda x: x * x)
rdd3 = rdd2.map(lambda x: x + 2)
print(rdd3.count())
print(rdd3.collect())
When I call count(), all the transformations are performed and it takes X amount of time to complete the task.
When I call collect(), again all the transformations are called and it still takes X amount of time to complete the task.
Now, for the same code above, let's add persistence:
Python
from pyspark import StorageLevel

rdd3.persist(StorageLevel.MEMORY_AND_DISK)
# first action: computes rdd3 and stores the persisted result
print(rdd3.count())
# second action: served from the persisted data, no recomputation
print(rdd3.collect())
In the code above, we only have to persist the final RDD. When we first call an action on the RDD, the generated data is stored in the cluster, so any subsequent action on the same RDD is much faster because the result is already stored.
There are other storage levels as well, such as MEMORY_ONLY_SER, MEMORY_AND_DISK_SER, and DISK_ONLY (see Learning Spark by O'Reilly).
3. GroupByKey Operations
ByKey operations are used to perform operations based on keys.
Spark has many ByKey operations, such as reduceByKey, groupByKey, sortByKey, combineByKey, subtractByKey, and foldByKey.
ByKey operations generate a lot of shuffle. Shuffles are heavy operations because they move data between executors over the network and consume a lot of memory.
The most commonly used ByKey operation is groupByKey. Instead of groupByKey, a user should usually prefer reduceByKey: groupByKey shuffles every key-value pair across the cluster, which hampers performance, while reduceByKey combines values within each partition before shuffling (a map-side combine), so far less data moves across the network. Therefore, reduceByKey is faster than groupByKey.
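As a sketch (reusing the sc SparkContext from the earlier snippet), here is the same per-key sum written both ways:
Python
pairs = sc.parallelize([("a", 1), ("b", 1), ("a", 1), ("a", 1)])

# groupByKey ships every (key, value) pair across the network
# and only then sums the values
sums_grouped = pairs.groupByKey().mapValues(sum)

# reduceByKey sums inside each partition first (map-side combine),
# so far less data is shuffled
sums_reduced = pairs.reduceByKey(lambda a, b: a + b)

print(sums_reduced.collect())  # [('a', 3), ('b', 1)] (order may vary)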
4. Usage of Broadcast variables
Broadcast variables in Apache Spark are a mechanism to keep a read-only variable cached on each worker node, rather than sending a copy of it with each task. This approach is particularly efficient for distributing a small dataset across nodes. It enables every node to access this dataset without the overhead of transmitting it multiple times. Spark leverages sophisticated broadcast algorithms to optimize the distribution of these variables, thereby minimizing communication costs.
Consider a scenario where we need to join a small dataset with a significantly larger one. Utilizing a broadcast join is a strategic choice in this context. By broadcasting the smaller dataset, Spark caches it on each node. Consequently, when performing the join operation, the large dataset interacts with the small, locally cached dataset on each node. This method not only streamlines the join process but also significantly reduces the data shuffling typically associated with large-scale joins, enhancing the overall efficiency of the operation.
The syntax to use a broadcast join is dataset1.join(broadcast(dataset2)), where dataset2 is the small dataset we want to broadcast to every node in the cluster.
By default, the maximum size for a table to be considered for broadcasting is 10 MB, but this can be changed via the spark.sql.autoBroadcastJoinThreshold parameter.
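For illustration, a minimal PySpark sketch (the DataFrame names and sizes are made up) of a broadcast join, along with raising the automatic threshold:
Python
from pyspark.sql.functions import broadcast

large_df = spark.range(0, 1000000).withColumnRenamed("id", "emp_id")
small_df = spark.createDataFrame([(0, "Berlin"), (1, "Paris")], ["emp_id", "city"])

# Explicitly hint that the small side should be broadcast:
# each executor caches a full copy, so the large side is not shuffled
joined = large_df.join(broadcast(small_df), on="emp_id")

# Alternatively, raise the threshold for automatic broadcast joins (here 50 MB)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)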
5. Reduce usage of expensive Count and Collect operations
Collect (action): returns all the elements of the dataset as an array to the driver program. This is usually useful after a filter or another operation that returns a sufficiently small subset of the data. But if we are working with huge amounts of data, calling collect() without filtering first can return a huge result to the driver, and the driver node may easily run out of memory.
If you just want to get a feel for the data, take(1) or show(1) is useful: Spark scans the first partition it finds and returns the result.
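For example, assuming a DataFrame df from the earlier snippets:
Python
# Inspect a sample instead of collecting everything to the driver
df.show(1)              # prints just the first row
first_row = df.take(1)  # returns a list with at most one Row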
6. Using proper partitioning and parallelism
Dividing large datasets into smaller, manageable chunks through partitioning enables parallel processing, which boosts performance. Several factors can influence the optimal number of partitions and partition size for any application, including the size and complexity of the data being processed.
Choose too few partitions, you have many resources sitting idle.
Choose too many partitions, you have a large number of small partitions shuffling data frequently, which can become highly inefficient.
So what’s the right number?
Shuffling, the process of redistributing data, can be a performance bottleneck. Optimizing shuffle settings, such as adjusting the number and size of partitions, can significantly improve performance.
To optimize the number of partitions and partition size, it is often recommended to:
6.1 Experiment with different partition sizes: Different partition sizes may work better for different data sets and data processing tasks. It may be useful to try different partition sizes and measure the performance of the application to determine the optimal partition size.
6.2 Use the Spark configuration settings: Spark provides several configuration settings that control the number of partitions and the partition size, such as spark.sql.shuffle.partitions and spark.default.parallelism. A partition size of around 128 MB (the default for spark.sql.files.maxPartitionBytes) is a common recommendation. Example settings are sketched below.
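A sketch of both settings (the values below are illustrative, not recommendations for every workload):
Python
# Number of partitions produced by DataFrame shuffles (joins, aggregations);
# the default is 200
spark.conf.set("spark.sql.shuffle.partitions", 400)

# Default parallelism for the RDD API must be set before the
# SparkContext is created (or passed via spark-submit --conf)
spark = (SparkSession.builder
         .config("spark.default.parallelism", 100)
         .getOrCreate())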
Level of Parallelism
Whenever a Spark job is submitted, it is divided into stages, and each stage is divided into tasks. There is one task per partition, and every task requires a single core for processing.
There are two ways to adjust the parallelism: repartition and coalesce.
Repartition triggers a full shuffle and distributes the data evenly over the new partitions; it can be used to increase or decrease the partition count. Coalesce is typically used to reduce the number of partitions. It is important to note that coalesce() does not always avoid data movement.
Repartition example
Let's create a DataFrame with the numbers from 1 to 12.
Scala
// in spark-shell, where spark.implicits._ is already imported
val x = (1 to 12).toList
val numbersDF = x.toDF("number")
numbersDF contains 4 partitions on my machine.
Scala
numbersDF.rdd.partitions.size // => 4
Here is how the data is divided among the partitions:
Partition 00000: 1, 2, 3
Partition 00001: 4, 5, 6
Partition 00002: 7, 8, 9
Partition 00003: 10, 11, 12
Let's do a full shuffle with the repartition method and redistribute this data into two partitions.
Scala
val numbersDFR = numbersDF.repartition(2)
Here is how the numbersDFR data is partitioned on my machine:
Partition A: 1, 3, 4, 6, 7, 9, 10, 12
Partition B: 2, 5, 8, 11
The repartition method makes new partitions and evenly distributes the data in the new partitions (the data distribution is more even for larger data sets).
Coalesce example
Suppose we want to coalesce the numbersDF DataFrame into two partitions.
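The call itself mirrors repartition (numbersDFC is a name introduced here for illustration):
Scala
val numbersDFC = numbersDF.coalesce(2)
numbersDFC.rdd.partitions.size // => 2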
Before coalesce
Node 1 = 1,2,3
Node 2 = 4,5,6
Node 3 = 7,8,9
Node 4 = 10,11,12
After coalesce
Node 1 = 1,2,3 + (10,11,12)
Node 3 = 7,8,9 + (4,5,6)
Notice that Node 1 and Node 3 did not require their original data to move.
7. Avoid or Reduce the usage of UDF
User-defined functions (UDFs) in Apache Spark are custom functions created by users to extend the capabilities of Spark's SQL and DataFrame APIs.
However, the usage of UDFs can introduce performance challenges. This is primarily because UDFs are opaque to the Catalyst optimizer: they are not subject to the same optimization process as built-in Spark functions, so queries that use them miss out on those optimizations, leading to increased processing times, especially with large datasets or complex UDFs. Therefore, while UDFs are powerful tools for custom operations, they should be used sparingly and consciously, with an awareness of how they interact with Spark's internal optimizations.
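As a small illustration (the column and DataFrame names are made up), here is the same transformation written as a UDF and as a built-in function:
Python
from pyspark.sql.functions import udf, upper
from pyspark.sql.types import StringType

names_df = spark.createDataFrame([("alice",), ("bob",)], ["name"])

# UDF version: a black box to Catalyst, with Python-to-JVM serialization cost
upper_udf = udf(lambda s: s.upper(), StringType())
names_df.withColumn("name_upper", upper_udf("name")).show()

# Built-in version: fully visible to the optimizer
names_df.withColumn("name_upper", upper("name")).show()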
8. Prefilter irrelevant data
The key to improving the performance of joins and other processing is to filter out, in earlier steps, the data we don't need in the result set. Say we want to process and store only employees in 'Berlin'. Instead of reading the whole dataset, processing it, and then applying filters on the result, we can filter the DataFrame earlier, while reading the tables.
Spark optimizes the logical plan internally. For supported file formats it uses predicate pushdown, moving filters down to the scan so that subsequent processing runs on already-filtered data.
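A minimal sketch, assuming a hypothetical Parquet dataset of employees at /data/employees:
Python
# Filter while reading; for Parquet, the filter is pushed down to the scan
employees = (spark.read.parquet("/data/employees")
             .select("emp_id", "name", "city")   # column pruning
             .filter("city = 'Berlin'"))         # predicate pushdown

# The physical plan should show the filter under PushedFilters
employees.explain()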
9. Disable or reduce DEBUG and INFO Log Levels
For large data sets or complex data processing tasks, it is often recommended to disable or reduce the amount of logging output. This can be done by setting the log level to a higher severity, such as WARN or ERROR.
This reduces the overhead of generating and writing log messages, including low-priority DEBUG and INFO messages.
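In code, this can be as simple as:
Python
# Emit only warnings and errors from this application's SparkContext
spark.sparkContext.setLogLevel("WARN")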
Optimizations at the Storage Level
1. Using proper data formats
Choosing proper data formats includes choosing a better serializer and using better compression libraries.
Any object has three primary characteristics: identity, state, and behavior. The state represents the value or data of the object.
Serialization is the process of converting an object’s state to a byte stream. This byte stream can then be saved to a file, sent over a network, or stored in a database. The byte stream represents the object’s state, which can later be reconstructed to create a new copy of the object. Deserialization is the reverse process of serialization. It involves taking a byte stream and converting it back into an object.
Spark uses the Java serializer by default; we can explicitly specify the Kryo serializer for better performance.
The Kryo serializer uses a compact binary format and can be up to 10x faster than the Java serializer.
To set the serializer property:
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
We can use compression libraries like LZ4, LZF, Zstd, and Snappy. Snappy is a widely used compression library.
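A configuration sketch combining both choices (the codec choice here is illustrative):
Python
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("serialization-demo")
         # use Kryo instead of the default Java serializer
         .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
         # compression codec for internal data such as shuffle and RDD blocks;
         # valid values include lz4 (the default), lzf, snappy, and zstd
         .config("spark.io.compression.codec", "snappy")
         .getOrCreate())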
2. Choosing the appropriate file format
Spark is compatible with many file formats such as Text, JSON, CSV, Parquet, ORC, Avro, Binary, etc.
2.1 Text format is widely used for unstructured data.
2.2 Binary format is used for unstructured data like image data.
2.3 The JSON format is used for semi-structured data.
2.4 ORC and Parquet are used for Structured data, as both are columnar formats. Parquet is a widely used file format for structured data.
2.5 Avro stores schema in JSON format, and it has broad compatibility with other systems.
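For example, writing and reading structured data as compressed Parquet (df, the path, and the column name are illustrative):
Python
# Write as Parquet with Snappy compression
df.write.mode("overwrite").option("compression", "snappy").parquet("/tmp/events")

# Columnar layout means Spark reads only the columns the query touches
events = spark.read.parquet("/tmp/events")
events.select("dept").distinct().show()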
Optimizations at the Configuration Level
1. Dynamic Resource Allocation
Spark provides a mechanism to dynamically adjust the resources our application occupies based on the workload. This means that our application may give resources back to the cluster if they are no longer used and request them again later when there is demand. This feature is handy if multiple applications share resources in your Spark cluster.
This requires one of the following: 1) enabling the external shuffle service through spark.shuffle.service.enabled, or 2) enabling shuffle tracking through spark.dynamicAllocation.shuffleTracking.enabled.
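A configuration sketch (the executor counts are illustrative; these settings must be in place before the application starts, or be passed via spark-submit --conf):
Python
spark = (SparkSession.builder
         .config("spark.dynamicAllocation.enabled", "true")
         .config("spark.dynamicAllocation.minExecutors", "1")
         .config("spark.dynamicAllocation.maxExecutors", "20")
         # prerequisite 2) from above; alternatively enable the
         # external shuffle service via spark.shuffle.service.enabled
         .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")
         .getOrCreate())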
2. Determine Memory Consumption
The best way to gauge how much memory a dataset will require is to create an RDD, put it into the cache, and look at the "Storage" page in the web UI. The page will tell you how much memory the RDD is occupying.
To estimate the memory consumption of a particular object, use SizeEstimator's estimate method (org.apache.spark.util.SizeEstimator, available from the Scala/Java API). This is useful for experimenting with different data layouts to trim memory usage, as well as for determining the amount of space a broadcast variable will occupy on each executor's heap.
3. Garbage Collection Tuning
As we know, a Spark job runs on the JVM, so JVM garbage collection can become a problem when the application holds a large number of unused objects. The first step in GC tuning is to collect statistics by adding GC logging options (such as -verbose:gc) to the Java options when submitting Spark jobs. Ideally, we try to keep GC overhead below 10% of task execution time.
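For example, GC logging can be enabled for executors when building the session (the flags shown are for JVMs before Java 9; newer JVMs use -Xlog:gc* instead; as with dynamic allocation, these must be set before the application starts):
Python
spark = (SparkSession.builder
         # print GC details in the executor logs so GC overhead can be measured
         .config("spark.executor.extraJavaOptions",
                 "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps")
         .getOrCreate())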