Spark df Upsert (SCD-1 and SCD-2) records to RDBMS

M S Dillibabu
2 min read · Oct 12, 2022

--

Using Spark, we can find the upsert records by comparing the source data with the target data. (If you want to know more about finding upsert records or implementing SCD-1 and SCD-2, let me know and I will create a separate blog.)

Problem statement: merge (upsert) records into SQL Server. It cannot be a full overwrite or a plain append, because the incoming data contains updated records as well as new ones.

Solution:

As you all know, the Spark DataFrame writer has no upsert option for SQL Server (only append and overwrite), so I found three methods to solve this particular problem statement.

1. Write the upsert records (overwrite) to a SQL Server stage table, then use the Airflow MsSqlOperator to trigger a .sql file (a MERGE command that upserts the data into the target table). We store the .sql file in any storage location (AWS S3, Azure Blob, etc.); Airflow picks up that SQL file and executes it on SQL Server. A sketch of the stage-table write follows the operator example below.
Example code for the first method, using the Airflow MsSqlOperator to execute a SQL file that contains the MERGE command:

from airflow.providers.microsoft.mssql.operators.mssql import MsSqlOperator

merge_mssql_from_external_file = MsSqlOperator(
    task_id='merge_from_external_file',
    mssql_conn_id='airflow_mssql',
    sql='mergecommand.sql',  # the .sql file holding the MERGE statement
    dag=dag,
)

Refer to the Airflow documentation for the MsSqlOperator.
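For the first step (writing the upsert records to the stage table), the standard Spark JDBC writer is enough. A minimal sketch, assuming the connection details from method 2 below and an illustrative stage table named dill_table_stage:

// Overwrite the SQL Server stage table with the upsert records
df.write
  .format("jdbc")
  .option("url", "jdbc:sqlserver://;serverName=localhost;databaseName=dilli")
  .option("dbtable", "dill_table_stage")
  .option("user", "sa")
  .option("password", "pswd")
  .mode("overwrite")
  .save()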

2. Scala code (using Java packages) to execute queries on tables by iterating over each row of the Spark DataFrame. I have made a generic function for this; the calling code is below, and a sketch of the function itself follows it.

import java.util.Properties
import spark.implicits._

val df = List((5656565, "mario"), (10, "messironaldo")).toDF("salary", "fullname")
df.show()

val conprop = new Properties()
conprop.put("user", "sa")
conprop.put("password", "pswd")
conprop.put("jdbcUrl", "jdbc:sqlserver://;serverName=localhost;databaseName=dilli")

// Broadcast the connection properties so each executor can open its own connection
val brconnect = spark.sparkContext.broadcast(conprop)
val mergequery = "MERGE dill_table AS t USING (VALUES (?, ?)) AS s (salary, fullname) ON t.salary = s.salary WHEN MATCHED THEN UPDATE SET t.fullname = s.fullname WHEN NOT MATCHED THEN INSERT VALUES (s.salary, s.fullname);"
dftosqlerver(df, brconnect, mergequery) // calling our generic function
// For insert-only loads, pass this query instead:
// val insertquery = "INSERT INTO dill_table (salary, fullname) VALUES (?, ?);"
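The generic function is referenced above but not shown. Here is a minimal sketch of what dftosqlerver could look like; the name and signature follow the call above, while the java.sql usage and the per-column setObject binding are my assumptions (the Microsoft JDBC driver must be on the executors' classpath):

import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util.Properties
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.DataFrame

// Sketch: one JDBC connection per partition, executing the parameterised
// query once per row, binding the row's columns to the ? placeholders in order.
def dftosqlerver(df: DataFrame, brconnect: Broadcast[Properties], query: String): Unit = {
  df.rdd.foreachPartition { partition =>
    val props = brconnect.value
    val conn: Connection = DriverManager.getConnection(
      props.getProperty("jdbcUrl"),
      props.getProperty("user"),
      props.getProperty("password"))
    val stmt: PreparedStatement = conn.prepareStatement(query)
    try {
      partition.foreach { row =>
        for (i <- 0 until row.length)
          stmt.setObject(i + 1, row.get(i).asInstanceOf[AnyRef]) // bind column i to placeholder i+1
        stmt.executeUpdate()
      }
    } finally {
      stmt.close()
      conn.close()
    }
  }
}

Executing row by row keeps the sketch simple; for larger DataFrames you would typically batch with stmt.addBatch() and stmt.executeBatch() per partition to cut down on round trips.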

3. Use a pandas DataFrame in Airflow to read from S3, write to a SQL Server stage table, and then merge into the target SQL Server table.

Below is sample code for it (the S3 path and table names are illustrative; mergequery is the stage-to-target MERGE statement, i.e. the method 2 query rewritten to select from dill_table_stage):

i. sql_df = pd.read_csv('s3://my-bucket/upsert_records.csv') # read the upsert records from S3 (needs s3fs installed)

ii. sql_df.to_sql('dill_table_stage', engine, if_exists='replace', index=False) # note: to_sql accepts 'replace', not 'overwrite'

iii. engine.execute(mergequery) # or cursor.execute(mergequery)

I prefer the second method. Please let me know if there is any other way to upsert records to an RDBMS.
