PySpark Explained: The InferSchema Problem


Whether you're a data scientist, data engineer, or programmer, reading and processing CSV data will be one of your bread-and-butter skills for years.

Most programming languages can, either natively or via a library, read and write CSV data files, and PySpark is no exception.

It provides a very useful spark.read function. You've probably used this function along with its inferschema option many times. So often, in fact, that it almost becomes habitual.

If that's you, in this article, I hope to convince you that this is usually a bad idea from a performance perspective when reading large CSV files, and I'll show you what you can do instead.

Firstly, we should examine where and when inferschema is used and why it's so popular.

The where and when is easy. Inferschema is used explicitly as an option in the spark.read function when reading CSV files into Spark Dataframes.

You might ask, "What about other types of files?"

The schema for Parquet and ORC data files is already stored within the files. So explicit schema inference is not required.

For JSON files, schema inference is applied implicitly by PySpark, so there's no need to use it for those either.

In non-CSV text files, PySpark normally reads each line of the file into a one-column Dataframe so, again, schema inference is not required.
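
For reference, here is a quick sketch of how those other formats are typically read. No schema inference option is involved, and the file paths below are just placeholders:

# Parquet and ORC files carry their schema with them
parquet_df = spark.read.parquet("dbfs:/FileStore/some_data/parquet_dir")
orc_df = spark.read.orc("dbfs:/FileStore/some_data/orc_dir")

# JSON schema inference happens implicitly
json_df = spark.read.json("dbfs:/FileStore/some_data/data.json")

# Plain text is read into a single string column named "value"
text_df = spark.read.text("dbfs:/FileStore/some_data/data.txt")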

Explicit schema inference is popular when dealing with CSV data because it solves two common problems.

  • First, you may not always know the number and data types of the columns in the CSV data you want to read.
  • Second, even if the CSV schema is well-defined and known, manually crafting a schema for files with many fields to pass into your CSV read function can be cumbersome.

Using inferschema makes a pretty good fist of solving the above problems on your behalf, saving you a load of time and effort. But it comes with a big downside that we'll talk about later.

In the rest of this article, I'll show you when you can use inferschema, when you shouldn't, and what to do instead.

Setting up a development environment

If you want to follow along with the code in this article, you'll need access to a PySpark development environment.

If you're lucky enough to have access to PySpark either through your work, via the cloud, or a local install, go ahead and use that. Or, if you haven't already, click on the link below to learn how to use a great online development tool called Databricks Community Edition for free.

Databricks is a cloud-based platform for data engineering, Machine Learning, and analytics built around Apache Spark and provides a unified environment for working with big data workloads. The founders of Databricks created Spark, so they know their stuff.

How to access a FREE online Spark development environment

Sourcing a large CSV data file

We don't need a complicated data set for our purposes. A set of synthetic sales data with the following schema will suffice.

  • order_date (date)
  • customer_id (int)
  • customer_name (str)
  • product_id (int)
  • product_name (str)
  • category (str)
  • quantity (int)
  • price (float)
  • total (float)

To source a good-sized data set, I created a PySpark program to generate synthetic data and ran it on Databricks to produce a CSV test file of approximately 10 million records. Here is that program.


from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, rand, lit, col, concat
from pyspark.sql.types import StringType, IntegerType, FloatType
import random
from datetime import datetime, timedelta

# Initialize Spark session
spark = SparkSession.builder.appName("SalesDataGenerator").getOrCreate()

# Function to generate a random date
def generate_random_date():
    start_date = datetime(2023, 1, 1)
    end_date = datetime(2023, 12, 31)
    time_between_dates = end_date - start_date
    days_between_dates = time_between_dates.days
    random_number_of_days = random.randrange(days_between_dates)
    random_date = start_date + timedelta(days=random_number_of_days)
    return random_date.strftime("%Y-%m-%d")

# Register UDF for random date generation
random_date_udf = udf(generate_random_date, StringType())

# Generate base dataset
num_records = 10000000
df = spark.range(0, num_records)

# Add random data to the dataset
df = df.withColumn("order_date", random_date_udf()) 
    .withColumn("customer_id", (rand() * 900 + 100).cast(IntegerType())) 
    .withColumn("customer_name", concat(lit("Customer_"), (rand() * 1000000).cast(IntegerType()).cast(StringType()))) 
    .withColumn("product_index", (rand() * 10).cast(IntegerType())) 
    .withColumn("quantity", (rand() * 10 + 1).cast(IntegerType())) 
    .withColumn("price", (rand() * 998 + 1.99).cast(FloatType()))

# Check the generated DataFrame
df.show(10)

# Define product names and categories
product_data = [
    ("Laptop", "Electronics"),
    ("Smartphone", "Electronics"),
    ("Desk", "Office"),
    ("Chair", "Office"),
    ("Monitor", "Electronics"),
    ("Printer", "Electronics"),
    ("Paper", "Office"),
    ("Pen", "Office"),
    ("Notebook", "Office"),
    ("Coffee Maker", "Electronics")
]

# Create a DataFrame with product data
product_df = spark.createDataFrame(product_data, ["product_name", "category"])

# Add product_index to product_df for join
product_df = product_df.withColumn("product_index", (rand() * 10).cast(IntegerType()))

# Check the product DataFrame
product_df.show(10)

# Join with product data
df = df.join(product_df, "product_index")

# Check the joined DataFrame
df.show(10)

# Calculate total and finalize columns
df = df.withColumn("product_id", col("product_index") + 200) 
    .withColumn("total", col("quantity") * col("price")) 
    .select("order_date", "customer_id", "customer_name", "product_id", "product_name", "category", "quantity", "price", "total")

# Check DataFrame count
record_count = df.count()
print(f"Record count: {record_count}")

# Write to CSV file
df.write.csv("dbfs:/FileStore/sales_data/sales_data.csv", header=True, mode="overwrite")

print("CSV file with fake sales data has been created.")

Here is what the first few records look like in tabular form.

+----------+-----------+---------------+----------+------------+-----------+--------+---------+---------+
|order_date|customer_id|  customer_name|product_id|product_name|   category|quantity|    price|    total|
+----------+-----------+---------------+----------+------------+-----------+--------+---------+---------+
|2023-10-28|        587| Customer_30485|       203|      Laptop|Electronics|      10| 281.3928| 2813.928|
|2023-10-28|        587| Customer_30485|       203|     Monitor|Electronics|      10| 281.3928| 2813.928|
|2023-03-17|        653|Customer_224105|       203|      Laptop|Electronics|       6|763.33246|4579.9946|
|2023-03-17|        653|Customer_224105|       203|     Monitor|Electronics|       6|763.33246|4579.9946|
|2023-01-03|        206|Customer_194620|       203|      Laptop|Electronics|       2| 705.8043|1411.6086|
|2023-01-03|        206|Customer_194620|       203|     Monitor|Electronics|       2| 705.8043|1411.6086|
|2023-10-20|        629|Customer_892763|       203|      Laptop|Electronics|       4|  98.0225|   392.09|
|2023-10-20|        629|Customer_892763|       203|     Monitor|Electronics|       4|  98.0225|   392.09|
|2023-05-13|        720|Customer_275572|       203|      Laptop|Electronics|       3| 957.0872|2871.2617|
|2023-05-13|        720|Customer_275572|       203|     Monitor|Electronics|       3| 957.0872|2871.2617|
|2023-08-18|        591|Customer_380869|       203|      Laptop|Electronics|       6|333.59012|2001.5408|
|2023-08-18|        591|Customer_380869|       203|     Monitor|Electronics|       6|333.59012|2001.5408|
|2023-02-09|        336|Customer_911005|       203|      Laptop|Electronics|       9| 598.7148| 5388.433|
|2023-02-09|        336|Customer_911005|       203|     Monitor|Electronics|       9| 598.7148| 5388.433|
|2023-09-30|        108|Customer_214382|       203|      Laptop|Electronics|       5|  693.231| 3466.155|
|2023-09-30|        108|Customer_214382|       203|     Monitor|Electronics|       5|  693.231| 3466.155|
|2023-02-09|        561|Customer_773507|       203|      Laptop|Electronics|       2|126.02791|252.05582|
+----------+-----------+---------------+----------+------------+-----------+--------+---------+---------+

A typical inferschema use case

Now that we have our data, let's look at how you might typically read this data into a Dataframe using inferschema.

Python">df = spark.read.format("csv") 
    .option("header", "true").option("inferSchema", "true") 
    .option("delimiter", ',').load("dbfs:/FileStore/sales_data/sales_data.csv")

df.printSchema()

#
# Output
#

root
 |-- order_date: date (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price: double (nullable = true)
 |-- total: double (nullable = true)

So, we're just reading the data into a DataFrame and then having a quick look at the schema. For small to medium-sized CSV files, this code and the use of inferschema are absolutely fine.

When Spark sees this directive, it examines our input file and derives the most suitable data types for the fields. This can be a useful and time-saving benefit.

We can see the schema of our DataFrame from the output of the printSchema() call.

However, for very large files, there is a big snag. That's because when the inferschema option is used, Spark has to read the whole file and check each record before it can derive its final schema.

For files that contain millions or even billions of records, that is a serious performance bottleneck.

To give you an idea of the kind of performance hit you can expect to receive, I timed reading my input file with and without using inferschema.

Run 1 – with inferschema

%%time
df = spark.read.format("csv") 
    .option("header", "true").option("inferSchema", "true") 
    .option("delimiter", ',').load("dbfs:/FileStore/sales_data/sales_data.csv")

df.show(5)

#
# Output
#

+----------+-----------+---------------+----------+------------+-----------+--------+---------+---------+
|order_date|customer_id|  customer_name|product_id|product_name|   category|quantity|    price|    total|
+----------+-----------+---------------+----------+------------+-----------+--------+---------+---------+
|2023-10-28|        587| Customer_30485|       203|      Laptop|Electronics|      10| 281.3928| 2813.928|
|2023-10-28|        587| Customer_30485|       203|     Monitor|Electronics|      10| 281.3928| 2813.928|
|2023-03-17|        653|Customer_224105|       203|      Laptop|Electronics|       6|763.33246|4579.9946|
|2023-03-17|        653|Customer_224105|       203|     Monitor|Electronics|       6|763.33246|4579.9946|
|2023-01-03|        206|Customer_194620|       203|      Laptop|Electronics|       2| 705.8043|1411.6086|
+----------+-----------+---------------+----------+------------+-----------+--------+---------+---------+

CPU times: user 158 ms, sys: 23.6 ms, total: 181 ms
Wall time: 48.1 s

Run 2 – without inferschema

%%time
df = spark.read.format("csv") 
    .option("header", "true") 
    .option("delimiter", ',').load("dbfs:/FileStore/sales_data/sales_data.csv")
df.show(5)

#
# Output
#

+----------+-----------+---------------+----------+------------+-----------+--------+---------+---------+
|order_date|customer_id|  customer_name|product_id|product_name|   category|quantity|    price|    total|
+----------+-----------+---------------+----------+------------+-----------+--------+---------+---------+
|2023-10-28|        587| Customer_30485|       203|      Laptop|Electronics|      10| 281.3928| 2813.928|
|2023-10-28|        587| Customer_30485|       203|     Monitor|Electronics|      10| 281.3928| 2813.928|
|2023-03-17|        653|Customer_224105|       203|      Laptop|Electronics|       6|763.33246|4579.9946|
|2023-03-17|        653|Customer_224105|       203|     Monitor|Electronics|       6|763.33246|4579.9946|
|2023-01-03|        206|Customer_194620|       203|      Laptop|Electronics|       2| 705.8043|1411.6086|
+----------+-----------+---------------+----------+------------+-----------+--------+---------+---------+

CPU times: user 13.8 ms, sys: 8.55 ms, total: 22.4 ms
Wall time: 17.3 s

As you can see, the runtime is significantly greater when using inferschema than without.

Using inferschema imposed a 31-second overhead on this code. That might seem insignificant, but remember that our input file was still relatively modest in big data terms. If you had hundreds or thousands of large CSVs to process, that overhead soon adds up.

Importantly, too, let's look at the schema imposed on the data when inferschema is not used:

df.printSchema()

root
 |-- order_date: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- quantity: string (nullable = true)
 |-- price: string (nullable = true)
 |-- total: string (nullable = true)

PySpark has just assigned the String type to each input field. This is not ideal if we need to do further processing or analysis on this data.
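
For instance, before doing any date or numeric work on this all-string DataFrame, you would have to cast the columns yourself, something along these lines:

from pyspark.sql.functions import col

# Every date and numeric column needs an explicit cast before it's usable
df = df.withColumn("order_date", col("order_date").cast("date")) \
    .withColumn("quantity", col("quantity").cast("int")) \
    .withColumn("price", col("price").cast("double")) \
    .withColumn("total", col("total").cast("double"))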

So we have a quandary:

  • Use inferschema. Slower ingestion but accurate data types.
  • Don't use inferschema. Quicker ingestion but inaccurate data types.

We know that inferschema is a useful option when reading CSV files, but how do we get around the performance problem when dealing with bigger CSV files?

I have a couple of suggestions that mean you can both ingest large CSV data quickly and have an accurate schema.

Workarounds

1/ Using a pre-defined schema

The first method involves removing the use of inferschema altogether. Instead, using the built-in PySpark SQL data types – StructType and StructField – we construct a schema that corresponds to our CSV field types.

We can then feed our pre-made schema into the .schema option of the spark.read command. This method can only be used if you already know the schema of your CSV data.

For our example,

#
# construct our pre-defined schema type
#
from pyspark.sql.types import (StructField, StructType, DoubleType,
                               IntegerType, DateType, StringType)

myschema = StructType([
    StructField("order_date", DateType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("customer_name", StringType(), True),
    StructField("product_id", IntegerType(), True),
    StructField("product_name", StringType(), True),
    StructField("category", StringType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("price", DoubleType(), True),
    StructField("total", DoubleType(), True)
])

In our read statement, instead of using inferschema, we use the .schema directive like this:


df = spark.read.format("csv") 
    .option("header", "true").schema(myschema) 
    .option("delimiter", ',').load("dbfs:/FileStore/sales_data/sales_data.csv")

df.printSchema()

#
# Output
#

root
 |-- order_date: date (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price: double (nullable = true)
 |-- total: double (nullable = true)
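
As a side note, if building a StructType by hand feels verbose, the .schema() method also accepts a DDL-formatted string, so an equivalent and more compact way to express the same schema would be something like this:

ddl_schema = """
    order_date DATE, customer_id INT, customer_name STRING,
    product_id INT, product_name STRING, category STRING,
    quantity INT, price DOUBLE, total DOUBLE
"""

df = spark.read.format("csv") \
    .option("header", "true").schema(ddl_schema) \
    .option("delimiter", ',').load("dbfs:/FileStore/sales_data/sales_data.csv")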

2/ Using a schema inferred from a small sample file

This second method is useful if you don't know in advance the schema of your input CSV file(s), or if they contain so many fields that the first method would be overly time-consuming.

The main idea is to use inferschema on a small CSV file that's a subset (just a handful of records) of the larger CSV. Once we have that schema, we can apply it when reading the main CSV file.

The creation of the small subset CSV file can be done manually or programmatically if this is to be part of an automated pipeline.
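
As a sketch of the programmatic route, using the same file paths as before and an arbitrary sample size of 100 rows, you could read the large file without inferschema (which is cheap), take a small slice, and write it back out:

# Take a small, cheap-to-read slice of the big file and save it as the sample
sample_df = spark.read.format("csv") \
    .option("header", "true") \
    .option("delimiter", ',').load("dbfs:/FileStore/sales_data/sales_data.csv") \
    .limit(100)

sample_df.coalesce(1).write \
    .option("header", "true") \
    .mode("overwrite") \
    .csv("dbfs:/FileStore/sales_data/small_sales_data.csv")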

In the example below, I've assumed the smaller, sample CSV file has already been created by one of those routes.


#
# Assume we have manually or programmatically created
# a small CSV file that is a representative sample of our 
# large CSV file.
#
# Now read the small file with inferschema
#
smallDf = spark.read.format("csv") \
    .option("header", "true").option("inferSchema", "true") \
    .option("delimiter", ',').load("dbfs:/FileStore/sales_data/small_sales_data.csv")

#
# use the discovered schema from the small file to read the large file
# with the .schema directive
#
df = spark.read.format("csv") \
    .option("header", "true").schema(smallDf.schema) \
    .option("delimiter", ',').load("dbfs:/FileStore/sales_data/sales_data.csv")

Using inferschema on the small, sample CSV has almost no performance overhead, but as a side effect, we get a derived schema that will be the same as the one required for the big CSV file. We just supply that schema in the .schema option of the spark.read function when reading the big file.

Summary

We've examined the use of one of the most common file reading options used in PySpark when ingesting CSV data files – the inferschema directive.

I showed that even though it's a useful option, it can impose a serious performance hit when reading very large CSV data files. This is because when inferschema is used, PySpark has to read the whole file before deciding on the final schema of the file.

That being said, I also highlighted a couple of workarounds that showed you how to avoid using inferschema altogether or to use it in conjunction with other methods to still be able to read large CSV files quickly.

_That's all for me for now. Hopefully, you found this article useful. If you did, please check out my profile page at this link. From there you can see my other published stories and subscribe to get notified when I post new content._

Click on these links for more of my articles related to subjects covered in this story that I think you'll enjoy.

PySpark Explained: Delta Table Time Travel Queries

1 Billion Row Challenge – Part 2

Tags: Data Science Machine Learning Programming Pyspark Python
