Dask DataFrame is Fast Now


Introduction

Dask DataFrame scales out pandas DataFrames to operate at the 100GB-100TB scale.

Historically, Dask was pretty slow compared to other tools in this space (like Spark). Due to a number of improvements focused on performance, it's now pretty fast (about 20x faster than before). The new implementation moved Dask from getting destroyed by Spark on every benchmark to regularly outperforming Spark on TPC-H queries by a significant margin.

Dask DataFrame workloads struggled in several areas. Performance and memory usage were common pain points, and shuffling was unstable for larger datasets, which made scaling out hard. Writing efficient code required understanding too much of Dask's internals.

The new implementation changed all of this. Things that didn't work were completely rewritten from scratch and existing implementations were improved upon. This puts Dask DataFrames on a solid foundation that allows faster iteration cycles in the future.

We'll go through the three most prominent changes, covering how they impact performance and make it easier to use Dask efficiently, even for users who are new to distributed computing. We'll also discuss plans for future improvements.

I am part of the Dask core team and an open source engineer at Coiled, and I was involved in implementing some of the improvements discussed in this post.

1. Apache Arrow Support: Efficient String Datatype

A Dask DataFrame consists of many pandas DataFrames. Historically, pandas used NumPy for numeric data but Python objects for text data, which are inefficient and blow up memory usage. Operations on object data also hold the GIL, which doesn't matter much for pandas but is a catastrophe for performance in a parallel system like Dask.

The pandas 2.0 release introduced support for general-purpose Arrow datatypes, so Dask now uses PyArrow-backed strings by default. These are much better. PyArrow strings reduce memory usage by up to 80% and unlock multi-threading for string operations. Workloads that previously struggled with available memory now fit comfortably in much less space, and are a lot faster because they no longer constantly spill excess data to disk.
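
To get a feel for the difference on a single pandas Series (the building block of a Dask DataFrame), here is a minimal sketch comparing the two string representations; the exact savings depend on the length and content of your strings:

```python
import pandas as pd

# Compare memory usage of object-backed vs. PyArrow-backed strings.
# Requires pandas >= 2.0 with pyarrow installed; results vary with the data.
words = [f"some moderately long text value {i}" for i in range(1_000_000)]

object_series = pd.Series(words, dtype="object")
arrow_series = pd.Series(words, dtype="string[pyarrow]")

print(f"object dtype:    {object_series.memory_usage(deep=True) / 1e6:.1f} MB")
print(f"string[pyarrow]: {arrow_series.memory_usage(deep=True) / 1e6:.1f} MB")
```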

I wrote a post about this that investigates Arrow integrations in more detail if you want to learn more.

2. Faster Joins with a New Shuffle Algorithm

Shuffling is an essential component of distributed systems that enables sorting, joins, and complex groupby operations. It is an all-to-all, network-intensive operation that's often the most expensive component in a workflow. We rewrote Dask's shuffling system, which greatly improves overall performance, especially on complex, data-intensive workloads.

A shuffle operation is intrinsically an all-to-all communication operation where every input partition has to provide a tiny slice of data to every output partition. Dask was already using its own task-based algorithm that managed to reduce the O(n * n) task complexity to O(n * log(n)), where n is the number of partitions. This was a drastic reduction in the number of tasks, but the non-linear scaling ultimately did not allow Dask to process arbitrarily large datasets.

Dask introduced a new P2P (peer-to-peer) shuffle method that reduces the task complexity to O(n), which scales linearly with the size of the dataset and the size of the cluster. It also incorporates an efficient disk integration that makes it easy to shuffle datasets that are much larger than memory. The new system is extremely stable and "just works" across any scale of data.
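
As a rough sketch of what this looks like in user code (the dataset paths and column names below are made up), a large join needs nothing more than the usual pandas-style API; the dataframe.shuffle.method config key can be used to pick the shuffle backend explicitly if you want to compare P2P against the older task-based approach:

```python
import dask
import dask.dataframe as dd
from dask.distributed import Client

client = Client()  # local cluster; point this at a remote scheduler to scale out

# Select the shuffle backend explicitly ("tasks" is the older algorithm).
dask.config.set({"dataframe.shuffle.method": "p2p"})

# Placeholder datasets and column names.
orders = dd.read_parquet("s3://my-bucket/orders/")
customers = dd.read_parquet("s3://my-bucket/customers/")

# The merge triggers a shuffle of both inputs on "customer_id".
joined = orders.merge(customers, on="customer_id")
revenue_per_region = joined.groupby("region")["amount"].sum().compute()
```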

One of my colleagues wrote a post about this that includes a more extensive explanation and a lot of technical details.

3. Optimizer

Dask itself is lazy, which means that it registers your whole query before doing any actual work. This is a powerful concept that enables a lot of optimizations, but historically Dask didn't take advantage of this knowledge. Dask also did a bad job of hiding internal complexities and left users on their own to navigate the difficulties of distributed computing and running large-scale queries. This made writing efficient code painful for non-experts.

The Dask release in March includes a complete re-implementation of the DataFrame API to support query optimization. This is a big deal. The new engine centers around a query optimizer that rewrites our code to make it more efficient and better tailored to Dask's strengths. Let's dive into some optimization strategies and how they make our code run faster and scale better.

We will start with a couple of general purpose optimizations that are useful for every DataFrame-like tool before we dive into more specific techniques that are tailored to distributed systems generally and Dask more specifically.
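
As a quick illustration of the lazy model the optimizer builds on (paths and column names are placeholders), the query below is only recorded when it is defined and only rewritten and executed when compute() is called:

```python
import dask.dataframe as dd

# Placeholder dataset; nothing is read at this point.
df = dd.read_parquet("s3://my-bucket/taxi/")

# Build the query lazily -- still no data has been touched.
result = (
    df[df["tip_amount"] > 0]
    .groupby("passenger_count")["tip_amount"]
    .mean()
)

# Only now does the optimizer rewrite the query into a more efficient plan
# and execute it on the cluster (or locally).
print(result.compute())
```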

3.1 Column Projection

Most datasets have more columns than we actually need. Dropping them requires foresight ("What columns will I need for this query?").
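
To make this concrete, here is a small sketch (again with made-up paths and column names) of a query where only two columns are ever touched; the optimizer can detect this and push the column selection down into the Parquet reader instead of relying on us to drop columns manually:

```python
import dask.dataframe as dd

# Placeholder dataset with many columns, of which only two are used below.
df = dd.read_parquet("s3://my-bucket/transactions/")

# We never select columns explicitly...
result = df.groupby("store_id")["amount"].sum()

# ...but only "store_id" and "amount" are needed, so the optimizer can
# restrict the read to those columns and skip the rest entirely.
print(result.compute())
```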

