4 Examples to Take Your PySpark Skills to Next Level

Author:Murphy  |  View: 22641  |  Time: 2025-03-22 23:04:49
Photo by fabio on Unsplash

PySpark is the Python API for Spark, which is an analytics engine used for large-scale data processing. Spark has become the predominant tool in the Data Science ecosystem especially when we deal with large datasets that are difficult to handle with tools like Pandas and SQL.

In this article, we'll learn PySpark but from a different perspective than most of the other tutorials. Instead of going over frequently used PySpark functions and explaining how to use them, we'll solve some challenging data cleaning and processing tasks. This way of learning not only helps us learn PySpark functions but also know when to use them.

Before we start with the examples, let me tell you how to get the dataset used in the examples. It's a sample dataset I prepared with mock data. You can download from my datasets repository – it's called "sample_sales_pyspark.csv".

Let's start with creating a DataFrame from this dataset.

Python">from pyspark.sql import SparkSession
from pyspark.sql import Window, functions as F

Spark = SparkSession.builder.getOrCreate()

data = spark.read.csv("sample_sales_pyspark.csv", header=True)

data.show(5)
# output
+----------+------------+----------+---------+---------+-----+
|store_code|product_code|sales_date|sales_qty|sales_rev|price|
+----------+------------+----------+---------+---------+-----+
|        B1|       89912|2021-05-01|       14|    17654| 1261|
|        B1|       89912|2021-05-02|       19|    24282| 1278|
|        B1|       89912|2021-05-03|       15|    19305| 1287|
|        B1|       89912|2021-05-04|       21|    28287| 1347|
|        B1|       89912|2021-05-05|        4|     5404| 1351|
+----------+------------+----------+---------+---------+-----+

Example 1: Select expressions

PySpark allows for using SQL code through its pyspark.sql module. It's highly practical and intuitive to use SQL code for some data preprocessing tasks such as changing column names and data types.

The selectExpr function makes it very simple to do these operations especially if you have some experience with SQL.

Consider we need only some of the columns from this dataset. While creating this sub-dataframe, we also want to change the names of the store_code , product_code , and sales_date columns. The data types of the product_code and sales_date columns are string so let's also change them when handling column renaming.

All of these operations can be done with a select expression as follows:

sub_df = data.selectExpr("store_code as store_id",
                         "cast(product_code as int) as product_id",
                         "cast(sales_date as date) as date",
                         "cast(sales_qty as int)")

sub_df.dtypes
# output
[('store_id', 'string'),
 ('product_id', 'int'),
 ('date', 'date'),
 ('sales_qty', 'int')]

sub_df.show(5)
# output
+--------+----------+----------+---------+
|store_id|product_id|      date|sales_qty|
+--------+----------+----------+---------+
|      B1|     89912|2021-05-01|       14|
|      B1|     89912|2021-05-02|       19|
|      B1|     89912|2021-05-03|       15|
|      B1|     89912|2021-05-04|       21|
|      B1|     89912|2021-05-05|        4|
+--------+----------+----------+---------+

Example 2: Moving average

Moving average is calculated by creating a window of a predefined size and applying the mean function over it. The drawing below illustrates how this operation is done.

Moving average of 3 (image by author)

Moving averages can be useful in predictive analytics especially when we work with time series data. Let's calculate the moving average of a 3-day window. We'll define a window partitioned by store_id and product_id columns since the sales quantities of different products at different stores are unrelated and should not be mixed.

# define window
window = (
    Window
    .partitionBy("store_id", "product_id")
    .orderBy("date")
    .rowsBetween(-2, Window.currentRow)
)

# calculate mean over the window
sub_df = (
    sub_df
    .withColumn("moving_avg", F.round(F.mean("sales_qty").over(window), 2))
)

To customize the window size, we use the rowsBetween method. The index of the current row is 0, the previous one is -1, and so on. I previously wrote a detailed article about PySpark window operations if you'd like to learn more about them.

Let's take a look at the moving average values in our DataFrame.

sub_df.show(5)

# output
+--------+----------+----------+---------+----------+
|store_id|product_id|      date|sales_qty|moving_avg|
+--------+----------+----------+---------+----------+
|      A1|     89686|2021-05-06|       12|      12.0|
|      A1|     89686|2021-05-07|       23|      17.5|
|      A1|     89686|2021-05-08|       14|     16.33|
|      A1|     89686|2021-05-09|        8|      15.0|
|      A1|     89686|2021-05-10|        6|      9.33|
+--------+----------+----------+---------+----------+

The first moving average value is the same as the sales quantity because there are not any other values to be aggregated. The second moving average is the average of the first two sales quantity values, which are 12 and 13. We start to see actual moving averages of the 3-day window from the third row.


Example 3: Moving average with SQL

Just like we used SQL for selecting, renaming, and casting columns, we can use it for calculating moving averages. This is a highly convenient feature of Spark for those who have very good SQL skills but need to adapt to Spark.

We simply write what we want to do as SQL code and pass it to the expr function.

# define window
expression = """
mean(sales_qty) over (partition by store_id, product_id order by date 
rows between 2 preceding and current row)
"""

sub_df = (
    sub_df
    .withColumn("moving_avg", F.round(F.expr(expression), 2))
)

We don't need to define a window object here as it's handled in the SQL code.


Example 4: Conditional moving average

We have promotions on some days, which are marked by a column called is_promo . Let's create it first.

promo_days = ["2021-05-07", 
              "2021-05-08", 
              "2021-05-09"]

sub_df = (
    sub_df
    .withColumn("is_promo", 
                F.when(F.col("date").isin(promo_days), 1).otherwise(0))
)

sub_df.show(10)
# output
+--------+----------+----------+---------+----------+--------+
|store_id|product_id|      date|sales_qty|moving_avg|is_promo|
+--------+----------+----------+---------+----------+--------+
|      A1|     89686|2021-05-06|       12|      12.0|       0|
|      A1|     89686|2021-05-07|       23|      17.5|       1|
|      A1|     89686|2021-05-08|       14|     16.33|       1|
|      A1|     89686|2021-05-09|        8|      15.0|       1|
|      A1|     89686|2021-05-10|        6|      9.33|       0|
|      A1|     89686|2021-05-11|        4|       6.0|       0|
|      A1|     89686|2021-05-12|       11|       7.0|       0|
|      A1|     89686|2021-05-13|        9|       8.0|       0|
|      A1|     89686|2021-05-14|       19|      13.0|       0|
|      A1|     89686|2021-05-15|        3|     10.33|       0|
+--------+----------+----------+---------+----------+--------+

We use the when function to create conditional columns. The condition is specified within this function along with values to be used for the rows that fit the condition. The otherwise function is used for assigning value to rows that do not fit any of the given conditions.

What we want to do now is to calculate the moving average of a 3-day window but excluding the promo days. If one of the previous days within the 3-day window range is a promotion day, it'll be skipped and won't be included in the calculation of moving average. It'll be more clear when we do the example.

There are different ways of doing this but my approach will be to remove the promo days from the DataFrame and then calculate the moving average. Then, I'll join the new calculation to the original DataFrame. The days with promotion will be Null (i.e. missing values). I'll use the previous moving average value to fill these Null values.

window = (
    Window
    .partitionBy("store_id", "product_id")
    .orderBy("date")
    .rowsBetween(-2, Window.currentRow)
)

# filter out promo days and calculate moving average
nopromo_df = (
    sub_df
    .filter("is_promo == 0")
    .withColumn("moving_avg_nopromo", F.round(F.mean("sales_qty").over(window), 2))
    .select("store_id", "product_id", "date", "moving_avg_nopromo")
)

# join nopromo moving average to the original DataFrame
sub_df = (
    sub_df
    .join(nopromo_df, on=["store_id", "product_id", "date"], how="left")
)

sub_df.filter(F.col("store_id")=="B1").show(10)
# output
+--------+----------+----------+---------+----------+--------+------------------+
|store_id|product_id|      date|sales_qty|moving_avg|is_promo|moving_avg_nopromo|
+--------+----------+----------+---------+----------+--------+------------------+
|      B1|     89912|2021-05-01|       14|      14.0|       0|              14.0|
|      B1|     89912|2021-05-02|       19|      16.5|       0|              16.5|
|      B1|     89912|2021-05-03|       15|      16.0|       0|              16.0|
|      B1|     89912|2021-05-04|       21|     18.33|       0|             18.33|
|      B1|     89912|2021-05-05|        4|     13.33|       0|             13.33|
|      B1|     89912|2021-05-06|        5|      10.0|       0|              10.0|
|      B1|     89912|2021-05-07|       10|      6.33|       1|              NULL|
|      B1|     89912|2021-05-08|       18|      11.0|       1|              NULL|
|      B1|     89912|2021-05-09|        5|      11.0|       1|              NULL|
|      B1|     89912|2021-05-10|        2|      8.33|       0|              3.67|
+--------+----------+----------+---------+----------+--------+------------------+

Let's take a look at the last row in the output above. The moving_average is 8.33, which is the average of 2, 5, and 18. The moving_average_nopromo is 3.67, which is the average of 2, 5, and 4. The values on dates 2021–05–07, 2021–05–08, and 2021–05–09 are not included in this calculation because there is a promotion on these days.

The final step is to handle the Null values. We can use the forward fill strategy, which means using the previous non-missing value to replace the missing values.

window = (
    Window
    .partitionBy("store_id", "product_id")
    .orderBy("date")
)

sub_df = (
    sub_df
    .withColumn("moving_avg_nopromo", 
                F.last("moving_avg_nopromo", ignorenulls=True).over(window))
)

sub_df.filter(F.col("store_id")=="B1").show(10)
# output
+--------+----------+----------+---------+----------+--------+------------------+
|store_id|product_id|      date|sales_qty|moving_avg|is_promo|moving_avg_nopromo|
+--------+----------+----------+---------+----------+--------+------------------+
|      B1|     89912|2021-05-01|       14|      14.0|       0|              14.0|
|      B1|     89912|2021-05-02|       19|      16.5|       0|              16.5|
|      B1|     89912|2021-05-03|       15|      16.0|       0|              16.0|
|      B1|     89912|2021-05-04|       21|     18.33|       0|             18.33|
|      B1|     89912|2021-05-05|        4|     13.33|       0|             13.33|
|      B1|     89912|2021-05-06|        5|      10.0|       0|              10.0|
|      B1|     89912|2021-05-07|       10|      6.33|       1|              10.0|
|      B1|     89912|2021-05-08|       18|      11.0|       1|              10.0|
|      B1|     89912|2021-05-09|        5|      11.0|       1|              10.0|
|      B1|     89912|2021-05-10|        2|      8.33|       0|              3.67|
+--------+----------+----------+---------+----------+--------+------------------+

We defined a new window without row boundaries. This is required to be able to use the last function. Then, we apply the last function to select the previous value to fill the missing values.


Final words

We covered 5 examples to solve a particular task with PySpark. They'll help you not only learn how to do things with PySpark but also do them efficiently or in a simpler way. Considering most of the time in building a production-level data-based system is spent on cleaning and preprocessing data, PySpark is a highly valuable tool for all kinds of practitioners in the data science ecosystem.

Thank you for reading. Please let me know if you have any feedback.

Tags: Data Engineering Data Science Programming Python Spark

Comment