Important Concepts in Spark: Hash Partitioning, Range Partitioning and Custom Partitioning

M S Dillibabu
3 min read · Jun 28, 2022

Partitioning — dividing the data into smaller parts and storing them across a distributed system so they can be processed in parallel.

Why partitioning is required:-

Shuffle operations such as repartition, groupBy, groupByKey, and joins need records with the same key to end up in the same partition, and they involve a lot of network and disk I/O. Partitioning therefore becomes important when the data is key-value oriented: if a range of keys (or identical keys) lives in the same partition, shuffling is minimized and processing becomes substantially faster. Sensible partitioning reduces the number of I/O operations and speeds up data processing, since Spark works on the data-locality principle. The partitioner decides which record goes to which partition, and this can be done in three ways:

Spark partition types:

  1. Hash partitioning
  2. Range partitioning
  3. Custom partitioning

Hash Partitioning:-

It spreads the data across partitions based on a hash function applied to the key, aiming for an even distribution. To determine the partition, Spark uses the key's Object.hashCode method:

partition = key.hashCode() % numPartitions

groupByKey and reduceByKey use hash partitioning with default parameters by default.

val df = List((1, "mark", "IT"), (2, "James", "IT"), (3, "Berk", "CSE")).toDF("id", "name", "dept")
df.show()

id name dept
1 mark IT
2 James IT
3 Berk CSE

df.repartition(3, $"id") — repartitioning by a column uses the hash partitioner to send each record to one of the 3 partitions: hashCode produces an int, that value modulo 3 gives the partition number, and the record is stored there. For illustration, the 3 rows could land one per partition:

partition -0 = 1 mark IT
partition -1 = 2 James IT
partition -2 = 3 Berk CSE
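The modulo rule can be checked in plain Scala, no SparkContext needed. This is only a sketch: it mirrors Spark's HashPartitioner, which additionally guards against negative hash codes.

```scala
// Sketch of hash-partition assignment: key.hashCode % numPartitions,
// adjusted to be non-negative as Spark's HashPartitioner does.
def partitionFor(key: Any, numPartitions: Int): Int = {
  val raw = key.hashCode % numPartitions
  if (raw < 0) raw + numPartitions else raw
}

// Integer keys hash to themselves, so ids 1, 2, 3 with 3 partitions
// land in partitions 1, 2 and 0 respectively.
println(partitionFor(1, 3)) // 1
println(partitionFor(2, 3)) // 2
println(partitionFor(3, 3)) // 0
```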

Range Partitioning:-

It spreads the data based on ranges of the key. For example, if we have ids from 1 to 100 and want to store them in 3 partitions, then 1 to 33 go to p-0, 34 to 66 to p-1, and 67 to 100 to p-2.
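That bucketing rule can be sketched in plain Scala with fixed boundaries (an assumption for illustration — Spark's RangePartitioner actually samples the data to choose its boundaries):

```scala
// Assign ids 1..100 to 3 partitions by the fixed ranges described above.
def rangePartition(id: Int): Int =
  if (id <= 33) 0
  else if (id <= 66) 1
  else 2

println(rangePartition(10)) // 0
println(rangePartition(50)) // 1
println(rangePartition(99)) // 2
```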

Note: — explicit partitioning with partitionBy is only possible on pair RDDs.

sortByKey — uses range partitioning to shuffle and store the data in partitions.

val pairRDD = data.map(x => (x.key, x.value)) // turn the RDD into a pair RDD of (key, value)
val partitionedRDD = pairRDD.partitionBy(new RangePartitioner(8, pairRDD))

As mentioned above, range partitioning can only be done on a pair RDD, so first convert the RDD into a pair RDD and then pass the desired number of partitions to RangePartitioner inside the partitionBy method.

Custom Partitioner:-

Sometimes we end up with fewer non-empty partitions than the number we specified. For example, suppose we read a text file and store it in an RDD.

val rdd = sc.textFile("vegetables.txt") // data stored in 3 partitions as

partition -0 = Cabbage, cauliflower, tomato

partition -1 = Onion, brinjal

partition -2 = potato, beans

Even after repartitioning, the data looks like:

val rdd1 = rdd.repartition(5)

partition -0 = Cabbage, cauliflower

partition -1 = beans, brinjal

partition -2 =

partition -3 =

partition -4 = potato, Onion, tomato

Two of the five partitions are empty even though the total number of partitions is 5. In order to fill all the partitions, we can write a custom partitioner:

import org.apache.spark.Partitioner

class EqualDistributionPartitioner(numberOfPartitions: Int) extends Partitioner {
  override def numPartitions: Int = numberOfPartitions

  // Assumes keys are sequential Long indices, so modulo spreads them evenly
  override def getPartition(key: Any): Int = {
    (key.asInstanceOf[Long] % numberOfPartitions).toInt
  }
}


val rdd = sc.textFile("vegetables.txt")

// zipWithIndex gives each record a sequential Long index, which we use as the key
val rddWithIndex = rdd.zipWithIndex.map { case (value, index) => (index, value) }

val equallyDistributedPartition = rddWithIndex.partitionBy(new EqualDistributionPartitioner(5))

partition -0 = Cabbage, cauliflower

partition -1 = beans, brinjal

partition -2 = tomato

partition -3 = Onion

partition -4 = potato
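The even spread falls out of the modulo in getPartition: sequential Long indices cycle through the partitions, so none stays empty. A plain-Scala check of that logic:

```scala
// Indices 0..6 under (index % 5): each of the 5 partitions
// receives at least one record before the cycle repeats.
val numberOfPartitions = 5
val assignments = (0L to 6L).map(i => (i % numberOfPartitions).toInt)
println(assignments.mkString(",")) // 0,1,2,3,4,0,1
```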
