Spark optimization in-depth, part 2

M S Dillibabu
8 min read · Jun 3, 2022

Spark Optimization techniques — 2:

9. Spark JDBC optimization:

As far as I know, there are two ways to tune a Spark JDBC read (please feel free to add more): 1. apply a filter condition while reading; 2. partition the read on a column into n parts so that n parallel reads ingest the data quickly.

1. One simple and effective way is to limit the data being fetched. Instead of reading an entire table, specify a query that selects only the columns and rows you need. In the example below, a query is passed while reading from a JDBC source with Spark.

val query = """
  (SELECT category, value
   FROM testdata
   WHERE category < 10) AS filtered
"""
val df = spark.read
  .format("jdbc")
  .option("url", url)
  .option("driver", driver)
  .option("dbtable", query)   // a query passed as dbtable must be wrapped in parentheses and aliased
  .option("user", user)
  .option("password", password)
  .load()

We know that Sqoop is also an ingestion tool in the Hadoop ecosystem used for the same purpose. It runs on top of MapReduce (but uses only map tasks) to pull data from JDBC sources into HDFS. We can use Spark as an ingestion tool as well, and in fact Spark delivers better performance than Sqoop. If you want the reasons behind this, let me know and I will prepare a separate blog for it.

2. By default, Spark stores the data read from a JDBC connection in a single partition. As a consequence, only one executor in the cluster is used for the read. To split the read on the primary key column of the SQL Server table into n parts (e.g. 4) so that 4 executors read the data in parallel, pass the following four options while reading:

  • partitionColumn determines which table column is used to split the data into partitions (a primary key column is recommended), and its data type must be NUMERIC, DATE or TIMESTAMP.
  • numPartitions sets the desired number of partitions.
  • lowerBound and upperBound are used to calculate the partition boundaries.
val df = spark.read.format("jdbc")
  .option("url", "jdbc url")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "tablename")
  .option("user", "username")
  .option("password", "pswrd")
  .option("lowerBound", 0)
  .option("upperBound", 10000)
  .option("numPartitions", 4)
  .option("partitionColumn", "id")
  .load()

Here we end up with 4 partitions of the id data, each covering roughly one quarter of the range: (0 to 2500), (2500 to 5000), (5000 to 7500) and (7500 to 10000). (Note: these bounds are only written to make the idea clear; in reality the ranges will be of a similar kind.)

You may wonder how to find the lower bound and upper bound values of a column in a table. You can query the min and max values directly in SQL Server, or you can first run a min/max query through Spark (as we did with the filter query above), store the results in two variables, and pass those variables to the lowerBound and upperBound options. That way, every run first calculates the min and max values and feeds them into lowerBound and upperBound.

val query_min_max = """
  (SELECT min(id) AS min_id, max(id) AS max_id
   FROM testdata) AS bounds
"""
val dfMinMax = spark.read
  .format("jdbc")
  .option("url", url)
  .option("driver", driver)
  .option("dbtable", query_min_max)
  .option("user", user)
  .option("password", password)
  .load()
val lowerbound = dfMinMax.first.getInt(0)
val upperbound = dfMinMax.first.getInt(1)

Pass these two values to lowerBound and upperBound if you do not already know them,

or

otherwise, if the bounds you supply do not match the actual boundaries of the data, the first and last partitions will contain all the rows outside the respective lower or upper boundary. Because this can hurt performance, the lower and upper bounds should be close to the actual values present in the partitioning column.

3. If you do not have a column that can be partitioned, another method is to turn a string column into an integer value using built-in functions; I will post the details here later. A hedged sketch of one related alternative follows.
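In the meantime, here is a minimal sketch of a different but related option (not necessarily the method I will cover later), assuming a hypothetical non-numeric state column on the testdata table and reusing the url, driver, user and password values from the examples above: the predicates overload of spark.read.jdbc creates one partition per predicate, so parallel reads are possible without a numeric partition column.

import java.util.Properties

// One partition is created per predicate, so three tasks can read in parallel.
val connProps = new Properties()
connProps.setProperty("user", user)
connProps.setProperty("password", password)
connProps.setProperty("driver", driver)

val predicates = Array(
  "state IN ('CA', 'NY')",
  "state IN ('TX', 'WA')",
  "state NOT IN ('CA', 'NY', 'TX', 'WA') OR state IS NULL"   // catch-all so no rows are missed
)

val dfByPredicate = spark.read.jdbc(url, "testdata", predicates, connProps)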

10. Avoid using regexes

Java regex is a powerful way to parse data with an expected structure, and sometimes it cannot be avoided. Unfortunately, regex matching is generally slow, and when you have to process millions of rows, even a small increase in the time to parse a single row can noticeably increase the total job time. If at all possible, avoid regexes and try to ensure your data is loaded in a more structured format.
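As a minimal sketch of what "a more structured format" can look like, assuming a hypothetical comma-delimited file with id, category and value columns: let Spark parse it against an explicit schema instead of extracting fields from raw lines with a regex.

import org.apache.spark.sql.types._

// Explicit schema, so no per-row regex parsing is needed.
val schema = StructType(Seq(
  StructField("id", IntegerType),
  StructField("category", StringType),
  StructField("value", DoubleType)
))

val structured = spark.read
  .schema(schema)
  .option("header", "true")
  .csv("/path/to/testdata.csv")   // hypothetical path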

11. map() vs mapPartitions()

It is important to know when to apply map() and when to apply mapPartitions().

map()

map() is a transformation that is applied to every element of the RDD and returns a new RDD with the transformed elements.

In our sample RDD, the map function is called on each element. Let's say we have 2,000 records: the map function is applied to all 2,000 records (it is called 2,000 times) and the output also has 2,000 records (the number of input records equals the number of output records).

mapPartitions()

mapPartitions() is a transformation that is applied once per partition of the RDD. In our sample RDD, mapPartitions will be called once per partition. Let's say we have 2,000 records in total, divided into 5 partitions of 400 records each; then it is called 5 times, because the RDD has 5 partitions.

When to use map() and when to use mapPartitions()?

Use mapPartitions function when you need to perform heavy initialization before you transform the elements in the RDD.

Let's say you need a database connection to transform the elements in your RDD. It does not make sense to initialize a database connection for every element; if we did, we would end up initializing the database connection 2,000 times, which is not ideal.

Ideally we want to initialize database connection once per partition/task. So mapPartitions() is the right place to do database initialization as mapPartitions is applied once per partition.

Here is a code snippet that gives you an idea of how this can be implemented (map is used inside mapPartitions in order to apply the transformation to each record). The examples below show why we need mapPartitions.

If we use only map():

val rddTransformed = rdd.map { row =>
  val connection = new DBConnection() // a DB connection is opened once per record, i.e. 2,000 times (not recommended)
  val transformed = row.getString(0) + row.getString(1)
  connection.close()                  // and closed once per record as well
  transformed
}
If we use mapPartitions() for the heavy initialization:

val rddTransformed = rdd.mapPartitions { partition =>
  val connection = new DBConnection() // a DB connection is opened once per partition instead of once per record
  val partitionTransformed = partition.map { element =>
    element.getString(0) + element.getString(1)
  }.toList                            // materialise the results before closing the connection

  connection.close()                  // close the connection once per partition
  partitionTransformed.iterator       // mapPartitions must return an iterator
}

Spark mapPartitions() provides a way to do heavy initialization (for example a database connection) once per partition instead of once per DataFrame row. This improves job performance when you are dealing with heavyweight initialization on larger datasets.

12. Bucketing and partitionBy columns:

We know that shuffling is the main factor that hurts Spark job performance. Apart from broadcasting and caching/persisting DataFrames, an important technique to reduce data shuffling is bucketing and repartitioning of the data.

These are the two optimization techniques we also use for Hive tables. But what exactly are bucketing and partitionBy here?

Both are used to organize data in the filesystem when we deal with large datasets, and to leverage that layout in subsequent queries by distributing data into partitions more efficiently. partitionBy groups rows with the same value into a partition that is stored as a directory. Bucketing is a similar kind of grouping, but it is based on a hashing technique and each bucket is stored as a file. For example, if we read a table with columns like id, state and a few others into a DataFrame, partitionBy can be applied on the state column because the records of each state can be grouped into one partition. But what if a column holds only unique values, like id? In that case we need bucketing, where the rows are grouped based on a hash function. In short: when a column is highly distinct, use bucketing; when it has many duplicate values, partitionBy helps. partitionBy may create uneven partitions, whereas bucketing creates an even distribution of data. We used bucketing in Spark 3 to resolve the data-shuffling problem when joining multiple DataFrames; we used only bucketing because all our data was distinct.
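A minimal sketch of the two write paths, assuming a DataFrame df with the id and state columns from the example above (the output path, table name and bucket count are hypothetical):

// partitionBy: one directory per state value (good for low-cardinality columns)
df.write
  .mode("overwrite")
  .partitionBy("state")
  .parquet("/path/to/partitioned_output")

// bucketBy: rows hashed on id into a fixed number of buckets (good for distinct columns);
// the bucket metadata requires saveAsTable
df.write
  .mode("overwrite")
  .bucketBy(8, "id")
  .sortBy("id")
  .format("parquet")
  .saveAsTable("bucketed_by_id")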

Why did we really use bucketing?

Spark is a distributed data-processing engine. It splits the data into partitions to get better performance and assigns specific chunks of data to the computational engine. But in a few cases Spark needs to shuffle data between the executors in the cluster. The reason for shuffling is that a transformation requires data that is not present locally in the partition.

  • Data shuffling happens when we join two big tables in Spark. While Spark joins two DataFrames by key, rows with the same value of the join key need to be moved to the same executor.
  • Shuffling also happens when we use groupByKey to collect all the values for a key together and perform an action on them. You can reduce the data shuffling by replacing groupByKey with reduceByKey (a short sketch follows this list), although there are a few cases where you have to use groupByKey.
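A minimal sketch of that swap on a classic word count, assuming a hypothetical input path: reduceByKey combines values inside each partition before the shuffle, so far less data crosses the network than with groupByKey.

val counts = spark.sparkContext
  .textFile("/path/to/input.txt")        // hypothetical input path
  .flatMap(line => line.split("\\s+"))
  .map(word => (word, 1))
  .reduceByKey(_ + _)                    // instead of groupByKey().mapValues(_.sum)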

Bucketing involves sorting and shuffling the data ahead of time, before the operation (such as a join) that needs to be performed on it.

As we already know, bucketing organizes the data in partitions with a better structure, so query performance improves. This improvement comes from avoiding shuffling between the executors.

With bucketing we can save the data in a pre-shuffled state. If two DataFrames have the same number of buckets, the same bucket key and the same sort key, Spark is aware of the data distribution and does not need to shuffle.

We have tried the repartition and caching approach for joining multiple data frames but got better results with bucketing.

  • Below is an example of creating buckets with the Spark API. bucketBy is the function that creates the buckets. We need to save the information about the buckets somewhere, so we use saveAsTable here to store the metadata of the bucketed table.
df.write
  .mode("save_mode")
  .option("path", "file path")
  .bucketBy(n, "col1", "col2")
  .sortBy("col1", "col2")
  .format("parquet")
  .saveAsTable("table_name")

val df2 = spark.table("table_name")
  • In the example above we used both bucketBy and sortBy because in some cases there are multiple join keys, and we wanted to put the integer key in bucketBy and the string key in sortBy. sortBy is optional when bucketing data.
  • You can decide the number of buckets based on the data size and the queries you run on the data. Usually 100 MB to 200 MB per bucket is preferred.
  • The main problem with Spark's bucketBy function is that it may create many small files, which can lead to performance problems. This can be avoided by applying a custom partitioning before writing the bucketed data.

This problem in Spark is very different from bucketing in Hive.

Spark creates the bucket files per number of buckets and per task writer, so it will usually create: number of files = number of buckets * number of Spark writer tasks.

If you assign 200 buckets to the DataFrame and no partitioning is set on it, it may create 200 buckets * 200 default partitions (one task per partition) = 40,000 files. Put simply, the Spark job has 200 tasks and each task carries data for all 200 buckets. To resolve this, we need to create one bucket per partition, and that can be achieved with custom partitioning: Spark uses the same expression to distribute the data across the buckets, so it will then generate one file per bucket. What happens if one DataFrame has 50 buckets and the other has 100, or if one DataFrame is bucketed and the other is not? I will create a separate blog for bucketing alone, since it is a vast area; here I want to concentrate on Spark optimization, so I have written only the main points.
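A minimal sketch of that fix, assuming a DataFrame df bucketed on a hypothetical id column into 200 buckets: repartition on the bucket column with as many partitions as buckets, so each task holds the rows of roughly one bucket and writes one file per bucket instead of numBuckets * numTasks small files.

import org.apache.spark.sql.functions.col

df.repartition(200, col("id"))     // same column and count as the buckets below
  .write
  .mode("overwrite")
  .bucketBy(200, "id")
  .sortBy("id")
  .format("parquet")
  .saveAsTable("bucketed_table")   // hypothetical table name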

You can check whether bucketing is enabled (it is by default) with:

spark.conf.get("spark.sql.sources.bucketing.enabled")
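To close the section, here is a hedged end-to-end sketch of the shuffle-free join that bucketing makes possible, assuming two hypothetical tables that were both written with the same bucketBy and sortBy settings on customer_id, as in the bucketBy example above:

val orders = spark.table("orders_bucketed")       // assumed: bucketBy(16, "customer_id"), sortBy("customer_id")
val customers = spark.table("customers_bucketed") // assumed: same bucket count, key and sort key

val joined = orders.join(customers, "customer_id")

// If bucketing is picked up, the physical plan should show no Exchange
// (shuffle) on either side of the join.
joined.explain()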

Previous post on spark optimization:

https://medium.com/@msdilli1997/spark-optimization-techniques-681cb85d01f9

In upcoming posts, I will try to cover a few more optimization techniques in depth:

  • choosing proper resource allocation
  • rdd vs dataset vs dataframe
  • using Kryo serialization - conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  • broadcast joins , shuffle hash join and sort merge join
  • garbage collection tuning
  • data skewness handling
  • spark 3 optimized features and few more
