How to Store Historical Data Much More Efficiently

In an era where companies and organizations are collecting more data than ever before, datasets tend to accumulate millions of unnecessary rows that don't contain any new or valuable information. In this article, we'll focus on a critical aspect of data management: deleting rows in a dataset if they provide no added value, using PySpark*.
*PySpark is used over pandas when dealing with very large datasets because it can process data across multiple computers, making it faster and more scalable. Pandas works well for smaller datasets that can fit in memory on a single machine but may become slow or even impractical for big data.
Let's imagine the following situation: you work as a data engineer/scientist in the maintenance department of a real estate company. For the past ten years, your company has done a full load of all maintenance data from an external database containing the conditions of your buildings and stored it in the company's cloud storage. The data could e.g. look like this:

Three columns are present in this dataset:
- id -> the ID of the building.
- condition -> an integer between 1 (terrible) and 10 (excellent) that represents the condition of the building.
- import_date -> a datetime column representing the day this row was imported from the external software.
To create this dataset yourself please run the snippet below:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.types import DateType
import random
# set the seed to get same results when rerunning
random.seed(42)
# create a spark session
spark = SparkSession.builder.getOrCreate()
# create id list
ids = list(range(1, 11)) # Adjust this list size to meet the desired number of unique IDs
# create two possible conditions for each id
conditions = [[random.randint(1, 10) for _ in range(2)] for _ in ids]
# create a list of tuples where each tuple is a row
rows = [(id, random.choice(conditions[id - 1]), date)
        for id in ids for date in ['2023-01-01', '2023-01-02', '2023-01-03']]
# some ids should be missing on some dates, so we remove some
rows = [row for row in rows if random.random() > 0.2] # Adjust this parameter to control the number of missing IDs
# create a DataFrame
df = spark.createDataFrame(rows, ["id", "condition", "import_date"])
# convert import_date column to DateType
df = df.withColumn("import_date", df["import_date"].cast(DateType()))
# sort DataFrame
df = df.sort("id", "import_date")
# show the DataFrame
df.show()
Let's take a look at Building2:

On the first and second of January, the condition of the building is 4. On the third of January, the condition changes to 5. We don't actually need the middle row for the second of January: if we only record the dates on which a condition changes, we can omit many, many rows. The first occurrence of Building2 in this dataset is on the first of January, and its condition changes on the third of January. We can therefore drop the row for the second of January by adding a from and an until column, which indicate when the row's values were first encountered and when one of the row's values changed.
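To make this concrete, here is a sketch of what this boils down to for Building2, using the condition values mentioned above (note that until is non-inclusive; more on that later):
Before (three imported rows):
id=2, condition=4, import_date=2023-01-01
id=2, condition=4, import_date=2023-01-02
id=2, condition=5, import_date=2023-01-03
After (two rows carry the same information):
id=2, condition=4, from=2023-01-01, until=2023-01-03
id=2, condition=5, from=2023-01-03, until=null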
This artificial dataset is very small and contains only three different import dates. But consider a more realistic situation: your real-estate company owns thousands of buildings, and the imports span over 10 years. That would result in many millions of rows without any actual information. In my own real-life situation, working for a housing corporation, we only had to keep about 0.2% of all rows to store all of the information using this technique.
Let's take a look at the code to add the from and until columns.
First of all, we have to specify the partition columns and the datetime column for the table. The partition column is similar to the column you'd like to group by when you're using pandas.
partition_columns = ["id"]
datetime_column = "import_date"
Then we need to specify the columns in which we want to track any possible changes; in our case, that's condition. However, to make the code more general and reusable, we can specify that the columns we want to track are every column except the partition columns and the datetime column.
track_columns = [column for column in df.columns
                 if column not in partition_columns + [datetime_column]]
Now we need to create a PySpark Window, which acts similarly to pandas' .groupby(). In that Window, we need to specify that we want to sort the rows on the datetime_column:
from pyspark.sql import Window
window = Window.partitionBy(partition_columns).orderBy(datetime_column)
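If you're more at home in pandas, a loose analogy might help. This is purely illustrative (toPandas() collects all data to the driver, so you'd only do this on a small DataFrame like our example):
# Loose pandas analogy (illustrative only)
pdf = df.toPandas()  # collects everything to the driver; fine here, not for real big data
grouped = pdf.sort_values("import_date").groupby("id")
# the f.lag(...).over(window) used below then roughly corresponds to grouped["condition"].shift(1)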
To check whether the value of one of the track_columns has changed, we can add the value of the previous row to the current row, and check whether they're identical.
for column in track_columns:
    previous_column = f"previous_{column}"
    changed_column = f"changed_{column}"
    df = df.withColumn(previous_column, f.lag(f.col(column)).over(window))
    df = df.withColumn(changed_column, f.col(column) != f.col(previous_column))
    # df = df.drop(previous_column)  # we keep previous_* for now so we can inspect it below
Running this code leads to the following DataFrame:

Here we can see that, based on whether previous_condition is equal to condition, we can determine whether a value has changed, and we store this in changed_condition. Because of the specified Window, the values of previous_condition are only determined within each unique id.
However, we cannot only keep the rows where a value has changed, because that would delete the first occurrence of every id. We also have to keep all of the first rows explicitly. To do this, we can use row_number()*.
- In this situation, you could also determine the first row by looking at the null values in previous_condition and/or changed_condition; however, for illustration purposes, we'll use row_number() here.
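For completeness, here is a quick sketch of that null-based alternative, using the previous_condition column from above (this assumes condition itself never contains nulls, otherwise the check would misfire):
# Alternative sketch: the first row of each partition has no previous row,
# so previous_condition is null exactly there (assuming condition itself is never null)
first_row_alt_condition = f.col("previous_condition").isNull()
For the rest of this article, though, we'll stick with row_number():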
df = df.withColumn('row_num', f.row_number().over(window))

Now there are two conditions under which we want to keep a row:
- When the value is True in any of the f"changed_{column}" columns.
- When row_num == 1.
However, what if an id gets deleted from the source data? In that case there simply is no next row: there is no row in which changed_condition is True and no new row with row_num == 1, so neither condition flags the deletion.
Therefore, we need to make sure to check whether an id is still present in the last import.
Let's first determine what the last import date is:
# determine last import date
latest_import = df.agg(f.max(datetime_column)).collect()[0][0]
print(f"{latest_import = }")
# output
latest_import = datetime.date(2023, 1, 3)
Now let's determine the latest import date of each id:
# Add last_partition_import_date to keep track of
# what the last imported date was per partition
last_partition_window = Window.partitionBy(*partition_columns)
df = (
    df
    .withColumn('last_partition_import_date',
                f.max(f.col(datetime_column)).over(last_partition_window))
)

In the image above, you can see from last_partition_import_date that Building3 disappeared from the dataset on the third of January. This could, for example, mean that the building has been demolished. That is very important information, which is why we want to keep track of whether an id has been deleted or not.
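As a quick check, we can already list the ids that are missing from the latest import using the two values we just computed. A small sketch:
# ids whose last import date lies before the latest import of the whole dataset,
# i.e. ids that have disappeared (e.g. demolished buildings)
deleted_ids = (
    df
    .filter(f.col("last_partition_import_date") < latest_import)
    .select(*partition_columns)
    .distinct()
)
deleted_ids.show()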
Let's now filter out all the rows that don't add information. As mentioned before, a row we keep should either have a changed condition value or be the first occurrence of a building in the dataset.
from functools import reduce
# Add condition to keep every first row
first_row_condition = f.col("row_num") == 1
# Add condition that only keeps rows that have changed values
change_condition = reduce(lambda a, b: a | b,
                          (f.col(f"changed_{column}") for column in track_columns))
# Filter the DataFrame to keep the first rows for each partition
# and the rows where there is a change
filtered_df = (
    df.filter(first_row_condition)
    .union(df.filter(change_condition))
)

Now, we have a DataFrame in which each row adds information! Let's remove some columns that we don't need anymore:
# Drop the 'row_num', 'previous_*' and 'changed_*' helper columns
filtered_df = filtered_df.drop('row_num',
                               *[f"previous_{column}" for column in track_columns],
                               *[f"changed_{column}" for column in track_columns])
Also, let's add an until column, which indicates until when the row's values are valid. We can do this by shifting the next row's import_date one row up using f.lag with a negative offset, as long as the next row's id is still the same and as long as the rows are ordered by import_date. For that, we'll reuse the earlier defined Window:
window = Window.partitionBy(partition_columns).orderBy(datetime_column)
# Add 'until' by looking one row ahead
filtered_df = filtered_df.withColumn("until",
                                     f.lag(f.col(datetime_column), -1).over(window))
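As a side note: f.lag with a negative offset looks one row ahead. If you find that counter-intuitive, f.lead does exactly the same thing with a positive offset, so the statement above could equivalently be written as:
# Equivalent alternative: f.lead explicitly looks one row ahead
filtered_df = filtered_df.withColumn("until",
                                     f.lead(f.col(datetime_column), 1).over(window))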

As we can see, e.g. for Building1, the until value of the first row is the import_date of the second row. The until value of the second row is the import_date of the third row. The until value of the third row is null because there is no fourth row.
We're almost done! As mentioned above, we still see some null values in the until column. This is not problematic for Building1, but it is problematic for Building3 because we can't use the until column to detect that Building3 is no longer present on the third of January. This is where we'll make use of the last_partition_import_date column and the latest_import value.
If until is null and the last_partition_import_date < latest_import, then we add the last_partition_import_date plus one day to the until column of that row.
# Add a deletion date if the last date of a partition is before the last import date of the data
filtered_df = filtered_df.withColumn(
    "until",
    f.when(
        (f.col("until").isNull())
        & (f.col("last_partition_import_date") < latest_import),
        f.date_add(f.col("last_partition_import_date"), 1)
    ).otherwise(f.col("until"))
)

Using this technique, we can see that Building3's last seen day was the second of January (we use a non-inclusive until here). If a row still has a null value in the until column, it means that that row is still valid.
The last thing we need to do is drop the last_partition_import_date column and rename the import_date column to from:
final_df = (
filtered_df
.drop("last_partition_import_date")
.withColumnRenamed(datetime_column, "from")
)

Above you can see our final DataFrame! In real life, this technique can save you millions of rows that don't carry any information, which saves money on storage and makes further processing a lot faster!
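To illustrate how you would actually query such a table, here is a small sketch (not part of the pipeline itself) that selects the currently valid rows and reconstructs a snapshot for a given date, using the non-inclusive until convention from above:
# Rows that are still valid (no end date yet)
current_rows = final_df.filter(f.col("until").isNull())
# Snapshot of all buildings as they were on a given date:
# a row applies if from <= date and (date < until or until is null)
snapshot_date = "2023-01-02"
snapshot = final_df.filter(
    (f.col("from") <= snapshot_date)
    & ((f.col("until") > snapshot_date) | f.col("until").isNull())
)
snapshot.show()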
To conclude
In this article, we've learned how you can save millions of rows by only retaining the rows that actually carry information, by looking at changed values within each subset. To summarize what we did:
- Grouped the DataFrame into partitions based on their IDs.
- For each partition, we determined consecutive identical combinations of values; those consecutive duplicates are dropped.
- We added from and until as new columns to the rows to capture the lifespan of each combination of values.
- We checked whether the last import date of a partition lies before the last import date of the entire DataFrame, so that deleted partitions also get an until value.
To make the code above reusable I've created a function for it which you can copy-paste into your own project and use there!
import pyspark
import pyspark.sql.functions as f
from functools import reduce
from typing import Optional
from pyspark.sql import Window


def remove_uninformative_rows(df: pyspark.sql.DataFrame,
                              partition_columns: list[str],
                              datetime_column: str,
                              track_columns: Optional[list[str]] = None
                              ) -> pyspark.sql.DataFrame:
    if track_columns is None:
        track_columns = [column for column in df.columns
                         if column not in partition_columns + [datetime_column]]
    # Define a window specification based on the partition columns, ordered by datetime_column
    window = Window.partitionBy(*partition_columns).orderBy(datetime_column)
    # For each tracked column, add a column that indicates whether its value has changed
    for column in track_columns:
        previous_column = f"previous_{column}"
        changed_column = f"changed_{column}"
        df = df.withColumn(previous_column, f.lag(f.col(column)).over(window))
        df = df.withColumn(changed_column, f.col(column) != f.col(previous_column))
        df = df.drop(previous_column)
    # Add the row number to keep track of the first occurrence of a row
    df = df.withColumn('row_num', f.row_number().over(window))
    # Add last_partition_import_date to keep track of the last imported date per partition
    last_partition_window = Window.partitionBy(*partition_columns)
    df = df.withColumn('last_partition_import_date',
                       f.max(f.col(datetime_column)).over(last_partition_window))
    # Save the latest import date of the entire dataset
    latest_import = df.agg(f.max(datetime_column)).collect()[0][0]
    # Condition to keep every first row
    first_row_condition = f.col("row_num") == 1
    # Condition that only keeps rows that have changed values
    change_condition = reduce(lambda a, b: a | b,
                              (f.col(f"changed_{column}") for column in track_columns))
    # Keep the first rows of each partition and the rows where there is a change
    filtered_df = df.filter(first_row_condition).union(df.filter(change_condition))
    # Drop the 'row_num' and 'changed_*' helper columns
    filtered_df = filtered_df.drop('row_num',
                                   *[f"changed_{column}" for column in track_columns])
    # Add the 'until' column by looking one row ahead
    filtered_df = filtered_df.withColumn("until",
                                         f.lag(f.col(datetime_column), -1).over(window))
    # Add a deletion date if the last date of a partition is before the last import date of the data
    filtered_df = filtered_df.withColumn(
        "until",
        f.when(
            (f.col("until").isNull())
            & (f.col("last_partition_import_date") < latest_import),
            f.date_add(f.col("last_partition_import_date"), 1)
        ).otherwise(f.col("until"))
    )
    # Drop last_partition_import_date and rename the datetime column to 'from'
    final_df = (
        filtered_df
        .drop("last_partition_import_date")
        .withColumnRenamed(datetime_column, "from")
    )
    return final_df
With this function, you can easily remove all uninformative rows!
compacted_df = remove_uninformative_rows(
    df=df,
    partition_columns=['id'],
    datetime_column='import_date'
)
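Finally, you would typically persist the compacted history back to your (cloud) storage. A minimal sketch, assuming a Parquet target (the path is just a placeholder for your own setup):
# Write the compacted history back to storage (path and format are placeholders)
compacted_df.write.mode("overwrite").parquet("path/to/compacted_history")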
If you found this article informative, please take a look at my account to see more articles on Python and Data Science!