SCD Type-1 (Upsert) implementation in Spark Scala

M S Dillibabu
Sep 7, 2022

If the source doesn't provide a date column, the implementation below attaches a date column with the current date on each run.

Problem: can you overwrite only the required data or partitions in the data lake for upserted records, instead of doing a full overwrite?

Example: let's assume the Spark job runs only once a day and each run pulls the full data from the source; we attach a column with current_date on every pull. Below is the property that makes Spark overwrite only the particular partitions being written (the default mode is "static", which would wipe out all existing partitions on overwrite):

spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")

Source — jdbc

Target — datalake
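
The snippets below assume a SparkSession and a few imports are already in place; here is a minimal setup sketch (the app name is illustrative, not from the original post):

// Minimal setup assumed by the snippets below (app name is illustrative)
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{current_date, lit, row_number}
import org.apache.spark.sql.expressions.Window

val spark = SparkSession.builder().appName("scd-type1-upsert").getOrCreate()
import spark.implicits._ // enables the 'date / 'number column syntax used below

// overwrite only the partitions present in the DataFrame being written
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")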

// Reading the data already loaded in the GCS path
// (two loads have happened so far, on 2022-05-02 and 2022-05-03, so there are two date partitions)
val historyDF = spark.read.parquet("/Users/d0m0l9x/Desktop/upsert")
historyDF.show()
+------+-----+----------+
|number| name|      date|
+------+-----+----------+
|     8|  msd|2022-05-02|
|    10|dilli|2022-05-02|
|   111|  red|2022-05-03|
|    30|  qwe|2022-05-03|
+------+-----+----------+
// Reading the full data from the source and attaching a date column with the current date
// (for this run, current_date = 2022-05-04)
val historyDF1 = spark.read.jdbc()
historyDF1.withColumn("date", current_date()).show()
+------+-----+----------+
|number| name|      date|
+------+-----+----------+
|     8|  msd|2022-05-04|
|    10|dilli|2022-05-04|
|    30|  eng|2022-05-04|
|   111|  red|2022-05-04|
|    22| aabb|2022-05-04|
|    14|  ccc|2022-05-04|
+------+-----+----------+
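
The spark.read.jdbc() call above is shown without its arguments; a fuller source read would look roughly like this sketch, where the URL, table name, and credentials are placeholders rather than values from the article:

// Sketch of the JDBC source read; connection details below are hypothetical placeholders
val historyDF1 = spark.read.
  format("jdbc").
  option("url", "jdbc:postgresql://<host>:5432/<db>").
  option("dbtable", "source_table").
  option("user", "<user>").
  option("password", "<password>").
  load()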
// Filtering only the upsert (update + insert) records into upsertdf:
// updated rows keep their previously loaded date via coalesce (so their old partition gets rewritten),
// while brand-new rows fall back to the current date
val upsertdf = historyDF1.drop('date).
  except(historyDF.drop('date)).as("a").
  join(historyDF, historyDF1("number") === historyDF("number"), "left").
  selectExpr("a.number", "a.name", "coalesce(date, current_date()) as date")
upsertdf.show()
+------+----+----------+
|number|name|      date|
+------+----+----------+
|    14| ccc|2022-05-04|
|    22|aabb|2022-05-04|
|    30| eng|2022-05-03|
+------+----+----------+
// Fetching the partition dates that need to be rewritten:
// for updated records this is the previously loaded date, for inserts it is the current date
// (cast to string so the collect works whether the column is DateType or StringType)
val distinctDates = upsertdf.
  select('date.cast("string")).
  distinct.
  map(_.getString(0)).
  collect().toList
println(distinctDates)
// o/p: List(2022-05-04, 2022-05-03)

// Filtering all the existing data in the partitions that are about to be rewritten
val filteredHistoryDF = historyDF.
  filter('date.isin(distinctDates: _*))
filteredHistoryDF.show()
+------+----+----------+
|number|name|      date|
+------+----+----------+
|   111| red|2022-05-03|
|    30| qwe|2022-05-03|
+------+----+----------+
// The logic below combines the full partition data of the rewritten dates with the upserted data
filteredHistoryDF.withColumn("flag", lit(1)).
  union(upsertdf.withColumn("flag", lit(2))).show()
+------+----+----------+----+
|number|name|      date|flag|
+------+----+----------+----+
|   111| red|2022-05-03|   1|
|    30| qwe|2022-05-03|   1|
|    14| ccc|2022-05-04|   2|
|    22|aabb|2022-05-04|   2|
|    30| eng|2022-05-03|   2|
+------+----+----------+----+
val finaldf = filteredHistoryDF.withColumn("flag", lit(1)).
  union(upsertdf.withColumn("flag", lit(2))).
  withColumn(
    "rank",
    row_number.over(Window.partitionBy('number).orderBy('flag.desc))
  ).
  filter('rank === 1) // keep one row per number; flag 2 (upsert) wins over flag 1 (history)
finaldf.show()
+------+----+----------+----+----+
|number|name|      date|flag|rank|
+------+----+----------+----+----+
|    22|aabb|2022-05-04|   2|   1|
|   111| red|2022-05-03|   1|   1|
|    14| ccc|2022-05-04|   2|   1|
|    30| eng|2022-05-03|   2|   1|
+------+----+----------+----+----+
// Below is the property to overwrite only the particular partitions being written, not all of them
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

finaldf.drop("flag", "rank").write.
  mode("overwrite").
  partitionBy("date").
  parquet("/Users/d0m0l9x/Desktop/upsert")
sys.exit(0)
If you look at the target now, all date partitions are still present; only the 2022-05-03 and 2022-05-04 partitions were rewritten with the upserted records.
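
A quick read-back sketch to confirm this (the expected rows are derived from the sample data above):

// Read the target back to confirm every date partition is still present
val checkDF = spark.read.parquet("/Users/d0m0l9x/Desktop/upsert")
checkDF.orderBy('date, 'number).show()
// expected: 8/msd, 10/dilli under 2022-05-02 (untouched)
//           30/eng, 111/red under 2022-05-03 (rewritten)
//           14/ccc, 22/aabb under 2022-05-04 (rewritten)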
