Spark Optimization Techniques

M S Dillibabu
May 29, 2022

1. Don’t use collect. Use take() instead

When we call the collect action, the result is returned to the driver node. This might seem innocuous at first. But if you are working with huge amounts of data, then the driver node might easily run out of memory.

df = spark.read.csv("/FileStore/tables/train.csv", header=True)
df.collect()  # pulls every row back to the driver -- can run out of memory
df.take(1)    # returns only the first row -- safe on large data

The take() action scans the first partition it finds and returns the result. As simple as that! For example, if you just want to get a feel of the data, take(1) returns a single row.

2. Cache and Persist

Spark provides its own caching mechanisms: cache() and persist(). Both store a dataset in memory when there is a requirement for it, typically when you have a small dataset that is used multiple times in your program. If we apply rdd.cache(), the data is always stored in memory, whereas with rdd.persist() some of the data can be stored in memory and some on disk.

  • cache() — MEMORY_ONLY for RDDs, MEMORY_AND_DISK for Datasets/DataFrames
  • persist() — accepts any storage level as a parameter (MEMORY_ONLY, MEMORY_AND_DISK, DISK_ONLY, and their serialized/replicated variants)

If you find you are constantly using the same DataFrame on multiple queries, it’s recommended to implement Caching or Persistence.
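As a minimal sketch (reusing the train.csv DataFrame from the first example; the Survived and Pclass column names are made up for illustration), caching a reused DataFrame might look like this:

from pyspark import StorageLevel

df = spark.read.csv("/FileStore/tables/train.csv", header=True)

df.cache()                                # MEMORY_AND_DISK for DataFrames
df.count()                                # first action materializes the cache

df.filter(df["Survived"] == "1").count()  # served from the cache
df.groupBy("Pclass").count().show()       # served from the cache

df.unpersist()                            # free the storage when done

# persist() gives explicit control over the storage level
df.persist(StorageLevel.DISK_ONLY)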

Note: Avoid overusing this. Due to Spark’s caching strategy (in memory first, then spill over to disk), cached data can end up in slower storage. Also, the storage space used for caching is not available for processing. In the end, caching might cost more than simply reading the DataFrame again.¹

3. Reduce wide transformations (shuffling of data)

  • Shuffling data is one of the most expensive operations in a Spark job, so it is advised to reduce shuffles as much as possible.
  • Wide transformations (reduceByKey, groupByKey, joins, etc.) shuffle data between nodes/partitions, so try to avoid them where you can. Below is an example where reduceByKey can be used instead of groupByKey.
val words = Array("one", "two", "two", "three", "three", "three")
val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))

// reduceByKey combines values per key on each partition before shuffling
val wordCountsWithReduce = wordPairsRDD
  .reduceByKey(_ + _)
  .collect()

// groupByKey shuffles every (key, value) pair first, then sums
val wordCountsWithGroup = wordPairsRDD
  .groupByKey()
  .map(t => (t._1, t._2.sum))
  .collect()
  • Use reduceByKey instead of groupByKey, because reduceByKey combines values for the same key within each partition before shuffling the data, whereas groupByKey shuffles all the data first and only then reduces it.

4. Avoid UDFs

UDFs are a black box to Spark, so it cannot optimize them, and we lose all the optimizations Spark applies to DataFrames/Datasets (the Tungsten engine and the Catalyst optimizer). Whenever possible, we should use Spark SQL built-in functions, as these are designed to be optimized.
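As a minimal sketch (the tiny DataFrame is invented for illustration), here is the same transformation written both ways:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

spark = SparkSession.builder.appName("udf-vs-builtin").getOrCreate()
df = spark.createDataFrame([("alice",), ("bob",)], ["name"])

# UDF version: a black box that Catalyst cannot optimize
upper_udf = F.udf(lambda s: s.upper(), StringType())
df.withColumn("name_upper", upper_udf("name")).show()

# Built-in version: Catalyst and Tungsten can optimize this
df.withColumn("name_upper", F.upper("name")).show()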

5. Use broadcast variables and accumulators when there is a requirement

Accumulators and broadcast variables are both Spark shared variables. An accumulator is a write/update shared variable, whereas a broadcast variable is a read-only shared variable. In a distributed computing engine like Spark, data is divided and executed in parallel on different machines in a cluster, so it is necessary to know the scope and life cycle of variables and methods during execution. That is why we need a shared-variable concept.

5.1. Accumulators — Accumulators are the Spark counterpart of counters in MapReduce: an update-only shared variable. Take the example of counting a particular word in a file. When we read a file in Spark, it is divided into n partitions, and every task runs on a partition in parallel. To get the count of the word across the whole file, the per-partition counts need to be added up (a cumulative total). That is where accumulators come in: each executor adds its value to the shared accumulator variable.

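A minimal sketch of an accumulator-based word count (the input lines and the word being counted are invented for illustration):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("accumulator-demo").getOrCreate()
sc = spark.sparkContext

word_count = sc.accumulator(0)   # shared counter, updated by the workers
lines = sc.parallelize(["spark is fast", "spark scales well", "hello world"])

def count_spark(line):
    # each task adds its partial count to the shared accumulator
    word_count.add(line.split().count("spark"))

lines.foreach(count_spark)
print(word_count.value)          # total across all partitions -> 2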

So, accumulators are used when you want Spark workers to update some value.

5.2. Broadcast variables — A broadcast variable is a read-only variable that is cached on all the executors to avoid shuffling data between them. Broadcast variables are typically used as lookups without any shuffle: each executor keeps a local copy, so no network I/O overhead is involved. Imagine you are running a Spark Streaming application and, for each input event, you have to consult a lookup dataset that is spread across multiple partitions on multiple executors; every event would then incur network I/O, a hugely costly operation for a streaming application at that frequency.
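A minimal sketch of a broadcast lookup (the lookup table and events are invented for illustration):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("broadcast-demo").getOrCreate()
sc = spark.sparkContext

# small lookup table, shipped once to every executor
country_lookup = sc.broadcast({"IN": "India", "US": "United States"})

events = sc.parallelize([("IN", 1), ("US", 2), ("IN", 3)])

# each task reads its local copy -- no shuffle, no per-event network I/O
resolved = events.map(lambda e: (country_lookup.value.get(e[0], "Unknown"), e[1]))
print(resolved.collect())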

6. File format selection

Row vs. columnar file formats: Avro is a row-based format, while Delta, Parquet, and ORC are columnar. We need to select the correct file format to use between source and destination. For example, if the input file is a CSV and the output also needs to be a CSV, we still have to choose the intermediate format that serves us best. In a scenario where we apply filter conditions on very few columns (one or two out of many), a columnar format (Parquet/ORC) suits best. In another scenario where we need to work with all the columns, a row-based format (Avro) helps; it also depends on many other factors. In simple terms: if we apply conditions on only a few of many columns, go columnar; if we apply conditions on all the columns, go row-based. There is a detailed explanation at the link below.

https://luminousmen.com/post/big-data-file-formats
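For instance, a minimal sketch of using Parquet as the intermediate format (the paths and the Age/Fare column names are made up; the final CSV write is omitted):

df = spark.read.csv("/FileStore/tables/train.csv", header=True, inferSchema=True)

# persist the intermediate result in a columnar format
df.write.mode("overwrite").parquet("/tmp/train_parquet")

# downstream stage: Parquet lets Spark read only the columns it needs
subset = spark.read.parquet("/tmp/train_parquet").select("Age", "Fare")
subset.filter(subset["Age"] > 30).show()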

7. Choosing the number of shuffle partitions:

We need to create a job that utilizes all our resources: they should be neither under-utilized nor over-utilized. While choosing the number of partitions, we need to consider two things:

  • Data spill-over
  • Maximizing parallelism by utilizing all the cores.

Setting and getting shuffle partition number:

spark.conf.set("spark.sql.shuffle.partitions", 200)

spark.conf.get("spark.sql.shuffle.partitions")

Spills:
A spill happens whenever there is a shuffle, the data has to be moved around, and an executor is not able to hold the data in its memory, so it has to use disk storage to hold the data for a time. When we don't set right-sized partitions, we get spills. Always avoid spills: in simple terms, a spill moves data from memory to disk, and reading data back from disk incurs disk I/O time. The default number of shuffle partitions in Spark is 200. Assume each partition holds 100 MB of data (100 to 200 MB per partition is the recommended range).

Formula to calculate an optimal shuffle partition number:

partition count = total input data size / each partition size

If core count ≥ partition count, then the number of shuffle partitions should be the partition count.

Otherwise, the number of shuffle partitions should be a multiple of the core count; if not, we would leave cores idle in the last wave of tasks.

Example:

If we have a 100 GB file (100 × 1024 MB = 102400 MB), a 96-core cluster, and each partition holds 100 MB:

partition count = 102400 / 100 = 1024

Our core count is 96. Here core count < partition count, so the number of shuffle partitions should be a multiple of the core count (from the formula above).

The nearest multiple of the core count (96) to 1024 is 96 × 10 = 960.

So we need to set the number of shuffle partitions to 960 (the partition count rounded to a multiple of the core count).

  • spark.conf.set("spark.sql.shuffle.partitions", 960)
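A minimal sketch of that calculation in code (assuming an active SparkSession named spark, with the sizes from the example above):

total_size_mb = 100 * 1024      # 100 GB of input data
partition_size_mb = 100         # target ~100 MB per partition
core_count = 96

partition_count = total_size_mb // partition_size_mb    # 1024

if core_count >= partition_count:
    shuffle_partitions = partition_count
else:
    # round down to the nearest multiple of the core count
    shuffle_partitions = (partition_count // core_count) * core_count

spark.conf.set("spark.sql.shuffle.partitions", shuffle_partitions)  # 960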

Points to take note of:

  • Read the input data with a number of partitions that matches your core count.
  • spark.conf.set("spark.sql.files.maxPartitionBytes", 1024 * 1024 * 128) — sets the maximum partition size to 128 MB.
  • Apply this configuration and then read the source file; Spark will split the file into partitions of up to 128 MB (see the sketch after this list).
  • To verify: df.rdd.partitions.size (in PySpark, df.rdd.getNumPartitions()).
  • By doing this, we will be able to read the file pretty fast.
  • When saving the data, try to utilize all the cores.
  • If the number of partitions matches the core count or is a multiple of it, we achieve full parallelism, which in turn reduces run time.
  • spark.conf.set("spark.sql.files.maxPartitionBytes", 1024 * 1024 * 100) — sets each partition to hold up to 100 MB of data.
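A minimal sketch of that read path (using the train.csv path from earlier in this post):

spark.conf.set("spark.sql.files.maxPartitionBytes", 1024 * 1024 * 128)  # 128 MB

df = spark.read.csv("/FileStore/tables/train.csv", header=True)
print(df.rdd.getNumPartitions())   # verify the resulting partition count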

Make sure to check the points below.

  • Too few partitions lead to less concurrency.
  • Too many partitions lead to excessive task-scheduling overhead.
  • The partition count commonly lies between 100 and 10,000.
  • Lower bound: at least ~2x the number of cores in the cluster.
  • Upper bound: ensure tasks take at least 100 ms.

8. Repartition vs. coalesce performance:

I have explained repartition vs. coalesce in detail at the link below. It covers where repartition is needed and where coalesce is the better choice.
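In short (a minimal sketch, assuming the df from earlier): repartition performs a full shuffle and can increase or decrease the partition count, while coalesce only merges existing partitions, avoiding a full shuffle:

df_more = df.repartition(200)   # full shuffle: redistributes data evenly
df_less = df.coalesce(10)       # no full shuffle: merges partitions in place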

Please refer to the link below for the second post on Spark optimization (part 2). In it, I have covered these topics:

  • Spark JDBC optimization
  • Spark partitioning and bucketing for fast reads
  • mapPartitions vs. map

In upcoming posts, I will try to cover a few more optimization techniques in depth:

  • choosing proper resource allocation
  • RDD vs. Dataset vs. DataFrame
  • Kryo serialization — conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  • broadcast joins, shuffle hash joins, and sort-merge joins
  • garbage collection tuning
  • handling data skewness
  • Spark 3 optimization features, and a few more

Feel free to drop me an email at msdilli1997@gmail.com if you have any queries or are planning to make a career in data engineering. I can help you get your doubts clarified.
