Spark DataFrame Upsert (SCD-1 and SCD-2) Records to an RDBMS
With Spark code we can identify the upsert records by comparing the source data with the target data. (If you want to know how to find the upsert records, or how to implement SCD-1 and SCD-2, let me know and I will write a separate blog post.)
Problem statement: merge (upsert) only the changed records into SQL Server. A full overwrite or a plain append will not work, because the data also contains updates to existing records.
The Spark DataFrame writer has no upsert mode for SQL Server (its save modes, such as append and overwrite, do not cover updates), so I found 3 methods to solve this problem.
1. Write the upsert records from the DataFrame to a SQL Server stage table (overwrite mode), then use the Airflow MsSqlOperator to trigger a .sql file containing a MERGE statement that upserts the stage data into the target table. The .sql file can be stored in any storage location (AWS S3, Azure Blob, etc.); Airflow picks up that file and executes it on SQL Server.
Example code:
merge_mssql_from_external_file = MsSqlOperator(
Refer to the Airflow documentation for the MsSqlOperator.
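The .sql file used in method 1 only needs to hold one MERGE statement from the stage table into the target table. As a rough sketch (not the exact file I use), the statement could be generated like this. The helper name build_merge_sql and its parameters are made up for illustration; the table and column names are the ones used elsewhere in this post. Table and column names are pasted straight into the string, so pass only trusted identifiers, and give it at least one column to update.

```python
def build_merge_sql(target, stage, key_cols, update_cols):
    """Return a T-SQL MERGE statement that upserts rows from `stage` into `target`.

    Hypothetical helper for illustration: identifiers are interpolated as-is,
    so they must come from trusted configuration, never from user input.
    """
    all_cols = list(key_cols) + list(update_cols)
    # Rows match when every key column is equal
    on_clause = " AND ".join(f"t.{c} = s.{c}" for c in key_cols)
    # Matched rows get their non-key columns overwritten (SCD-1 style)
    set_clause = ", ".join(f"t.{c} = s.{c}" for c in update_cols)
    col_list = ", ".join(all_cols)
    val_list = ", ".join(f"s.{c}" for c in all_cols)
    return (
        f"MERGE {target} AS t\n"
        f"USING {stage} AS s\n"
        f"ON {on_clause}\n"
        f"WHEN MATCHED THEN UPDATE SET {set_clause}\n"
        f"WHEN NOT MATCHED THEN INSERT ({col_list}) VALUES ({val_list});"
    )


# Build the statement for the example tables in this post
merge_sql = build_merge_sql("dill_table", "dill_table_stage", ["salary"], ["fullname"])
print(merge_sql)
```

The printed text can be saved as the .sql file that the Airflow task executes after the Spark job has refreshed the stage table.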
2. Scala code (using Java packages) that executes a query against the table for each row of the Spark DataFrame. I wrote a generic function for this (the code below shows how it is called):
import java.util.Properties
import spark.implicits._

// Sample data to upsert
val df = List((5656565,"mario"),(10,"messironaldo")).toDF("salary","fullname")

// Connection properties, broadcast so every executor can read them
val conprop = new Properties
conprop.put("jdbcUrl","jdbc:sqlserver://;serverName=localhost;databaseName=dilli")
val brconnect = spark.sparkContext.broadcast(conprop)

// MERGE statement; the two ? placeholders take the salary and fullname of each row
val mergequery = "MERGE dill_table as t using (VALUES(?,?)) as s(salary,fullname) on t.salary = s.salary when MATCHED then UPDATE SET t.fullname = s.fullname when NOT MATCHED then insert VALUES (s.salary,s.fullname);"

dftosqlerver(df, brconnect, mergequery) // calling our function

// If you only want to insert records into SQL Server, pass the query below instead
//val insertquery = "insert into dill_table(salary,fullname) VALUES(?,?) ;"
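The body of the generic function is not shown above, so here is a minimal sketch of the same idea in Python rather than Scala: open one connection per partition and run the parameterized query for every row. This is not my Scala function; the name write_partition, the connect factory and batch_size are assumptions for illustration. It works with any DB-API driver whose placeholders are ?, which is what pyodbc uses for SQL Server.

```python
from itertools import islice


def write_partition(rows, connect, query, batch_size=1000):
    """Run a parameterized query once per row, using one connection per partition.

    rows    -- iterable of rows (tuples or Spark Row objects)
    connect -- zero-argument function returning a DB-API connection
    query   -- SQL text with one ? placeholder per column
    Returns the number of rows sent to the database.
    """
    conn = connect()
    written = 0
    try:
        cur = conn.cursor()
        rows = iter(rows)
        while True:
            # Pull the next batch lazily so a large partition is never fully in memory
            batch = [tuple(r) for r in islice(rows, batch_size)]
            if not batch:
                break
            cur.executemany(query, batch)
            written += len(batch)
        # Commit once per partition; an exception before this point leaves nothing committed
        conn.commit()
    finally:
        conn.close()
    return written


# Hypothetical PySpark usage (assumes pyodbc is installed on the executors and
# conn_str is your own ODBC connection string):
#   import pyodbc
#   df.foreachPartition(
#       lambda rows: write_partition(rows, lambda: pyodbc.connect(conn_str), mergequery))
```

Opening the connection inside the function matters because connections cannot be shipped from the driver to the executors; only the connection settings travel, which is also why the Scala version broadcasts the properties.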
3. Use a pandas DataFrame in Airflow to read from S3, write to a SQL Server stage table, and then merge the stage table into the target SQL Server table.
Below is sample code for it:
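i. Read the source into a pandas DataFrame, e.g. pandas.read_gbq(query, project_id=None, index_col=None, col_order=None, reauth=False, verbose=None, private_key=None, dialect='legacy', **kwargs). Note that read_gbq is a top-level pandas function (not a DataFrame method) and reads from BigQuery; for a file on S3, pandas.read_csv accepts an s3:// path when s3fs is installed.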
ii. sql_df.to_sql('dill_table_stage', engine, if_exists='replace', index=False) (to_sql has no 'overwrite' option; 'replace' drops and recreates the stage table).
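iii. engine.execute(mergequery) or cursor.execute(mergequery). (Engine.execute was removed in SQLAlchemy 2.0; on newer versions, run the statement through a connection instead.)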
I prefer the second method. Please let me know if there is any other way to upsert records into an RDBMS.