PySpark Explained: Delta Tables

Author:Murphy  |  View: 24471  |  Time: 2025-03-23 11:56:36

Delta tables are the key components of a Delta Lake, an open-source storage layer that brings ACID (Atomicity, Consistency, Isolation, Durability) transactions to big data workloads.

The concept and implementation of Delta tables ( and by association – Delta Lakes ) were done by the team at Databricks, the company that created Spark.

Databricks is now a cloud-based platform for data engineering, Machine Learning, and analytics built around Apache Spark and provides a unified environment for working with big data workloads. Delta tables are a key component of that environment.

Coming from an AWS background, Delta tables somewhat remind me of AWS's Athena service, which enables you to perform SQL SELECT operations on files held on S3, the AWS mass storage service.

There is one key difference though. Athena is designed to be a query-only tool, whereas Delta tables allow you to UPDATE, DELETE and INSERT data records easily as well as query data from them. In this respect, Delta tables act more like Apache Iceberg formatted tables. But the advantage they have over Iceberg tables is that they are more tightly integrated with the Spark eco-system.

Some key features of Delta tables and lakes include,

  • ACID Transactions

Delta tables ensure that all data operations (insert, update, delete) adhere to ACID principles – atomicity, consistency, isolation, and durability. This guarantees reliable data pipelines and prevents common issues such as data corruption or partial updates.

  • Scalable Metadata Handling

Delta Lake employs advanced metadata management to ensure that operations on Delta tables scale effectively with the data, offering faster access and efficient data management.

  • Schema Enforcement and Evolution

Delta tables enforce schema on write, ensuring data quality and consistency. They also support schema evolution, allowing users to add new columns and make other changes without disrupting existing data pipelines.

  • Time Travel

Delta tables support time travel, allowing users to access and revert to previous versions of the data. This feature is useful for debugging, auditing, correcting accidental deletions, and reproducing experiments.

  • Batch and Streaming

Delta Lake supports both batch and streaming data processing, enabling users to build applications that seamlessly integrate historical and real-time data.

  • Data Upserts and Deletes

Delta tables support upserts (merge operations) and deletes, which are crucial for handling slowly changing dimensions (SCD) and other common data warehousing scenarios.

  • Efficient Data Compaction

Delta Lake optimizes storage by automatically compacting small files into larger ones, improving read performance and reducing storage costs.

  • Underlying Storage Format

Delta Lake utilizes Parquet as the underlying storage format for its tables. When data is written to a Delta table, it is stored in Parquet format along with Delta-specific transaction logs and metadata, enabling Delta Lake's advanced features.

  • Reading from Various Formats

You can read data from various formats (CSV, JSON, etc.) into a DataFrame in PySpark. After loading the data into a DataFrame, you can write this DataFrame to a Delta table, which will store the data in Parquet format.

To me, in simplistic terms, a delta lake can be treated much like a relational Database Management System(RDBMS). It's comprised of multiple delta tables that are updatable storage, backed by parquet format data files and JSON change logs which mimic individual RDBMS tables.


Accessing a FREE PySpark development environment

The rest of this article will feature quite a lot of PySpark and SQL code, so if you want to follow along, you'll need access to a PySpark development environment with an installation of Delta to enable you to create and perform operations on Delta tables. Delta tables are pre-built into the Spark ecosystem on Databricks by default, so that's what I'll use to demonstrate them.

If you're lucky enough to have access to PySpark & Delta either through your work, via the cloud, or a local install, go ahead and use that. Otherwise, please have a look at the link below, where I go into detail about how you can access a great FREE online PySpark development environment called the Databricks Community Edition.

Databricks is a cloud-based platform for data engineering, machine learning, and analytics built around Apache Spark and provides a unified environment for working with big data workloads. The founders of Databricks created Spark, so they know their stuff.

How to access a FREE online Spark development environment


For the rest of this article, I'll assume you're running PySpark on the Databricks community edition, and I'll show how you can perform CRUD actions on Delta tables using PySpark in that development environment. However, the code should work on any system that has both PySpark and Delta installed on it.

Example data

The sample input dataset I'll use is a CSV containing information about cities around the world. It lives on my local PC and looks like this,

City,Country,Latitude,Longitude,Population
Tokyo,Japan,35.6895,139.69171,38001000
Delhi,India,28.66667,77.21667,25703000
Shanghai,China,31.22,121.46,23741000
São Paulo,Brazil,-23.55,-46.6396,12330000
Mumbai,India,19.0760,72.880838,21043000

For Databricks to use this data, I need to load it up from my PC to the Databricks File System (DBFS).

But we can't do anything useful until we have a Spark compute cluster set up, so let's do that first.

Start by clicking on the compute menu option on the left-hand menu block of the Databricks home screen. Now click on the Create Compute button at the top right-hand corner of the displayed page. It will take a minute or two until the cluster is fully up and running.

Note, for the Databricks Community edition, the cluster shuts down after 60 minutes of inactivity, so if you leave your desk and come back, the cluster may have terminated. If so, create a new one, but remember, any tables you created on the old cluster that are not ‘managed‘ will disappear on cluster termination. The meanings of managed and unmanaged tables are explained later.

Once the cluster is up and running, we can create and attach a PySpark Notebook to it, so we can start our coding. To do that, you should click on the New-> Notebookmenu option on the left-hand menu block of the Databricks home screen.

A Jupyter-like notebook will appear, and it should be automatically attached to the cluster you just created.

Going back to uploading our local file to DBFS, there are a couple of ways to do this, but for me, the most natural way is to use the File->Upload data to DBFS menu from the Notebook. So do that now and choose and make a note of where you decide to put the file on DBFS. For me, the location I chose was:- /FileStore/tables/delta-test/data/cities.csv

So, we now have everything in place for us to start coding. Let's go.

Understanding databases in Spark

In Spark, databases are namespaces that allow you to organize and manage tables logically. When you create a table without specifying a database, Spark automatically places the table in its default database. The default database is a built-in database that comes with every Spark catalog.

Here's how you can create a new database and start populating it with delta tables.

Python"># Define the database name
database_name = "my_database"

# Create the database if it does not exist
spark.sql(f"CREATE DATABASE IF NOT EXISTS {database_name}")

# Define the table name
table_name = "cities_delta"

# Construct the full table name by combining database and table name
full_table_name = f"{database_name}.{table_name}"

# Write the DataFrame to a Delta table in the specified database
df.write.format("delta").option("delta.columnMapping.mode", "name").mode("overwrite").saveAsTable(full_table_name)

In all our examples from now on we'll use the Spark catalog `default` database.

Example PySpark Code

  • Create a DataFrame from our input data file on DBFS
# File location and type
file_location = "/FileStore/tables/delta-test/data/cities.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, 
# these will be ignored.
#
df = spark.read.format(file_type) 
  .option("inferSchema", infer_schema) 
  .option("header", first_row_is_header) 
  .option("sep", delimiter) 
  .load(file_location)

display(df)

|------------|---------|-----------|------------|------------|
| City       | Country | Latitude  | Longitude  | Population |
|------------|---------|-----------|------------|------------|
| Tokyo      | Japan   | 35.6895   | 139.69171  | 38001000   |
| Delhi      | India   | 28.66667  | 77.21667   | 25703000   |
| Shanghai   | China   | 31.22     | 121.46     | 23741000   |
| São Paulo  | Brazil  | -23.55    | -46.6396   | 12330000   |
| Mumbai     | India   | 19.0760   | 72.880838  | 21043000   |
|------------|---------|-----------|------------|------------|
  • Create a Delta table from this DataFrame
# Write the DataFrame to a Delta table

delta_table_path = "/FileStore/tables/delta-test/tables/cities_delta"

df.write.format("delta").option("delta.columnMapping.mode", "name").mode("overwrite").save(delta_table_path)

Note that the above table is created as unmanaged, also referred to as `external.` What does this mean? If you drop an unmanaged table, the data files that make up the table are not deleted, whereas they are for managed tables. To create a managed table, change the line above …

df.write.format("delta").option("delta.columnMapping.mode", "name").mode("overwrite").save(delta_table_path)

to something like,

df.write.format("delta").option("delta.columnMapping.mode", "name").mode("overwrite").saveAsTable("cities")

To check if a table is managed or unmanaged (external) use the describe extended PySpark SQL command.

%sql

describe extended cities_delta

+------------------+-----------------------------------------------------+
| col_name         | data_type                                           |
+------------------+-----------------------------------------------------+
| City             | string                                              |
| Country          | string                                              |
| Latitude         | double                                              |
| Longitude        | double                                              |
| Population       | int                                                 |
| Catalog          | spark_catalog                                       |
| Database         | default                                             |
| Table            | cities                                              |
| Created Time     | Thu Jun 27 11:15:56 UTC 2024                        |
| Last Access      | UNKNOWN                                             |
| Created By       | Spark 3.3.2                                         |
| Type             | EXTERNAL                                            |
| Location         | dbfs:/FileStore/tables/delta-test/tables/cities_delta|
| Provider         | delta                                               |
| Owner            | root                                                |
| Table Properties | [delta.minReaderVersion=1, delta.minWriterVersion=2]|
+------------------+-----------------------------------------------------+

In the Type row, it clearly shows that the table is EXTERNAL (another name for unmanaged).

The above example also demonstrates another useful property of Notebooks in Databricks. By using different %magiccommands at the beginning of a cell we can indicate to Databricks that we want to run commands other than PySpark code. Here are some examples, check the docs for a full list,

+------------+--------------------------------------------------------+
|Magic       | Meaning                                                |
+------------+--------------------------------------------------------+  
|%sql        | Run a regular SQL statement, e.g select * from mytable |
|%python     | Run Python code, e.g print("Hello World")              |
|%scala      | Run Scala code,  e.g println("Hello World")            |
|%sh         | Run a shell command, e,g pwd                           |
+------------+--------------------------------------------------------+
  • Register the table with the Hive Metastore

Managed tables are automatically registered with Hive, so this only applies to unmanaged tables.

Notice that after the original table creation PySpark ran, you wouldn't have seen your new table in the Databricks data catalog. This is because there is an additional step you must take to register the table with the Hive metastore so Databricks "knows" where to find it.

Image by Author
spark.sql("""
CREATE TABLE IF NOT EXISTS cities
USING DELTA
LOCATION '/FileStore/tables/delta-test/tables/cities_delta'
""")

Now it's there.

Image by Author
  • Selecting from the Delta table

Using SparkSQL

spark.sql("SELECT * FROM cities where Country='India'").collect()

Out[11]: [Row(City='Delhi',  Country='India',  Latitude=28.66667, Longitude=77.21667, Population=25703000),
 Row(City='Mumbai',  Country='India',  Latitude=19.076, Longitude=72.880838, Population=21043000)]

Using the %sql magic

%sql
select * from cities where city = 'Mumbai'

+-----------+-------+--------+---------+----------+
|       City|Country|Latitude|Longitude|Population|
+-----------+-------+--------+---------+----------+
|     Mumbai|  India| 19.076 |72.880838|  21043000|
|     Delhi |  India| 28.6667| 77.21667|  25703000|
+-----------+-------+--------+---------+----------+

Or the DataFrame API

from pyspark.sql.functions import col

# Define the Delta table path
delta_table_path = "/FileStore/tables/delta-test/tables/cities_delta"

# Read the Delta table into a DataFrame
df = spark.read.format("delta").load(delta_table_path)

# Filter the DataFrame for records where the country is India
df_filtered = df.filter(col("Country") == "India")

# Collect the filtered results
results = df_filtered.collect()

# Display the results
for row in results:
    print(row)

Row(City='Delhi',  Country='India',  Latitude=28.66667, Longitude=77.21667, Population=25703000)
Row(City='Mumbai',  Country='India',  Latitude=19.076, Longitude=72.880838, Population=21043000)
  • Inserting new data

We can do this by inserting data from a temporary view or another Spark table. Here is an example of the former.

new_data = [
    Row(City="Los Angeles", Country="USA", Latitude=34.0522, Longitude=-118.2437, Population=4000000),
    Row(City="London", Country="UK", Latitude=51.5074, Longitude=-0.1278, Population=9000000)
]

new_df = spark.createDataFrame(new_data)

new_df.createOrReplaceTempView("new_cities")

# Insert new data into the Delta table using SQL
spark.sql("""
    INSERT INTO cities
    SELECT * FROM new_cities
""")

df=spark.sql("SELECT * FROM cities")
df.show()

+-----------+--------+---------+---------+----------+
|       City| Country| Latitude|Longitude|Population|
+-----------+--------+---------+---------+----------+
|      Tokyo|   Japan|  35.6895|139.69171|  38001000|
|      Delhi|   India| 28.66667| 77.21667|  25703000|
|   Shanghai|   China|    31.22|   121.46|  23741000|
|  São Paulo|  Brazil|   -23.55| -46.6396|  12330000|
|     Mumbai|   India|   19.076|72.880838|  21043000|
|Los Angeles|     USA|  34.0522|-118.2437|   4000000|
|     London|      UK|  51.5074|  -0.1278|   9000000|
+-----------+--------+---------+---------+----------+
  • Data deletion and updating

These are straightforward SQL operations,

Example 1, delete the two records we previously inserted.

spark.sql("""
    DELETE FROM cities
    WHERE City  IN ('Los Angeles', 'London')
""")

df=spark.sql("SELECT * FROM cities")
df.show()

+---------+--------+---------+---------+----------+
|     City| Country| Latitude|Longitude|Population|
+---------+--------+---------+---------+----------+
|    Tokyo|   Japan|  35.6895|139.69171|  38001000|
|    Delhi|   India| 28.66667| 77.21667|  25703000|
| Shanghai|   China|    31.22|   121.46|  23741000|
|São Paulo|  Brazil|   -23.55| -46.6396|  12330000|
|   Mumbai|   India|   19.076|72.880838|  21043000|
+---------+--------+---------+---------+----------+

Example 2, update the populations of the two Indian cities to 10000000

# Update the populations of Delhi and Mumbai to 10,000,000
spark.sql("""
UPDATE cities
SET Population = 10000000
WHERE City IN ('Delhi', 'Mumbai')
""")

df=spark.sql("SELECT * FROM cities")
df.show()

+---------+--------+---------+---------+----------+
|     City| Country| Latitude|Longitude|Population|
+---------+--------+---------+---------+----------+
|    Tokyo|   Japan|  35.6895|139.69171|  38001000|
|    Delhi|   India| 28.66667| 77.21667|  10000000|
| Shanghai|   China|    31.22|   121.46|  23741000|
|São Paulo|  Brazil|   -23.55| -46.6396|  12330000|
|   Mumbai|   India|   19.076|72.880838|  10000000|
+---------+--------+---------+---------+----------+
  • Data merging

As well as regular CRUD operations on Delta tables, we can also use the SQL `Merge` command on them. Recall, merge allows you to decide what happens to records in a target table when it's joined to a source table on one or more common fields. You can opt to update or delete records on the target table when the join column(s) match, or insert new records when they don't.

In this example, our source data will be a temporary view built from the data below, and we'll use that to perform an Update, Delete and Insert on our cities delta table.

from pyspark.sql import Row

# Insert Los Angeles record
# Update Mumbai record, set population=5000000
# Delete Tokyo record
#
new_data = [
    Row(City="Los Angeles", Country="USA", Latitude=34.0522, Longitude=-118.2437, Population=4000000,DeleteFlag=None),
    Row(City="Mumbai", Country="India", Latitude=51.5074, Longitude=-0.1278, Population=5000000,DeleteFlag=None),
    Row(City="Tokyo", Country="Japan",Latitude=None,Longitude=None,Population=None,DeleteFlag='Y')

]

new_df = spark.createDataFrame(new_data)

# create new_cities temp view
# This will be the source of the data
# we want to merge into the cities delta table
#

new_df.createOrReplaceTempView("new_cities")

new_df.show()

+-----------+-------+--------+---------+----------+----------+
|       City|Country|Latitude|Longitude|Population|DeleteFlag|
+-----------+-------+--------+---------+----------+----------+
|Los Angeles|    USA| 34.0522|-118.2437|   4000000|      null|
|     Mumbai|  India| 51.5074|  -0.1278|   5000000|      null|
|      Tokyo|  Japan|    null|     null|      null|         Y|
+-----------+-------+--------+---------+----------+----------+

We can use the %sqlmagic command to run SQL right in our notebook cell but you can use PySpark SQL to do the same.

%sql

MERGE INTO cities AS old
USING new_cities AS new
ON old.City = new.City and old.Country = new.Country
WHEN MATCHED AND new.DeleteFlag = 'Y' THEN
  DELETE
WHEN MATCHED THEN
  UPDATE SET old.population = new.population
WHEN NOT MATCHED THEN
  INSERT (City,Country,Latitude,Longitude,Population) VALUES (new.City,new.Country,new.Latitude,new.Longitude,new.Population)

Let's see the final result

df= spark.sql("select * from cities").show()

+-----------+-------+--------+---------+----------+
|       City|Country|Latitude|Longitude|Population|
+-----------+-------+--------+---------+----------+
|      Delhi|  India|28.66667| 77.21667|  25703000|
|   Shanghai|  China|   31.22|   121.46|  23741000|
|  São Paulo| Brazil|  -23.55| -46.6396|  12330000|
|Los Angeles|    USA| 34.0522|-118.2437|   4000000|
|     Mumbai|  India|  19.076|72.880838|   5000000|
+-----------+-------+--------+---------+----------+

Delta table performance considerations

Unlike traditional RDBMS tables, Delta tables do not support regular indexes (e.g., B-tree indexes). However, some optimizations are provided that can help improve query performance.

  1. Z-Ordering (Multi-dimensional Clustering)

Z-ordering is an optimization technique that maps multidimensional data to a linear order. It helps to improve the performance of range queries and spatial queries by clustering related information together. This technique is particularly useful when your queries filter on multiple columns.

# Optimize the Delta table by Z-Ordering on the specified columns

spark.sql("""
OPTIMIZE cities
ZORDER BY (Country, City)
""")
  1. Partitioning

Partitioning splits your Delta table into smaller, manageable pieces based on the values of one or more columns. This can greatly improve query performance for queries that filter on the partition columns.

When creating a Delta table, you can specify partition columns like this:

# Write the DataFrame to a partitioned Delta table

delta_table_path='/FileStore/tables/delta-test/tables/cities_delta'

df.write.format("delta").mode("overwrite").partitionBy("Country").save(delta_table_path)

#
# Do the same using SQL

-- Create a partitioned Delta table
CREATE TABLE cities_delta_partitioned (
    City STRING,
    Country STRING,
    Latitude DOUBLE,
    Longitude DOUBLE,
    Population INT
)
USING DELTA
PARTITIONED BY (Country)
LOCATION '/FileStore/tables/delta-test/tables/cities_delta_partitioned'
  1. Data Skipping

Delta Tables automatically collect statistics on their data (such as min/max values for each column) and use these statistics to skip reading files that do not match the query predicate. This is known as data skipping.

One straightforward way to improve the performance of individual delta tables is to use the SQL optimize command. Delta Lake automatically generates small files when data is written in small batches. Over time, these small files can accumulate and degrade performance. The optimize command compacts these small files into larger files, which are more efficient to read. Optimize is used on a per-table basis. For example,

df =spark.sql("""
OPTIMIZE cities
""")

Summary

This article serves as an introduction to using Delta tables in PySpark, highlighting their significance and utility in constructing Delta Lakes. Delta Lakes are essentially collections of Delta tables, which are backed by Parquet data files and enriched with catalog information to support updates and other ACID transactions. This combination allows Delta tables to bring some of the robust functionalities of traditional relational databases (RDBMS) to big data mass storage environments, enabling efficient data management on large-scale file systems.

Their support for ACID transactions, which are fundamental for maintaining data integrity in data lakes, makes delta tables particularly useful for building data pipelines that require robust data handling capabilities, such as dealing with late-arriving data, deduplication, and schema enforcement.

In this article, I demonstrated how to create Delta tables, providing step-by-step instructions and practical examples. The CRUD (Create, Read, Update, Delete) operations on Delta tables were illustrated to showcase their versatility and ease of use. These operations are essential for managing and manipulating data effectively in a Delta Lake environment.

Furthermore, we explored optimization techniques to enhance the performance of Delta tables. This includes methods like Z-ordering, which improves query performance by clustering related information and optimizing data storage to manage small files and improve read efficiency.

To summarize, Delta tables in PySpark offer a powerful framework for managing large-scale data with the reliability and consistency akin to RDBMS, but with the flexibility and scalability required for big data solutions. They are indispensable for anyone looking to leverage the full potential of data lakes, providing a structured and efficient way to handle large volumes of data.

_OK, that's all for me just now. I hope you found this article useful. If you did, please check out my profile page at this link. From there, you can see my other published stories, follow me or subscribe to get notified when I post new content._

If you liked this content, I think you'll find these articles interesting too.

PySpark Explained: Dealing with Invalid Records When Reading CSV and JSON Files

An introduction to Fabric: The best AI tool you've never heard of?

Tags: Data Science Machine Learning Programming Python Technology

Comment