Upserting Spark DataFrame Records (SCD-1 and SCD-2) to an RDBMS

M S Dillibabu
2 min read · Oct 12, 2022

With Spark code we can identify the upsert records (new and changed rows) by comparing the source data with the target data. If you would like a walkthrough of finding upsert records or implementing SCD-1 and SCD-2, let me know and I will write a separate post.
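To make the comparison idea concrete, here is a minimal sketch in plain Python rather than Spark. The function name `find_upserts` and the sample rows are assumptions for illustration only; in Spark the same split is usually done with joins between the source and target DataFrames. It covers the SCD-1 case, where a changed row simply replaces the old one.

```python
def find_upserts(source_rows, target_rows, key):
    """Split source rows into inserts and updates relative to the target.

    Hypothetical helper: rows are dicts, `key` is the business-key column.
    """
    target_by_key = {row[key]: row for row in target_rows}
    inserts, updates = [], []
    for row in source_rows:
        existing = target_by_key.get(row[key])
        if existing is None:
            inserts.append(row)   # key not in target: new record
        elif existing != row:
            updates.append(row)   # key exists but values differ: changed record
        # identical rows are skipped, they need no write
    return inserts, updates


# Sample data (made up), using the salary/fullname columns from this post
source = [
    {"salary": 5656565, "fullname": "mario"},
    {"salary": 10, "fullname": "messironaldo"},
    {"salary": 20, "fullname": "unchanged"},
]
target = [
    {"salary": 10, "fullname": "old name"},
    {"salary": 20, "fullname": "unchanged"},
]

inserts, updates = find_upserts(source, target, key="salary")
upsert_records = inserts + updates  # only these rows need to reach the RDBMS
```

Only `upsert_records` is written to the database, which is why a full overwrite is unnecessary.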

Problem statement: Write only the merged (upserted) records to SQL Server. A full overwrite or a plain append will not do, because the data contains updates to existing rows as well as new rows.

Solution:

As you may know, the Spark DataFrame writer has no upsert mode for SQL Server; it only offers modes such as append and overwrite. I found three methods to solve this problem.

  1. Write the upsert records from the DataFrame to a SQL Server stage table (overwrite mode), then use the Airflow MsSqlOperator to run a .sql file containing a MERGE command that upserts the stage data into the target table. The .sql file can be kept in any storage location (AWS S3, Azure Blob, etc.); Airflow picks it up and executes it on SQL Server.
    Example code (Airflow MsSqlOperator executing a .sql file that contains the MERGE command):

    from airflow.providers.microsoft.mssql.operators.mssql import MsSqlOperator

    merge_mssql_from_external_file = MsSqlOperator(
        task_id='merge_from_external_file',
        mssql_conn_id='airflow_mssql',
        sql='mergecommand.sql',  # file containing the MERGE statement
        dag=dag,
    )

Refer to the Airflow documentation for the MsSqlOperator.
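The contents of mergecommand.sql are not shown in this post. As a rough sketch, the MERGE text for a stage-to-target upsert could be generated like this; `build_merge_sql` is a hypothetical helper, and the table and column names are taken from the other snippets in this post. Identifiers are interpolated directly, so it assumes trusted, hard-coded names only.

```python
def build_merge_sql(target, stage, key_cols, value_cols):
    """Return a T-SQL MERGE statement that upserts `stage` into `target`.

    Hypothetical helper: no identifier quoting or escaping is done here.
    """
    all_cols = list(key_cols) + list(value_cols)
    on_clause = " AND ".join(f"t.{c} = s.{c}" for c in key_cols)
    set_clause = ", ".join(f"t.{c} = s.{c}" for c in value_cols)
    col_list = ", ".join(all_cols)
    src_list = ", ".join(f"s.{c}" for c in all_cols)
    return (
        f"MERGE {target} AS t\n"
        f"USING {stage} AS s\n"
        f"ON {on_clause}\n"
        f"WHEN MATCHED THEN UPDATE SET {set_clause}\n"
        f"WHEN NOT MATCHED THEN INSERT ({col_list}) VALUES ({src_list});"
    )


# Stage and target names as used elsewhere in this post
merge_sql = build_merge_sql(
    target="dill_table",
    stage="dill_table_stage",
    key_cols=["salary"],
    value_cols=["fullname"],
)
print(merge_sql)
```

The printed statement is what would be saved as the .sql file for the operator to run.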

  2. Scala code (using Java packages) that executes a query against the table for each row of the Spark DataFrame. I wrote a generic function for this; the code that calls it is below.

import java.util.Properties
import spark.implicits._

val df = List((5656565, "mario"), (10, "messironaldo")).toDF("salary", "fullname")
df.show()

// Connection details, broadcast so that every executor can read them
val conprop = new Properties
conprop.put("user", "sa")
conprop.put("password", "pswd")
conprop.put("jdbcUrl", "jdbc:sqlserver://;serverName=localhost;databaseName=dilli")
val brconnect = spark.sparkContext.broadcast(conprop)

// The two ? placeholders take the salary and fullname values
val mergequery = "MERGE dill_table as t using (VALUES(?,?)) as s(salary,fullname) on t.salary = s.salary when MATCHED then UPDATE SET t.fullname = s.fullname when NOT MATCHED then insert VALUES (s.salary,s.fullname);"
dftosqlerver(df, brconnect, mergequery) // calling our function

// If you only want to insert records into SQL Server, pass the query below instead
//val insertquery = "insert into dill_table(salary,fullname) VALUES(?,?) ;"
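The generic function itself is not included above. The row-by-row idea can be sketched in Python as follows; this is not the author's function, `upsert_rows` is a hypothetical name, and SQLite's `INSERT ... ON CONFLICT` stands in for SQL Server's MERGE so that the example runs without a database server.

```python
import sqlite3

# Assumption: SQLite upsert syntax replaces the T-SQL MERGE used in this post
UPSERT_SQL = """
INSERT INTO dill_table (salary, fullname) VALUES (?, ?)
ON CONFLICT(salary) DO UPDATE SET fullname = excluded.fullname
"""


def upsert_rows(conn, query, rows):
    """Run one parameterized statement per row, then commit once."""
    cur = conn.cursor()
    try:
        for row in rows:
            cur.execute(query, tuple(row))  # bind row values to the ? placeholders
        conn.commit()
    except Exception:
        conn.rollback()  # leave the table untouched if any row fails
        raise
    finally:
        cur.close()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dill_table (salary INTEGER NOT NULL UNIQUE, fullname TEXT)")
conn.execute("INSERT INTO dill_table VALUES (10, 'old name')")
conn.commit()

upsert_rows(conn, UPSERT_SQL, [(5656565, "mario"), (10, "messironaldo")])
result = conn.execute(
    "SELECT salary, fullname FROM dill_table ORDER BY salary"
).fetchall()
print(result)
```

In Spark, a function like this would typically be run once per partition (for example through foreachPartition), so that each partition opens a single connection instead of one per row.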

  3. Use a pandas DataFrame in Airflow to read the data from S3, write it to a SQL Server stage table, and then merge the stage table into the target SQL Server table.

Sample steps:
i. Read the source data into a pandas DataFrame, for example with pandas.read_csv or pandas.read_parquet on an s3:// path (pandas.read_gbq(query, project_id=None, ...) is the counterpart for BigQuery, not S3).
ii. sql_df.to_sql('dill_table_stage', engine, if_exists='replace', index=False) (to_sql accepts 'fail', 'replace' or 'append'; 'replace' gives the overwrite behaviour).
iii. engine.execute(mergequery) or cursor.execute(mergequery)
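The stage-then-merge flow of steps ii and iii can be sketched end to end as below. To keep it runnable without SQL Server or pandas, it is written under two assumptions: an in-memory SQLite database replaces SQL Server, and a plain list of tuples replaces the pandas DataFrame. `stage_and_merge` is a hypothetical name.

```python
import sqlite3


def stage_and_merge(conn, rows):
    """Overwrite the stage table with `rows`, then upsert stage into target."""
    cur = conn.cursor()
    # Step ii stand-in: overwrite the stage table
    cur.execute("DELETE FROM dill_table_stage")
    cur.executemany(
        "INSERT INTO dill_table_stage (salary, fullname) VALUES (?, ?)", rows
    )
    # Step iii stand-in: set-based upsert from stage into target
    # (SQLite needs "WHERE true" to parse INSERT ... SELECT ... ON CONFLICT)
    cur.execute(
        """
        INSERT INTO dill_table (salary, fullname)
        SELECT salary, fullname FROM dill_table_stage WHERE true
        ON CONFLICT(salary) DO UPDATE SET fullname = excluded.fullname
        """
    )
    conn.commit()
    cur.close()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dill_table (salary INTEGER NOT NULL UNIQUE, fullname TEXT)")
conn.execute("CREATE TABLE dill_table_stage (salary INTEGER, fullname TEXT)")
conn.executemany(
    "INSERT INTO dill_table VALUES (?, ?)", [(10, "old name"), (20, "unchanged")]
)
conn.commit()

stage_and_merge(conn, [(5656565, "mario"), (10, "messironaldo")])
target_rows = conn.execute(
    "SELECT salary, fullname FROM dill_table ORDER BY salary"
).fetchall()
print(target_rows)
```

Rows that are in the target but not in the stage table stay as they are, which is the difference from a full overwrite.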

I prefer the second method. Please let me know if there is any other way to upsert records to an RDBMS.
