Category Archives: sql

A generic python based ETL pipeline solution for Databricks

Below is the code necessary to create a Databricks notebook source file that can be imported into Databricks. This file can act as a template for creating ETL logic to build tables in Databricks. Once the notebook is prepared it can be set to run by a Databricks workflow job.

The template is parameterized. This means the developer just needs to provide the destination database, the destination schema, the destination table and the SQL logic.

(Note: this simple example is a full load solution, not an incremental load solution. An incremental load solution can be achieved by writing sufficiently robust, use-case-specific SQL.)

The SQL is provided as a variable and the table or table names are stored in a list allowing for a large degree of flexibility for creating a single pipeline that builds multiple database objects.

Another important feature of the code is that it compensates for the fact that Databricks does not have a native acknowledgement of primary keys or restrictions on their violations. A list of primary keys can be provided and if any of those keys are null or not distinct the code will throw an error.
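The primary key check described above can be sketched in plain Python, independent of Spark. This is only an illustration of the rule, not the notebook's implementation: the function name `validate_primary_key` and the list-of-dicts row format are assumptions made for the example.

```python
def validate_primary_key(rows, pk_fields):
    """Raise ValueError if any key field is null or the key is not unique.

    `rows` is a list of dicts, a hypothetical stand-in for a dataframe.
    """
    seen = set()
    for row in rows:
        key = tuple(row.get(field) for field in pk_fields)
        if any(value is None for value in key):
            raise ValueError("Primary Key contains nulls")
        if key in seen:
            raise ValueError("Primary Key is not unique")
        seen.add(key)


rows = [
    {"EXAMPLE_ID": 1, "EXAMPLE_LOCATION": "TEXAS"},
    {"EXAMPLE_ID": 2, "EXAMPLE_LOCATION": "TEXAS"},
]
# The composite key (EXAMPLE_ID, EXAMPLE_LOCATION) is unique, so this passes silently.
validate_primary_key(rows, ["EXAMPLE_ID", "EXAMPLE_LOCATION"])
```

Using EXAMPLE_LOCATION alone as the key would fail for the same rows, because both records share the value TEXAS.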

The code will also assign metadata fields to each record created including the job run id as the ETL id, the created date and the updated date.
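As a rough illustration of that metadata step, the sketch below stamps every record with the same three fields. The notebook itself does this by joining a one-row metadata dataframe; the helper name `add_metadata` and the dict-based rows here are assumptions for the example only.

```python
from datetime import datetime, timezone


def add_metadata(rows, etl_id, now=None):
    """Return copies of `rows` with ETL_ID, CREATED_DATE and UPDATED_DATE added."""
    # Use one timestamp for the whole batch so every record matches.
    stamp = now or datetime.now(timezone.utc)
    return [
        {**row, "ETL_ID": etl_id, "CREATED_DATE": stamp, "UPDATED_DATE": stamp}
        for row in rows
    ]


stamped = add_metadata([{"EXAMPLE_ID": 1}], etl_id=42)
print(sorted(stamped[0].keys()))
```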

# Databricks notebook source
# MAGIC %md
# MAGIC https://tidbytez.com/<br />
# MAGIC This is an ETL notebook.<br />

# COMMAND ----------

# Libraries
import os
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
from pyspark.sql.functions import lit, concat_ws, isnan, when, count, col
from datetime import datetime, timedelta

# COMMAND ----------

# Functions

# Generate ETL ID
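def get_etl_id():
    # Read the job run id from the notebook context. Fall back to 1 when the
    # notebook is run interactively and no numeric run id is available.
    try:
        run_id = (
            dbutils.notebook.entry_point.getDbutils()
            .notebook()
            .getContext()
            .currentRunId()
            .toString()
        )
        if run_id.isdigit():
            return int(run_id)
        return 1
    except Exception:
        print("Could not return an etl_id number")
        # Return the same fallback so the metadata dataframe can still be built
        return 1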


# Build database object
def build_object(dest_db_name, schema_name, table_name, pk_fields, sql_query):

    # Destination Database and table
    table_location = dest_db_name + "." + table_name
    # External table file location
    file_location = "/mnt/" + schema_name + "/" + table_name

    # Create Dataframe
    df = sql_query

    # Count nulls in Primary Key (summed across every key field, not just the first)
    cnt_pk_nulls = sum(
        df.select(
            [count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in pk_fields]
        ).collect()[0]
    )
    # Dataframe record count
    cnt_rows = df.count()
    # Primary Key distinct count
    cnt_dist = df.dropDuplicates(pk_fields).count()
    # Error message
    message = ""

    # Join metadata to dataframe
    global meta_df
    meta = meta_df
    df = df.withColumn("key", lit(1))

    # inner join on two dataframes
    df = df.join(meta, df.key == meta.key, "inner").drop(df.key).drop(meta.key)

    # Write dataframe to table
    if cnt_pk_nulls == 0:
        if cnt_rows == cnt_dist:
            df.write.mode("overwrite").format("delta").option(
                "mergeSchema", "false"
            ).option("path", file_location).saveAsTable(table_location)
        else:
            message = "Primary Key is not unique"
    else:
        message = "Primary Key contains nulls"

    if message != "":
        raise Exception(message)


# COMMAND ----------

# Variables

# Destinations

# File location
schema_name = "YOUR_SCHEMA_NAME_HERE"

# Database location
dest_db_name = "YOUR_DEST_DATABASE_NAME_HERE"

# PK fields
pk_fields = ["EXAMPLE_ID", "EXAMPLE_LOCATION"]

# Metadata
etl_id = get_etl_id()
t = datetime.utcnow()

# Create metadata dataFrame
data = [(1, etl_id, t, t)]
columns = ["key", "ETL_ID", "CREATED_DATE", "UPDATED_DATE"]
meta_df = spark.createDataFrame(data, columns)
# Keep ETL_ID as a 64-bit integer so large job run ids are not truncated
meta_df = meta_df.withColumn("ETL_ID", meta_df["ETL_ID"].cast("bigint"))

# COMMAND ----------

# Table name variable list
table_list = [
    {"table_name": "EXAMPLE_TABLE"}
]

# COMMAND ----------

# Iterate through table variables
for table in table_list:

    table_name = table.get("table_name")

    # SQL query
    # Spark SQL treats double-quoted text as a string literal by default,
    # so the column alias is left unquoted
    sql_query = spark.sql(
        f"""
        SELECT 1 AS EXAMPLE_ID,
        'TEXAS' AS EXAMPLE_LOCATION
        """
    )
    build_object(dest_db_name, schema_name, table_name, pk_fields, sql_query)

Comparing two tables for equality with Spark SQL

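A quick way of comparing two tables to determine whether they hold the same data is to calculate the sum of the row hashes of each table and then compare the two sums. The benefit of this technique is that no matter how many fields there are, and no matter what data types the fields may be, the same query does the comparison. Matching sums are strong evidence of equality rather than proof, since different rows can occasionally produce the same hash. You can use the following query to do the comparison: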

SELECT SUM(HASH(*)) FROM t1;
SELECT SUM(HASH(*)) FROM t2;

Of course if the schemas of the two tables are different this will by default produce different hash values.
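To show why summing row hashes works regardless of row order, here is a small plain-Python sketch of the same idea. It is an illustration under assumptions: it uses SHA-256 from the standard library rather than Spark's HASH function, and the helper names `row_hash` and `table_hash_sum` are made up for the example.

```python
import hashlib


def row_hash(row):
    """Hash one row (a tuple of values) to a 64-bit integer."""
    encoded = repr(row).encode("utf-8")
    return int.from_bytes(hashlib.sha256(encoded).digest()[:8], "big")


def table_hash_sum(rows):
    """Sum of row hashes; addition makes the result independent of row order."""
    return sum(row_hash(row) for row in rows)


t1 = [(1, "TEXAS"), (2, "OHIO")]
t2 = [(2, "OHIO"), (1, "TEXAS")]  # same rows, different order
t3 = [(1, "TEXAS"), (2, "UTAH")]  # one value differs

print(table_hash_sum(t1) == table_hash_sum(t2))  # True
print(table_hash_sum(t1) == table_hash_sum(t3))
```

Because addition is commutative, the order in which rows are read does not affect the total, which is what makes the approach convenient for unordered tables.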

How to insert a record with Spark SQL

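INSERT INTO with a VALUES clause, as used in other SQL variants, may not be available in every Spark SQL version (recent Spark and Databricks releases do document it). The INSERT INTO ... SELECT form works either way, and for single record inserts the example below provides two options: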

--CREATE test table
CREATE TABLE TestSchema.InsertTest USING DELTA AS (SELECT 1 AS row_id, 'value1' AS field_1, 'value2' AS field_2)

--INSERT INTO test table
INSERT INTO TestSchema.InsertTest SELECT t.* FROM (SELECT 2, 'value3', 'value4') t;

--INSERT INTO test table while aliasing field names
INSERT INTO TestSchema.InsertTest SELECT t.* FROM (SELECT 3 AS row_id, 'value5' AS field_1, 'value6' AS field_2) t;

--Confirm insert
SELECT * FROM TestSchema.InsertTest

How to drop a Spark Delta table and associated files using Spark SQL and cmd


When working with Delta Lake tables in Databricks, it’s not enough to simply drop the table from the metastore—you also need to ensure that the underlying data files are removed to prevent clutter and maintain a clean data lake. This process is especially important when dealing with external Delta tables, where Spark does not automatically manage file deletion.

The following steps outline a reliable method to fully remove a Delta table and its associated files using Spark SQL and command-line tools.

🔹 Step 1: Identify the Schema and Table

Begin by locating the schema and table you want to delete. Replace placeholder values like schemaName and tableName with the actual names used in your environment. This ensures you’re targeting the correct table throughout the process.

🔹 Step 2: Inspect the Table Metadata

Using Spark SQL within Databricks, run a query to describe the table. This will return detailed metadata, including the location of the table’s data files in DBFS (Databricks File System). If you’re using the default schema, it may be named default, but adjust as needed.

🔹 Step 3: Locate the Storage Path

In the metadata output, scroll down to find the Location field. This value points to the directory where the table’s data files are stored. Copy this path—it will be used later to manually delete the files if necessary.
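If you would rather pick the path out programmatically than scroll for it, a sketch like the one below could help. It assumes each output row is a (col_name, data_type, comment) tuple with the path in the second position of the Location row; the function name `find_location` and the sample rows are hypothetical.

```python
def find_location(describe_rows):
    """Return the Location value from DESC FORMATTED output rows, or None.

    Each row is assumed to be a (col_name, data_type, comment) tuple, with
    the storage path held in the second position of the Location row.
    """
    for col_name, data_type, _comment in describe_rows:
        if col_name.strip().lower() == "location":
            return data_type
    return None


# Hypothetical sample of the rows such a query might return.
sample_rows = [
    ("row_id", "int", None),
    ("", "", ""),
    ("# Detailed Table Information", "", ""),
    ("Location", "dbfs:/mnt/schemaName/tableName", ""),
]
print(find_location(sample_rows))  # dbfs:/mnt/schemaName/tableName
```

In a notebook, the rows could come from collecting the result of the describe query, provided the output has the three-column shape assumed here.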

🔹 Step 4: Drop the Table from the Metastore

Execute a Spark SQL command to drop the table. This removes the table’s metadata from the catalog. If the table is managed, this step may also delete the associated files. However, for external tables, the files will remain and must be deleted manually.

🔹 Step 5: Delete the Data Files from DBFS

Using your preferred method of interacting with DBFS—whether through the command line, a Python script, or a Databricks notebook—delete the directory identified earlier. This ensures that all data files associated with the table are removed from storage.

✅ Why This Matters

Delta tables support ACID transactions and maintain a transaction log. Improper deletion—such as manually removing files without dropping the table—can corrupt the log and lead to inconsistent behavior. By following this structured approach, you ensure both the metadata and physical files are properly cleaned up.

This method is especially useful when:

  • Decommissioning obsolete datasets
  • Resetting environments for testing
  • Automating cleanup in CI/CD pipelines


#Step 1
#Find and replace schemaName
#Find and replace tableName

#Step 2
#Inspect the table metadata
#Via Databricks run the Spark SQL query below
#If the table is in the default schema, schemaName is default; change as needed
DESC FORMATTED schemaName.tableName;

#Step 3
#From the table returned scroll down to "Location" and copy the field value
#Find and replace locationFieldValue

#Step 4
#Via Databricks using Spark SQL drop the table
DROP TABLE schemaName.tableName;

#Step 5
#By the means you use to interact with Databricks File System (dbfs), e.g. cmd python virtual environment
#Run command below
dbfs rm -r "locationFieldValue"

How to dynamically pivot a SQL Server table using dynamic T-SQL

A dynamic pivot means you do not need to hard code the column names: a dynamic query fetches the distinct values from a column and uses them as the column names while pivoting the source table.

Sounds complicated?

It is!

Good thing there are some code examples below you can just steal and alter as you need.

The first example will just return as a SELECT, the second example will write the results to a global temp table called ##Result.

A use case for this might be a recurring requirement to pivot a table where the required column names keep changing as the field values change.

Example 1: Return as SELECT

/*Mock Table*/
IF OBJECT_ID('tempdb.dbo.#Fruits', 'U') IS NOT NULL
	DROP TABLE #Fruits;

CREATE TABLE #Fruits (
	Fruit VARCHAR(255)
	,Quantity INT
	,DateOf DATETIME
	);

INSERT INTO #Fruits (
	Fruit
	,Quantity
	,DateOf
	)
VALUES 
('Apple', 10	,GETDATE())
,('Orange', 10	,GETDATE())
,('Banana', 10, GETDATE())
,('Apple', 11, DATEADD(DAY, - 1, GETDATE()))
,('Orange', 11, DATEADD(DAY, - 1, GETDATE()))
,('Banana', 11, DATEADD(DAY, - 1, GETDATE()))
,('Apple', 12, DATEADD(DAY, - 2, GETDATE()))
,('Orange', 12, DATEADD(DAY, - 2, GETDATE()))
,('Banana', 12, DATEADD(DAY, - 2, GETDATE()))
,('Apple', 13, DATEADD(DAY, - 3, GETDATE()))
,('Orange', 13, DATEADD(DAY, - 3, GETDATE()))
,('Banana', 13, DATEADD(DAY, - 3, GETDATE()));

/*Demo Mock table*/
SELECT *
FROM #Fruits;

/*Logic to dynamically pivot table*/
DECLARE @cols AS NVARCHAR(MAX)
	,@query AS NVARCHAR(MAX);

SELECT @cols = STUFF((
			SELECT DISTINCT QUOTENAME(f.[Fruit]) + ', '
			FROM #Fruits AS f
			FOR XML PATH('')
				,TYPE
			).value('.', 'NVARCHAR(MAX)'), 1, 1, '');

/*Add missing square bracket to start of string*/
SET @cols = '[' + @cols;
/*Remove last comma from string*/
SET @cols = SUBSTRING(@cols, 1, (LEN(@cols) - 1));
SET @query = 'SELECT [DateOf], ' + @cols + ' FROM 
             (
              SELECT *
			  FROM #Fruits
            ) x
            pivot 
            (
                min(Quantity)
                for [Fruit] in (' + @cols + ')
            ) p ORDER BY [DateOf] ASC';

EXECUTE (@query);

DROP TABLE #Fruits;

Example 2: Write output to a table

IF OBJECT_ID('tempdb.dbo.##Result', 'U') IS NOT NULL
	DROP TABLE ##Result;
/*Mock Table*/
IF OBJECT_ID('tempdb.dbo.#Fruits', 'U') IS NOT NULL
	DROP TABLE #Fruits;

CREATE TABLE #Fruits (
	Fruit VARCHAR(255)
	,Quantity INT
	,DateOf DATETIME
	);

INSERT INTO #Fruits (
	Fruit
	,Quantity
	,DateOf
	)
VALUES 
('Apple', 10	,GETDATE())
,('Orange', 10	,GETDATE())
,('Banana', 10, GETDATE())
,('Apple', 11, DATEADD(DAY, - 1, GETDATE()))
,('Orange', 11, DATEADD(DAY, - 1, GETDATE()))
,('Banana', 11, DATEADD(DAY, - 1, GETDATE()))
,('Apple', 12, DATEADD(DAY, - 2, GETDATE()))
,('Orange', 12, DATEADD(DAY, - 2, GETDATE()))
,('Banana', 12, DATEADD(DAY, - 2, GETDATE()))
,('Apple', 13, DATEADD(DAY, - 3, GETDATE()))
,('Orange', 13, DATEADD(DAY, - 3, GETDATE()))
,('Banana', 13, DATEADD(DAY, - 3, GETDATE()));

/*Demo Mock table*/
SELECT *
FROM #Fruits;

/*Logic to dynamically pivot table*/
DECLARE @cols AS NVARCHAR(MAX)
	,@query AS NVARCHAR(MAX);

SELECT @cols = STUFF((
			SELECT DISTINCT QUOTENAME(f.[Fruit]) + ', '
			FROM #Fruits AS f
			FOR XML PATH('')
				,TYPE
			).value('.', 'NVARCHAR(MAX)'), 1, 1, '');

/*Add missing square bracket to start of string*/
SET @cols = '[' + @cols;
/*Remove last comma from string*/
SET @cols = SUBSTRING(@cols, 1, (LEN(@cols) - 1));
SET @query = 'SELECT [DateOf], ' + @cols + ' INTO ##Result FROM 
             (
              SELECT *
			  FROM #Fruits
            ) x
            pivot 
            (
                min(Quantity)
                for [Fruit] in (' + @cols + ')
            ) p ORDER BY [DateOf] ASC';

EXECUTE (@query);

SELECT * FROM ##Result;

DROP TABLE ##Result;

DROP TABLE #Fruits;
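The column-list step used in both examples can be easier to follow outside T-SQL. The Python sketch below builds the same kind of bracket-quoted column list from the distinct values and splices it into a pivot statement. It is an illustration only: `quotename` and `build_pivot_query` are hypothetical helpers, and the ordering clause is left out for brevity.

```python
def quotename(name):
    """Bracket-quote an identifier, doubling any closing bracket."""
    return "[" + name.replace("]", "]]") + "]"


def build_pivot_query(values, source="#Fruits"):
    """Build a dynamic pivot statement from a collection of column values."""
    # Distinct values become the pivoted column names.
    cols = ", ".join(quotename(v) for v in sorted(set(values)))
    return (
        f"SELECT [DateOf], {cols} FROM (SELECT * FROM {source}) x "
        f"PIVOT (MIN(Quantity) FOR [Fruit] IN ({cols})) p"
    )


fruits = ["Apple", "Orange", "Banana", "Apple"]
print(build_pivot_query(fruits))
```

Quoting each value matters because the values become identifiers in the generated statement; a value containing a closing bracket would otherwise break the query.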

How to create a Spark SQL table with a SELECT statement

The following is a code snippet that would create a table in a “sales” schema called customer.

If no reference to a schema is given the table will be created in the default Spark location.

CREATE TABLE sales.customer USING DELTA AS (SELECT 'John' AS fn, 'Smith' AS sn, 55 AS age)

How to check SQL Server schema user permissions

Provided the login you are using has adequate permissions, the script below returns the schema permissions a user has.

Commented out at the end of the script are examples of the types of permission you can assign, again providing the login you are using has adequate permissions.

SELECT ClassDescription
	,StateDescription
	,PermissionName
	,SchemaName
	,UserName
FROM (
	SELECT class_desc AS ClassDescription
		,state_desc AS StateDescription
		,permission_name AS PermissionName
		,SCHEMA_NAME(major_id) AS SchemaName
		,USER_NAME(grantee_principal_id) AS UserName
	FROM sys.database_permissions AS PERM
	JOIN sys.database_principals AS Prin ON PERM.grantee_principal_id = Prin.principal_id
		AND class_desc = 'SCHEMA'
	) AS schemaPermissions
WHERE 1=1 
/*Uncomment below to check permissions on a specific schema and/or specific user*/
--	AND SchemaName = 'dbo'
--	AND UserName = 'SomeGuy'
ORDER BY UserName ASC
,SchemaName ASC
GO



/*
--Grant schema permission examples
GRANT SELECT ON SCHEMA::dbo TO SomeGuy;
GRANT UPDATE ON SCHEMA::dbo TO SomeGuy;
GRANT ALTER ON SCHEMA::dbo TO SomeGuy;
GRANT DELETE ON SCHEMA::dbo TO SomeGuy;
*/

How to write T-SQL Geography data to a table

Below is some example code for writing the SQL Server geography data type to a table. Note by default geography data is stored in a binary format but it can be converted to a string to make it human readable.

Note: Pass in Longitude and Latitude values in that order.

/*Demo of geo data*/
DECLARE @g GEOGRAPHY;

/*WKT points are written as POINT(longitude latitude)*/
SET @g = GEOGRAPHY::STPointFromText('POINT(-6.611670 53.578741)', 4326);

/*Geography data is in binary format*/
SELECT @g AS 'GeoBinaryFormat';

/*Convert binary data to a string*/
SELECT @g.ToString() AS 'ConvertingDataToString';


/*Inserting geo data into Table*/
CREATE TABLE #GeoTest ([CoordinateLocation] [geography] NULL);

INSERT INTO #GeoTest (CoordinateLocation)
SELECT GEOGRAPHY::STPointFromText('POINT(-6.611670 53.578741)', 4326);

SELECT *
FROM #GeoTest;

DROP TABLE #GeoTest;
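Since the argument order is easy to get wrong, a small helper that takes named latitude and longitude values and emits the point text with longitude first can be handy when generating such statements from application code. This is a sketch: `wkt_point` is a hypothetical helper, and treating 53.578741 as the latitude and -6.61167 as the longitude is an assumption about the example coordinates.

```python
def wkt_point(latitude, longitude):
    """Return a WKT POINT string with longitude first, then latitude."""
    if not -90 <= latitude <= 90:
        raise ValueError("latitude must be between -90 and 90")
    if not -180 <= longitude <= 180:
        raise ValueError("longitude must be between -180 and 180")
    return f"POINT({longitude} {latitude})"


print(wkt_point(latitude=53.578741, longitude=-6.61167))
# POINT(-6.61167 53.578741)
```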

How to sum time with T-SQL

Time cannot be summed directly in T-SQL. To sum times, they first need to be given a date. When a time data type is cast as a datetime data type, the missing date element defaults to 1900-01-01.

Because every value then shares the same date, the difference between each value and 1900-01-01 (date 0) is just its time of day. The example below measures that difference in milliseconds with DATEDIFF, sums the milliseconds, and converts the total back to a time with DATEADD. Note that a total of 24 hours or more wraps around when it is cast back to TIME.

Below is example T-SQL:

IF OBJECT_ID('tempdb..#TimeTable', 'U') IS NOT NULL
BEGIN
DROP TABLE #TimeTable
END

CREATE TABLE #TimeTable(
	id INT
	,TimeRecord TIME(0)
	);

INSERT INTO #TimeTable
VALUES (
	1
	,'00:00:10'
	);

INSERT INTO #TimeTable
VALUES (
	1
	,'00:14:00'
	);

INSERT INTO #TimeTable
VALUES (
	2
	,'00:00:10'
	);

INSERT INTO #TimeTable
VALUES (
	2
	,'00:35:10'
	);

SELECT id
,TimeRecord
FROM #TimeTable;

/*demo of time converted to datetime*/
SELECT CAST(TimeRecord AS DATETIME) AS DateTimeRecord
FROM #TimeTable

SELECT id
	,CAST(DATEADD(MILLISECOND, SUM(DATEDIFF(MILLISECOND, 0, CAST(TimeRecord AS DATETIME))), 0) AS TIME(0)) AS SummedTime
FROM #TimeTable
GROUP BY id;
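The per-id totals from the T-SQL above can be cross-checked with a few lines of Python. This is a sketch using the same four sample records; the helper name `to_timedelta` is made up for the example.

```python
from collections import defaultdict
from datetime import timedelta


def to_timedelta(hhmmss):
    """Convert an 'HH:MM:SS' string to a timedelta."""
    hours, minutes, seconds = (int(part) for part in hhmmss.split(":"))
    return timedelta(hours=hours, minutes=minutes, seconds=seconds)


records = [(1, "00:00:10"), (1, "00:14:00"), (2, "00:00:10"), (2, "00:35:10")]

# Accumulate a running total per id, starting from a zero duration.
totals = defaultdict(timedelta)
for record_id, time_record in records:
    totals[record_id] += to_timedelta(time_record)

for record_id, total in sorted(totals.items()):
    print(record_id, total)
# 1 0:14:10
# 2 0:35:20
```

Unlike the TIME data type, a timedelta does not wrap at 24 hours, so longer totals stay intact here.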