How to Store Historical Data Much More Efficiently


In an era where companies and organizations are collecting more data than ever before, datasets tend to accumulate millions of unnecessary rows that don't contain any new or valuable information. In this article, we'll focus on a critical aspect of data management: deleting rows in a dataset if they provide no added value, using PySpark*.

*PySpark is used over pandas when dealing with very large datasets because it can process data across multiple computers, making it faster and more scalable. Pandas works well for smaller datasets that can fit in memory on a single machine but may become slow or even impractical for big data.

Let's imagine the following situation: you work as a data engineer/scientist in the maintenance department of a real estate company. For the past ten years, your company has done a full load of all maintenance data from an external database containing the conditions of your buildings and stored it in the company's cloud storage. The data could e.g. look like this:

[image by Author]

Three columns are present in this dataset:

  • id -> for the ID of the building.
  • condition -> an integer between 1 (terrible) and 10 (excellent) that represents the condition of the building.
  • import_date -> a datetime column representing the day this row was imported from the external software.

To create this dataset yourself please run the snippet below:

from pyspark.sql import SparkSession, Row
import pyspark.sql.functions as f
from pyspark.sql.types import IntegerType, DateType
import random

# set the seed to get same results when rerunning
random.seed(42)

# create a spark session
spark = SparkSession.builder.getOrCreate()

# create id list
ids = list(range(1, 11))  # Adjust this list size to meet the desired number of unique IDs

# create two possible conditions for each id
conditions = [[random.randint(1, 10) for _ in range(2)] for _ in ids]

# create a list of tuples where each tuple is a row
rows = [(id, random.choice(conditions[id-1]), date) 
        for id in ids for date in ['2023-01-01', '2023-01-02', '2023-01-03']]

# some ids should be missing on some dates, so we remove some
rows = [row for row in rows if random.random() > 0.2]  # Adjust this parameter to control the number of missing IDs

# create a DataFrame
df = spark.createDataFrame(rows, ["id", "condition", "import_date"])

# convert import_date column to DateType
df = df.withColumn("import_date", df["import_date"].cast(DateType()))

# sort DataFrame
df = df.sort("id", "import_date")

# show the DataFrame
df.show()

Let's take a look at Building2:

[image by Author]

On the first and second of January, the condition of the building is 4. On the third of January, the condition changes to 5. We don't actually need the middle row for the second of January: if we only record the dates on which a condition changes, we can omit many, many rows. The first occurrence of Building2 in this dataset is on the first of January, and its condition changes on the third of January. We can drop the row for the second of January and instead add a from and an until column, which record when a row's values were first encountered and when one of those values changed.
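To make the target concrete, here is a minimal sketch (reusing the spark session from the snippet above) of what Building2's rows would look like after compression: one row per stretch of identical values, bounded by a from and an until date. The code to actually build this from the full dataset follows later in the article.

# Sketch of the target shape for Building2: one row per stretch of
# identical values, bounded by from/until
compressed_building2 = spark.createDataFrame(
    [(2, 4, "2023-01-01", "2023-01-03"),   # condition 4 held from Jan 1 until Jan 3
     (2, 5, "2023-01-03", None)],          # condition 5 holds from Jan 3, still valid
    ["id", "condition", "from", "until"],
)
compressed_building2.show()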

This artificial dataset is very small and contains only three different import dates. But let's consider a more realistic situation. Your real-estate company owns thousands of buildings, and the imports span over 10 years. This would result in many million rows without actual information. In my own real-life situation working for a housing corporation, using this technique, we only had to save about ~0.2% of all rows to store all information.

Let's take a look at the code to add the from and until columns.

First of all, we have to specify the partition columns and the datetime column for the table. The partition column is similar to the column you'd like to group by when you're using pandas.

partition_columns = ["id"]
datetime_column = "import_date"

Then we need to specify the columns on which we want to track any possible changes, in our case, that's condition. However, to make the code more general and reusable we can specify that the columns we want to track the changes of are every column except the partition columns and the datetime column.

track_columns = [column for column in df.columns 
                  if column not in partition_columns + [datetime_column]]

Now we need to create a PySpark Window, which acts similarly to pandas' .groupby(). In that Window, we need to specify that we want to sort the rows on the datetime_column:

from pyspark.sql import Window

window = Window.partitionBy(partition_columns).orderBy(datetime_column)

To check whether the value of one of the track_columns has changed, we can add the value of the previous row to the current row, and check whether they're identical.

for column in track_columns:
  previous_column = f"previous_{column}"
  changed_column = f"changed_{column}"
  df = df.withColumn(previous_column, f.lag(f.col(column)).over(window))
  df = df.withColumn(changed_column, f.col(column) != f.col(previous_column))
  # df = df.drop(previous_column)

Running this code leads to the following DataFrame:

[image by Author]

Here we can see that, based on whether previous_condition is equal to condition, we can determine whether a value has changed, and we store this in changed_condition. Because of the specified Window, previous_condition is only computed within each unique id's partition.

However, we cannot keep only the rows where a value has changed: for the first row of each id, previous_condition is null, so the comparison yields null and the row would be filtered out. We also have to keep the first row of every id explicitly.

To do this, we can use row_number()*.

  • In this situation, you could also determine the first row by looking at the null values in previous_condition and/or changed_condition; however, for illustration purposes, we'll use row_number() here.

df = df.withColumn('row_num', f.row_number().over(window))
[image by Author]

Now there are two conditions for when we would like to keep a row:

  • When changed_condition is True in any of the f"changed_{column}" columns.
  • When row_num == 1

However, what if an id disappears from the data altogether? There is no later row to mark that change, so neither changed_condition nor row_num == 1 will flag the deletion.

Therefore, we need to make sure to check whether an id is still present in the last import.

Let's first determine what the last import date is:

# determine last import date
latest_import = df.agg(f.max(datetime_column)).collect()[0][0]

print(f"{latest_import = }")

# output
latest_import = datetime.date(2023, 1, 3)

Unfamiliar with the syntax print(f"{variable = }")? It's an f-string debugging feature, available since Python 3.8, that prints both the expression and its value.
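A minimal example of that syntax:

value = 42
print(f"{value = }")  # prints: value = 42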

Now let's determine the latest import date of an id:

# Add last_partition_import_date to keep track of 
#  what the last imported date was per partition
last_partition_window = Window.partitionBy(*partition_columns)
df = (
  df
  .withColumn('last_partition_import_date', 
              f.max(f.col(datetime_column)).over(last_partition_window))
)
[image by Author]

In the image above you can see by looking at the last_partition_import_date that Building3 has disappeared from the data set on the third of January. This could e.g. mean that the building has been demolished. This would be very important information, which is why it is important to keep track of whether an id has been deleted or not.

Let's now filter out all the rows that are irrelevant to keep. As we mentioned before, the rows we want to keep should have a changed condition value, or it should be the first occurrence of a building in the data set.

from functools import reduce

# Add condition to keep every first row 
first_row_condition = f.col("row_num") == 1

# Add condition that only keeps rows that have changed values
change_condition = reduce(lambda a, b: a | b, (f.col(f"changed_{column}") 
                    for column in track_columns))

# Filter the DataFrame to keep the first rows for each partition 
# and the rows where there is a change
filtered_df = (df.filter(first_row_condition)
                 .union(df.filter(change_condition)))
[Image by Author]
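As a side note, the same selection can also be written as a single filter instead of a union of two filtered DataFrames; a minimal, equivalent sketch:

# Equivalent single filter: keep a row if it is the first row of its id
# OR if any tracked value changed. For each id's first row the changed_*
# columns are null, which filter() treats as false, so the explicit
# first-row condition is still needed here as well.
filtered_df = df.filter(first_row_condition | change_condition)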

Now, we have a DataFrame in which each row adds information! Let's remove some columns that we don't need anymore:

# Drop the 'row_num' and 'changed_*' columns
filtered_df = filtered_df.drop('row_num', 
                    *[f"previous_{column}" for column in track_columns],
                    *[f"changed_{column}" for column in track_columns])

Also, let's add an until column which indicates until when the row's values are valid. We can do this by taking the next row's import_date using f.lag with a negative offset (i.e. looking one row ahead), as long as the next row belongs to the same id and the rows are ordered by import_date. For that, we'll reuse the earlier defined Window:

window = Window.partitionBy(partition_columns).orderBy(datetime_column)

# Add 'until' by looking one row ahead
filtered_df = filtered_df.withColumn("until", 
                              f.lag(f.col(datetime_column), -1).over(window))
[Image by Author]

As we can see e.g. for Building1 the until value of the first row is the import_date of the second row. The until value of the second row is the import_date of the third row. The until value of the third row is null because there is no fourth row.
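Incidentally, f.lag with a negative offset looks one row ahead, which is exactly what f.lead does; if you prefer the more explicit name, the same step can be written as:

# Equivalent formulation using f.lead instead of f.lag with offset -1
filtered_df = filtered_df.withColumn(
    "until", f.lead(f.col(datetime_column), 1).over(window)
)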

We're almost done! As mentioned above, we still see some null values in the until column. This is not problematic for Building1, but it is problematic for Building3 because we can't use the until column to detect that Building3 is no longer present on the third of January. This is where we'll make use of the last_partition_import_date and the latest_import value.

If until is null and the last_partition_import_date < latest_import, then we add the last_partition_import_date plus one day to the until column of that row.

# Add a deletion date if the last date of a partition is before the last import date of the data
filtered_df = filtered_df.withColumn(
  "until",
  f.when(
      (f.col("until").isNull()) 
       & (f.col("last_partition_import_date") < latest_import),
      f.date_add(f.col("last_partition_import_date"), 1)
  ).otherwise(f.col("until"))
)
[Image by Author]

Using this technique, we can see that Building3's last seen day was on the second of January (we use a non-inclusive until here). If a row still has a null value in the until column, it means that that row is still valid.

The last thing we need to do is to drop the last_partition_import_date column and rename the import_date column to from:

final_df = (
  filtered_df
  .drop("last_partition_import_date")
  .withColumnRenamed(datetime_column, "from")
)
[image by Author]

Above you can see our final DataFrame! In real life, this technique can save you millions of rows that don't carry any information, which saves money on storage and makes further processing a lot faster!
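To illustrate how the compressed table is used afterwards, here is a minimal sketch with a hypothetical helper state_at (not part of the pipeline above) that reconstructs the snapshot for an arbitrary date, relying on the non-inclusive until and on null meaning "still valid":

# Hypothetical helper: reconstruct the state of all buildings on a given date.
# A row is valid on `at` if from <= at and (until is null or until > at),
# because until is non-inclusive and null means the row is still valid.
def state_at(history_df, at: str):
    return history_df.filter(
        (f.col("from") <= f.lit(at))
        & (f.col("until").isNull() | (f.col("until") > f.lit(at)))
    )

# e.g. the full snapshot as it looked on the second of January
state_at(final_df, "2023-01-02").show()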

To conclude

In this article, we've learned how you can save millions of rows by only retaining the rows that actually carry information, by looking at changed values within each subset. To summarize what we did:

  1. Grouped the DataFrame into partitions based on their IDs.
  2. For each partition, we identified consecutive rows with identical values and dropped all but the first row of each identical stretch.
  3. We added from and until as new columns to the rows to determine the lifespan of a combination of consecutive values.
  4. We checked whether the last import date of a partition lies before the last import date of the entire DataFrame, and if so, added an until value to mark the partition as deleted.

To make the code above reusable I've created a function for it which you can copy-paste into your own project and use there!

import pyspark.sql.functions as f
from functools import reduce
from pyspark.sql import Window
import pyspark
from typing import Optional

def remove_uninformative_rows(df: pyspark.sql.DataFrame,
                              partition_columns: list[str],
                              datetime_column: str,
                              track_columns: Optional[list[str]] = None,
                              ) -> pyspark.sql.DataFrame:

  if track_columns is None:
    track_columns = [column for column in df.columns 
                    if column not in partition_columns + [datetime_column]]

  # Define a window specification based on partition columns and ordered by datetime_column
  window = Window.partitionBy(*partition_columns).orderBy(datetime_column)

  # Iterate over the non-partition columns and add a new column for each column that indicates whether the value has changed
  for column in track_columns:
      previous_column = f"previous_{column}"
      changed_column = f"changed_{column}"
      df = df.withColumn(previous_column, f.lag(f.col(column)).over(window))
      df = df.withColumn(changed_column, f.col(column) != f.col(previous_column))
      df = df.drop(previous_column)

  # Add the row number to keep track of what the first occurrence of a row is
  df = df.withColumn('row_num', f.row_number().over(window))

  # Add last_partition_import_date to keep track of what the last imported date was per partition
  last_partition_window = Window.partitionBy(*partition_columns)
  df = df.withColumn('last_partition_import_date', f.max(f.col(datetime_column)).over(last_partition_window))

  # Save the latest import date of the entire data set
  latest_import = df.agg(f.max(datetime_column)).collect()[0][0]

  # Add condition to keep every first row 
  first_row_condition = f.col("row_num") == 1

  # Add condition that only keeps rows that have changed values
  change_condition = reduce(lambda a, b: a | b, (f.col(f"changed_{column}") for column in track_columns))

  # Filter the DataFrame to keep the first rows for each partition and the rows where there is a change
  filtered_df = df.filter(first_row_condition).union(df.filter(change_condition))

  # Drop the 'row_num' and 'changed_*' columns
  filtered_df = filtered_df.drop('row_num', *[f"changed_{column}" for column in track_columns])

  # Add 'until' column by looking one row ahead
  filtered_df = filtered_df.withColumn("until", f.lag(f.col(datetime_column), -1).over(window))

  # Add a deletion date if the last date of a partition is before the last import date of the data
  filtered_df = filtered_df.withColumn(
    "until",
    f.when(
        (f.col("until").isNull()) & (f.col("last_partition_import_date") < latest_import),
        f.date_add(f.col("last_partition_import_date"), 1)
    ).otherwise(f.col("until"))
  )

  # Drop last_partition_import_date and rename the datetime column to "from"
  final_df = (
    filtered_df
    .drop("last_partition_import_date")
    .withColumnRenamed(datetime_column, "from")
  )
  return final_df

With this function, you can easily remove all uninformative rows!

result_df = remove_uninformative_rows(
  df=df,
  partition_columns=['id'],
  datetime_column='import_date'
)

result_df.show()

If you found this article informative, please take a look at my account to see more articles on Python and Data Science!

