PySpark Explained: User-Defined Functions

Author: Murphy

This article is about User Defined Functions (UDFs) in Spark. I'll go through what they are and how you use them, and show you how to implement them using examples written in PySpark.

Incidentally, when I talk about PySpark, I just mean that the underlying language being used when programming with Spark is Python. The OG language for development using Spark was Scala, but with Python's meteoric rise in popularity, it's now the main language people use when programming in Spark even though Spark itself is written in Scala.

What is Spark?

If you haven't used or heard of Spark before, the TL;DR is that it is a powerful tool for processing and analysing large amounts of data quickly. It's a distributed computing engine, designed to handle big data tasks by breaking them into smaller pieces and working on them in parallel. This makes it much faster and more efficient than many other methods, especially for complex tasks like data analysis, machine learning, and real-time data processing.

Now part of the Apache Software Foundation, Spark has several key components that cater to different aspects of data processing and analysis, including components for Machine Learning, SQL operations and handling streaming data.

What are UDFs?

Spark's extensive ecosystem includes numerous built-in functions for various data processing needs. However, there are scenarios where these built-in functions fall short, and that's when UDFs become invaluable.

In simple terms, UDFs are a way to extend the functionality of Spark SQL and DataFrame operations. They are custom functions written in PySpark or Spark/Scala and enable you to apply complex transformations and business logic that Spark does not natively support.

In PySpark, UDFs broadly speaking come in two different "flavours".

1/ Regular or standard UDFs

Standard UDFs can also be roughly subdivided into:-

Scalar UDFs

  • These operate on individual values and return a single value for each row of a DataFrame.

Aggregate UDFs

  • These are similar to scalar UDFs but process multiple rows collectively, returning a single result per group.

2/ Pandas UDFs

  • These operate on batches of rows at once, using vectorized Pandas operations and Apache Arrow for efficient data transfer between the JVM and Python.
  • They can be further subdivided depending on what they process (a Pandas Series or a whole Pandas DataFrame) and what they return.

I'll show examples of using both main types of UDFs.

Implementing and using a UDF in PySpark is all about following a few simple steps.

  1. Write the UDF function using Python
  2. Register the UDF with PySpark.
  3. Call your UDF.
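
To make those three steps concrete before we get to the full examples, here's a minimal sketch. The to_upper function and the column names are purely illustrative placeholders, not part of the examples that follow.

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Step 1: write an ordinary Python function
def to_upper(s):
    return s.upper() if s is not None else None

# Step 2: register it with PySpark, declaring the return type
to_upper_udf = udf(to_upper, StringType())

# Step 3: call it in a DataFrame transformation, e.g. assuming a
# DataFrame df with a "name" column:
# df = df.withColumn("name_upper", to_upper_udf(df["name"]))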

Accessing a Spark development environment

We're going to look at some examples of coding with UDFs, but before that, we need a Spark development environment where we can develop and run our programs to make sure they work.

If you're lucky enough to have access to Spark either through your work, via the cloud or a local install, go ahead and use that. If not, please have a look at the link below, where I go into detail about how you can access a great FREE online Spark development environment called the Databricks Community Edition.

Databricks is a cloud-based platform for Data Engineering, machine learning, and analytics built around Apache Spark and provides a unified environment for working with big data workloads. The founders of Databricks created Spark, so they know their stuff.

How to access a FREE online Spark development environment

Example 1 – A standard scalar UDF

In this example, we're going to take a set of temperature readings in degrees Fahrenheit and convert them via a UDF to degrees Celsius. This is a trivial example and not one you'd use in real life, but the steps shown here will apply to any UDF you'll want to implement.

Create the function that will be a UDF

Create a Python function to convert Fahrenheit to Celsius. For example,

def fahrenheit_to_celsius(fahrenheit):
    return (fahrenheit - 32) * 5.0 / 9.0

Register the function as a UDF

Depending on the type of UDF, there are different ways to register it so that PySpark can recognise and use it.

For a standard UDF that will be used in PySpark SQL, we use the `spark.udf.register` directive, like this:-


spark.udf.register("fahrenheit_to_celsius", fahrenheit_to_celsius, DoubleType())

It takes three parameters as follows,

1/ UDF Function label

When you register the UDF with a label, you can refer to that label in SQL queries. In the example, "fahrenheit_to_celsius" is the label used to call the UDF in the SQL statement. This allows for consistent use of the UDF across multiple SQL queries.

2/ UDF function name

The function you pass here is the logic that will be executed for each row. This function should be designed to handle the input types you expect and return the type specified by the return type.

3/ UDF Return type

Specifying the return type helps Spark understand the schema of the resulting DataFrame when the UDF is applied. This is crucial for optimization and ensuring that the data types are handled correctly throughout Spark's processing pipeline. Examples of common return types include IntegerType(), StringType(), DoubleType(), etc.


If the standard UDF is to be used in row-wise DataFrame operations, we can use the simple udf directive to register it. It's often used as a function decorator but can be used stand-alone too.

This takes two parameters.

1/ UDF function name (if not being used as a function decorator)

2/ UDF Return type
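
For example, here's a minimal sketch of the decorator form. The shout function and the column name are purely illustrative, not something from the examples below.

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Using udf as a decorator: only the return type needs to be supplied
@udf(StringType())
def shout(text):
    return None if text is None else text.upper() + "!"

# The decorated function can then be called directly in DataFrame
# operations, e.g. assuming a DataFrame df with a "text" column:
# df.withColumn("loud", shout(df["text"]))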


Pandas UDFs are registered using the pandas_udf function.

This is typically used as a function decorator but can also be used as a stand-alone function and can take three parameters.

1/ UDF function name (if not being used as a function decorator)

2/ The UDF return type

The data type of the value(s) returned by the Pandas UDF. This helps Spark optimize and understand how to handle the UDF's output.

3/ The UDF function type

This describes the kind of Pandas UDF being defined and specifies how the UDF will operate on the data. Common types include:

  • PandasUDFType.SCALAR: Operates on pandas.Series and returns a pandas.Series of the same length.
  • PandasUDFType.GROUPED_MAP: Operates on pandas.DataFrame and returns a pandas.DataFrame. Typically used with groupBy().apply().
  • PandasUDFType.GROUPED_AGG: Operates on pandas.Series and returns a scalar value, used with groupBy().agg().

You'll see the use of all three different UDF registration methods in my example code snippets.
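
The worked examples below use the SCALAR and GROUPED_MAP flavours, so for completeness here's a minimal sketch of a GROUPED_AGG Pandas UDF. The mean_value function and the "category"/"value" column names are just placeholders for illustration.

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import DoubleType
import pandas as pd

# A GROUPED_AGG Pandas UDF receives a pandas.Series per group
# and must return a single scalar value
@pandas_udf(DoubleType(), PandasUDFType.GROUPED_AGG)
def mean_value(values: pd.Series) -> float:
    return values.mean()

# Typical usage with groupBy().agg(), assuming a DataFrame df
# with "category" and "value" columns:
# df.groupBy("category").agg(mean_value(df["value"]).alias("mean_value"))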

Using the UDF

Here is our full PySpark code showing the scalar UDF in use. After some initialisation code, I create a DataFrame with some sample temperature data in Fahrenheit. Next, I create a temporary SQL table based on the DataFrame, then use my UDF to convert the data in the table to Celsius.

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Temperature Conversion SQL") \
    .getOrCreate()

# Define the Python function
def fahrenheit_to_celsius(fahrenheit):
    return (fahrenheit - 32) * 5.0 / 9.0

# Register the function as a UDF
spark.udf.register("fahrenheit_to_celsius", fahrenheit_to_celsius, DoubleType())

# Create a sample DataFrame
data = [(32.0,), (212.0,), (98.6,), (77.0,)]
df = spark.createDataFrame(data, ["Fahrenheit"])
df.show()

# Create a temporary view
df.createOrReplaceTempView("temperature")

# Use the UDF in a SQL query
result = spark.sql("SELECT Fahrenheit, fahrenheit_to_celsius(Fahrenheit) AS Celsius FROM temperature")
result.show()

On my system, the above code created the following output.

df:pyspark.sql.dataframe.DataFrame = [Fahrenheit: double]
result:pyspark.sql.dataframe.DataFrame = [Fahrenheit: double, Celsius: double]
+----------+
|Fahrenheit|
+----------+
|      32.0|
|     212.0|
|      98.6|
|      77.0|
+----------+

+----------+-------+
|Fahrenheit|Celsius|
+----------+-------+
|      32.0|    0.0|
|     212.0|  100.0|
|      98.6|   37.0|
|      77.0|   25.0|
+----------+-------+

Example 2 – A standard scalar UDF

In this example, we'll take customer information including their debt levels and classify their risk in terms of their debt amount.

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Customer Debt Classification") \
    .getOrCreate()

# Define the Python UDF function
def classify_debt(debt):
    if debt < 1000:
        return 'Low'
    elif 1000 <= debt < 5000:
        return 'Medium'
    elif 5000 <= debt < 10000:
        return 'High'
    else:
        return 'Very High'

# Create the UDF from the Python function
classify_debt_udf = udf(classify_debt, StringType())

# Create a sample DataFrame
data = [
    (1, 500.0),
    (2, 1500.0),
    (3, 7000.0),
    (4, 2000.0),
    (5, 12000.0)
]
df = spark.createDataFrame(data, ["CustomerID", "Debt"])
df.show()

# Apply the UDF to classify debt amounts
df_with_classification = df.withColumn("RiskScore", classify_debt_udf(df["Debt"]))
df_with_classification.show()

df:pyspark.sql.dataframe.DataFrame = [CustomerID: long, Debt: double]
df_with_classification:pyspark.sql.dataframe.DataFrame = [CustomerID: long, Debt: double ... 1 more field]
+----------+-------+
|CustomerID|   Debt|
+----------+-------+
|         1|  500.0|
|         2| 1500.0|
|         3| 7000.0|
|         4| 2000.0|
|         5|12000.0|
+----------+-------+

+----------+-------+---------+
|CustomerID|   Debt|RiskScore|
+----------+-------+---------+
|         1|  500.0|      Low|
|         2| 1500.0|   Medium|
|         3| 7000.0|     High|
|         4| 2000.0|   Medium|
|         5|12000.0|Very High|
+----------+-------+---------+

Example 3 – A standard aggregate UDF

For this example, we'll use a list of tuples to create a synthetic data set of categories and their respective values. There are multiple values per category, so we'll create a UDF to calculate the mean value per category.

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, collect_list
from pyspark.sql.types import DoubleType

# Get or create the Spark session (reuses the existing one if already running)
spark = SparkSession.builder.appName("Average Per Category").getOrCreate()

# Sample data
data = [("A", 1.0), ("A", 2.0), ("A", 3.0), ("B", 4.0), ("B", 5.0)]
df = spark.createDataFrame(data, ["category", "value"])

# Define the Python UDF
def average(values):
    return float(sum(values)) / len(values)

# Register the UDF with Spark
average_udf = udf(average, DoubleType())

# Group by category and apply the UDF
df_grouped = df.groupBy("category").agg(average_udf(collect_list(df["value"])).alias("average_value"))

# Show the results
df_grouped.show()

+--------+-------------+
|category|average_value|
+--------+-------------+
|       A|          2.0|
|       B|          4.5|
+--------+-------------+

Example 4 – A Pandas UDF

This example demonstrates using a vectorized UDF to calculate a rolling median of the daily prices of some products.

Note that I'm using the @pandas_udf(...) decorator before the function to indicate it's a UDF.

from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType, col, to_date
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, DateType
import pandas as pd

# Create SparkSession
spark = SparkSession.builder.appName("PandasUDFExample").getOrCreate()

# Create a sample DataFrame with daily product prices
data = [
    ("PRODUCT_1", "2023-01-01", 130.5),
    ("PRODUCT_1", "2023-01-02", 131.0),
    ("PRODUCT_1", "2023-01-03", 130.0),
    ("PRODUCT_1", "2023-01-04", 132.5),
    ("PRODUCT_1", "2023-01-05", 133.0),
    ("PRODUCT_1", "2023-01-06", 132.0),
    ("PRODUCT_1", "2023-01-07", 134.5),
    ("PRODUCT_1", "2023-01-08", 135.0),
    ("PRODUCT_1", "2023-01-09", 136.0),
    ("PRODUCT_1", "2023-01-10", 135.5),
    ("PRODUCT_2", "2023-01-01", 85.0),
    ("PRODUCT_2", "2023-01-02", 86.5),
    ("PRODUCT_2", "2023-01-03", 87.0),
    ("PRODUCT_2", "2023-01-04", 86.0),
    ("PRODUCT_2", "2023-01-05", 88.5),
    ("PRODUCT_2", "2023-01-06", 89.0),
    ("PRODUCT_2", "2023-01-07", 88.5),
    ("PRODUCT_2", "2023-01-08", 90.0),
    ("PRODUCT_2", "2023-01-09", 91.5),
    ("PRODUCT_2", "2023-01-10", 92.0),
]

schema = StructType([
    StructField("productl", StringType(), True),
    StructField("date", StringType(), True),
    StructField("price", DoubleType(), True)
])

df = spark.createDataFrame(data, schema)
df = df.withColumn("date", to_date(col("date")))

# Define the output schema
output_schema = StructType([
    StructField("product", StringType(), True),
    StructField("date", DateType(), True),
    StructField("price", DoubleType(), True),
    StructField("rolling_median", DoubleType(), True)
])

# Define the Pandas UDF
@pandas_udf(output_schema, PandasUDFType.GROUPED_MAP)
def rolling_median(pdf):
    pdf = pdf.sort_values('date')
    pdf['rolling_median'] = pdf['price'].rolling(window=7, min_periods=1).median()
    return pdf

# Apply the UDF
result_df = df.groupBy("symbol").apply(rolling_median)

# Show the result
result_df.orderBy("symbol", "date").show()

+----------+----------+-----+---------------+
|  product |      date|price|rolling_median |
+----------+----------+-----+---------------+
|PRODUCT_1 |2023-01-01|130.5|         130.5 |
|PRODUCT_1 |2023-01-02|131.0|        130.75 |
|PRODUCT_1 |2023-01-03|130.0|         130.5 |
|PRODUCT_1 |2023-01-04|132.5|        130.75 |
|PRODUCT_1 |2023-01-05|133.0|         131.0 |
|PRODUCT_1 |2023-01-06|132.0|         131.5 |
|PRODUCT_1 |2023-01-07|134.5|         132.0 |
|PRODUCT_1 |2023-01-08|135.0|         132.5 |
|PRODUCT_1 |2023-01-09|136.0|         133.0 |
|PRODUCT_1 |2023-01-10|135.5|         134.5 |
|PRODUCT_2 |2023-01-01| 85.0|          85.0 |
|PRODUCT_2 |2023-01-02| 86.5|         85.75 |
|PRODUCT_2 |2023-01-03| 87.0|          86.5 |
|PRODUCT_2 |2023-01-04| 86.0|         86.25 |
|PRODUCT_2 |2023-01-05| 88.5|          86.5 |
|PRODUCT_2 |2023-01-06| 89.0|         86.75 |
|PRODUCT_2 |2023-01-07| 88.5|          87.0 |
|PRODUCT_2 |2023-01-08| 90.0|          88.5 |
|PRODUCT_2 |2023-01-09| 91.5|          88.5 |
|PRODUCT_2 |2023-01-10| 92.0|          89.0 |
+----------+----------+-----+---------------+

Example 5 – A Pandas UDF

Let's create a vectorized UDF that calculates the Haversine distance between two sets of latitude and longitude points, demonstrating a more complex geospatial calculation. The Haversine formula gives the great-circle distance, i.e. the shortest distance between two points on the surface of a sphere.

from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType
import pandas as pd
import numpy as np

# Get or create the Spark session (reuses the existing one if already running)
spark = SparkSession.builder.appName("Haversine Distance").getOrCreate()

# Sample data
data = [
    (1, 34.0522, -118.2437, 36.1699, -115.1398),  # LA to Vegas
    (2, 40.7128, -74.0060, 34.0522, -118.2437),   # NYC to LA
    (3, 37.7749, -122.4194, 34.0522, -118.2437)   # SF to LA
]

df = spark.createDataFrame(data, ["id", "lat1", "lon1", "lat2", "lon2"])

# Define the vectorized UDF for Haversine distance calculation
@pandas_udf(DoubleType())
def haversine_distance(lat1: pd.Series, lon1: pd.Series, lat2: pd.Series, lon2: pd.Series) -> pd.Series:
    R = 6371  # Earth radius in kilometers

    lat1 = np.radians(lat1)
    lon1 = np.radians(lon1)
    lat2 = np.radians(lat2)
    lon2 = np.radians(lon2)

    dlat = lat2 - lat1
    dlon = lon2 - lon1

    a = np.sin(dlat / 2) ** 2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon / 2) ** 2
    c = 2 * np.arctan2(np.sqrt(a), np.sqrt(1 - a))

    return R * c

# Apply the UDF to calculate the Haversine distance
df_with_distance = df.withColumn("distance_km", haversine_distance(df["lat1"], df["lon1"], df["lat2"], df["lon2"]))

# Show the results
df_with_distance.show(truncate=False)

+---+-------+---------+-------+---------+------------------+
|id |lat1   |lon1     |lat2   |lon2     |distance_km       |
+---+-------+---------+-------+---------+------------------+
|1  |34.0522|-118.2437|36.1699|-115.1398|367.60632235303024|
|2  |40.7128|-74.006  |34.0522|-118.2437|3935.746254609723 |
|3  |37.7749|-122.4194|34.0522|-118.2437|559.1205770615534 |
+---+-------+---------+-------+---------+------------------+

Summary

User Defined Functions (UDFs) in PySpark provide a powerful mechanism to extend the functionality of PySpark's built-in operations by allowing users to define custom functions that can be applied to PySpark DataFrames and SQL queries.

Although UDFs can be extremely useful, there are some downsides to their use, the major one being performance. UDFs can introduce processing bottlenecks because the Spark optimiser cannot plan tasks around the custom logic inside a UDF efficiently. In addition, logic that executes outside the JVM incurs extra data serialization costs. So that's something to watch out for and consider if using them.
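
For instance, the Fahrenheit conversion from Example 1 could be expressed with native column expressions instead of a UDF, which keeps the logic inside Spark's optimiser and avoids the Python serialization overhead. Here's a sketch, assuming the same temperature DataFrame df from Example 1.

from pyspark.sql.functions import col

# Same conversion as the UDF, but using built-in column arithmetic,
# which Catalyst can optimise and which never leaves the JVM
df_native = df.withColumn("Celsius", (col("Fahrenheit") - 32) * 5.0 / 9.0)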

However, for ad hoc queries, manual data cleansing or exploratory data analysis on small or even medium-sized datasets, the latency overhead costs associated with UDFs are unlikely to outweigh the benefits of their use.

_OK, that's all for me just now. I hope you found this article useful. If you did, please check out my profile page at this link. From there, you can see my other published stories and subscribe to get notified when I post new content._

If you've liked this content, I think you'll also find these articles an interesting read.

PySpark Explained: Dealing with Invalid Records When Reading CSV and JSON Files

New Pandas rival, FireDucks, brings the smoke!

Tags: Data Engineering Data Science Pyspark Spark Tips And Tricks
