Ranking Diamonds with PCA in PySpark

Author:Murphy  |  View: 21777  |  Time: 2025-03-22 23:39:12

Introduction

Here we go for another post about PySpark. I have been enjoying writing about this subject, as it feels to me that we are lacking of good blog posts about PySpark, especially when we talk about Machine Learning in MLlib – by the way, that is Spark's Machine Learning Library, designed to work with Big Data in a parallelized environment.

I can tell that the Spark Documentation is excellent. It is super organized and easy to follow the examples. But working with machine learning in Spark is not the most friendly thing to do.

In this post, I work on a PCA model to help me creating a ranking of diamonds, and I had to face a couple of challenges that we will go over in the next lines.

I have already written about PCA before and how it's useful for dimensionality reduction, as well as for creating rankings. However, this is the first time I do this using Spark, aiming to reproduce the technique on a Big Data environment.

Let's see the result.

Coding

Let's start our coding with the modules to import.

from pyspark.sql.functions import col, sum, when, mean, countDistinct
from pyspark.sql import functions as F
from pyspark.ml.feature import PCA
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.window import Window

Dataset

The dataset to be used in this exercise is the Diamonds, from the ggplot2 package and licensed under Creative Commons 4.0.

Here, I am loading it from the Databricks sample datasets and removing two known outliers from one of the variables. PCA is affected by outliers. They tend to dominate a component due to very large and distorted variance.

# Point file path
path = '/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv'

# Load Data
df = spark.read.csv(path, header=True, inferSchema= True)
df = df.filter( col('y') < 30)

Next, since PCA is a technique to be used for numerical values, I have chosen to work with the carat , table and depth variables from the data. I am not working with price as it dominates completely the variance of the components, making the model ineffective.

Furthermore, as the scales are different, I am also transforming the data to a logarithmic value, so they're all in the same scale.

# Get only our selected columns
cols = [ 'carat', 'table', 'depth']

df_num = df.select(F.log1p('carat').alias('carat'), 
                   F.log1p('table').alias('table'), 
                   F.log1p('depth').alias('depth'))

Here's how it looks.

Log values of the selected variables. Image by the author.

The next step is to vectorize our dataset.

Vectorization

One of the challenges we face when working with Big Data in Spark is that the machine learning algorithms require that the data is inputted as a vector. This could be more straightforward, but unfortunately it adds a couple of steps to our code.

If we had other steps to perform prior to the vectorization, we could create a Pipeline that can take the steps previously coded and run them at once. In our case, this step is not necessary, as we only have one step.

So, next we are creating a VectorAssembler. As the name intuitively tells us, it will get the numbers and gather them as a single vector by observation, putting all of them in a column named features.

assembler = VectorAssembler(inputCols= cols, outputCol="features")
prepared_df = assembler.transform(df_num)

The result will look like this, as follows.

Vectorized data. Image by the author.

PCA

To run the algorithm is fairly simple using MLlib. We instantiate PCA and fit the data.

# Run PCA it with all the possible components
pca = PCA(k= len(df_num.columns), inputCol="features")
pca.setOutputCol("pca_features")

# Fit
model = pca.fit(prepared_df)

If we want to see how is the explained variance per Principal Component after we ran PCA, we will need Pandas to make our life easier and be able to quickly create a data frame out of the model.explainedVariance and then we insert a new column with the PC names.

# See explained variance of the PCs
df_var = ps.DataFrame(model.explainedVariance, columns=['explained_var'])
df_var.insert(0, 'Component', 
              value= ['PC'+str(n) for n in range(1,len(df_num.columns)+1)])

In Databricks, it is as simple as clicking a couple of buttons to create this plot.

PC1 explains 96% of the variance. Image by the author.

Getting the Transformed Data

Given that Spark is designed for Big Data, it lacks a more complete output for the Principal Components. Numbers like eigenvectors, eigenvalues and loadings are not easily found as attributes of the PCA model.

All we get as output is the explained variance by PC and the table with the transformed data. Those numbers can give us the information of: (1) direction, being positive for those data points varying in the same direction of the PC and negative when their variation is to the opposite direction; (2) value, where the higher it means that it is varies more in that component.

This is the code snippet to get the transformed data.

# Get transformed Output
model.setOutputCol("output")
pca_transformed = model.transform(prepared_df).select('output')

And the resulting vectors.

PCA results. Image by the author.

Look how the observation #5 is the one with the most variance in the opposite direction of PC1, as well as the most variance in PC3.

These numbers can give us the base to rank these observations. Since we know how much they are varying in each component and how much variance is explained by component, we can quickly create a score by multiplying the values in each PC by the respective explained variance.

Let's see how.

Cleaning the Transformed Data

Here, there is another challenge. Those vectors are not very manipulation friendly. So we must workaround the problem.

First, let's just collect the transformed values.

# Collect the transformed results
temp = spark.createDataFrame(pca_transformed.collect())

Good. But they don't come ready for use. Each row is still a DenseVector. A solution I found to deal with that was casting the temporary dataset to string, so I can split that into 3 separate columns.

# Cast data to text
temp = temp.select(col('output').cast('string'))

[OUT]
output
[-0.31666817904639416,-2.8476048469770308,5.090764334358518]
[-0.32446613567339955,-2.833791365368773,5.079981934273924]
[-0.3312692998472932,-2.777878254082261,5.094343264160385]
[-0.33271613063654204,-2.830862619169142,5.089337919087877]

Since the values are now strings, the [] must be removed from the first and last characters. Using the split function, we can break the lines by the , as our separator. To remove the square brackets, we just slice the first character in the PC1 and the last from PC3. All the columns are cast('float') to make it numerical again.

# split columns
df_transformed = (
    temp
    .select(
        F.split('output', ',')[0][2:15].alias('PC1').cast('float'),
        F.split('output', ',')[1].cast('float').alias('PC2'),
        F.split('output', ',')[2][0:14].alias('PC3').cast('float')       ) 
        .fillna(0)
        )

Voilà. We have our data ready for the ranking.

Clean transformed data. Image by the author.

Calculating Scores

The scores can be calculated by multiplying the transformed data from each PC by the respective explained variance and adding all together.

PC1 * Explained Var PC1 +

PC2 * Explained Var PC2 +

PC3 * Explained Var PC3

Then, we will create a column with an index, using row_number and Window, so we can join the results to the original data. And finally a column with a dense_rank, which is our ranking of diamonds.

# Explained variance array
expl_var = model.explainedVariance

df_transformed = (
        df_transformed
        .withColumn('_c0', F.row_number().over(Window.partitionBy().orderBy(F.lit(1))))
        .withColumn('score', (col('PC1') * expl_var[0]) + (col('PC2') * expl_var[1]) + (col('PC3') * expl_var[2]) )
        .withColumn('rank', F.dense_rank().over(Window.partitionBy().orderBy('score')) )
        .sort('_c0')
        )

Let's display.

Top 10 in the ranking. Image by the author.

The previous table is the rank of diamonds based on the combination of carat , table and depth variables. It looks pretty good. We can see, at least in these top 10 results, that even though the categorical columns were not considered, we still see better colors and clarity better ranked. The colors go from D (best) to J (worst) and the clarity follows this sequence: (I1 (worst), SI2, SI1, VS2, VS1, VVS2, VVS1, IF (best)).

For example: rank 1 has a better color than 2, but smaller carat and worse clarity. Rank 5 has a better color and better clarity than 6 and 7, with a better price, but smaller carat. Certainly the depth, table are getting a role in those.

Before You Go

We have reached the end of this post. I found a couple of challenges performing PCA in Spark. It is not one of the most friendly methods I worked with.

I believe that there is a lot missing, way far from statistical tools like R, where you can get a much superior implementation of the algorithm. On the other hand, Spark is not meant to be a statistical tool. It is a Big Data tool initially created for ETL and that has been growing and adding more functionalities.

Furthermore, PCA can be used for dimensionality reduction when we change the k value of components to extract. For that use with Big Data, it can be a helper to reduce the dimensions of the dataset and serve as input for other steps in an analysis.

# Reduce to k components
pca = PCA(k = k, inputCol="features")

If you liked this content, follow my blog for more.

Gustavo Santos – Medium

Also, find me on LinkedIn.

Complete code in GitHub:

Studying/PySpark/PCA at master · gurezende/Studying

Interested in learning more about PySpark?

Enroll here: Master Data Wrangling With PySpark. Image by the author.

Here is a link to my course with a discount coupon applied: Mater Data Wrangling With PySpark in Udemy.

Reference

Creating Scores and Rankings with PCA

PCA: Beyond Dimensionality Reduction

PCA – PySpark 3.5.0 documentation

Tags: Data Science Principal Component Pyspark Statistics Unsupervised Learning

Comment