Pandas for Data Engineers


In this story, I would like to talk about the things I like about Pandas and use often in ETL applications I write to process data. We will touch on exploratory data analysis, data cleansing and data frame transformations, and I will demonstrate some of my favourite techniques for optimizing memory usage and processing large amounts of data efficiently with this library. Working with relatively small datasets in Pandas is rarely a problem: it handles data frames with ease and provides a very convenient set of commands to process them. For transformations on much bigger data frames (1 GB and more) I would normally use Spark and distributed compute clusters, which can handle terabytes and petabytes of data but will also cost a lot of money to run all that hardware. That's why Pandas is often a better choice when we have to deal with medium-sized datasets in environments with limited memory resources.
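
Before reaching for bigger hardware, it is worth checking what a data frame actually costs in memory and whether cheaper dtypes would do. Below is a minimal sketch with made-up transaction data (the column names and values are purely illustrative):

import pandas as pd

# A hypothetical transactions frame, used only to illustrate memory profiling
df = pd.DataFrame({
    'merchant': ['amazon', 'ebay', 'etsy'] * 100_000,
    'status': ['settled', 'pending', 'settled'] * 100_000,
    'amount_usd': [10.5, 20.0, 7.25] * 100_000,
})

# Inspect the real footprint, including object (string) columns
print(df.memory_usage(deep=True))

# Low-cardinality strings are much cheaper as categoricals,
# and numeric columns can often be downcast
df['merchant'] = df['merchant'].astype('category')
df['status'] = df['status'].astype('category')
df['amount_usd'] = pd.to_numeric(df['amount_usd'], downcast='float')

print(df.memory_usage(deep=True))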


Pandas and Python generators

In one of my previous stories I wrote about how to process data efficiently using generators in Python [1].

Python for Data Engineers

It's a simple trick to optimize memory usage. Imagine that we have a huge dataset somewhere in external storage. It can be a database or just a simple large CSV file. Imagine that we need to process this 2–3 TB file and apply some transformation to each row of data in it. Let's assume that we have a service that will perform this task and it has only 32 GB of memory. This limits what we can load: we won't be able to read the whole file into memory and split it line by line with a simple Python split('\n'). The solution is to process it row by row, yielding each row and freeing the memory for the next one. This helps us create a constantly streaming flow of ETL data into the final destination of our data pipeline. It can be anything – a cloud storage bucket, another database, a data warehouse solution (DWH), a streaming topic or another application. I wrote about data pipeline design here [2]:

Data pipeline design patterns

So how do we do this in Python?

Consider the code snippet below. It processes data row by row using yield.

def etl(cursor):
    cursor.execute(query)
    for row in cursor.fetchall():
        # some ETL
        yield row
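
The same pattern works for a flat file: instead of reading the whole CSV into memory, we read and yield one row at a time. A minimal sketch, assuming a CSV file with a header row (the file path is hypothetical):

import csv

def etl_from_csv(path):
    '''Stream rows from a large CSV file one at a time'''
    with open(path, newline='') as f:
        reader = csv.DictReader(f)
        for row in reader:
            # some ETL on a single row
            yield row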

Now we want to create a data frame where data is processed as a stream of rows (records). In this case, we won't need to load the whole dataset into memory in one go. This Pandas code will do it:

import pandas as pd

def df_generator(cursor):
    print('Creating pandas DF using generator...')

    column_names = ['id',
                    'merchant',
                    'status',
                    'transaction_date',
                    'amount_usd']

    df = pd.DataFrame(data=etl(cursor), columns=column_names)

    print('DF successfully created!\n')

    return df

In a similar way, we can process data in chunks to generate a data frame (below). We still use cursor.execute(query) on our database to extract all the records we need, but we don't load them into memory in one go. Instead, we process them in chunks and use pd.concat to append each new chunk to the final data frame.

def df_create_from_batch(cursor, batch_size):
    print('Creating pandas DF using generator...')
    colnames = ['id',
                'merchant',
                'status',
                'transaction_date',
                'amount_usd']

    df = pd.DataFrame(columns=colnames)
    # execute a database query to extract data
    cursor.execute(query)
    while True:
        rows = cursor.fetchmany(batch_size)
        if not rows:
            break
        # some ETL on a chunk of data of batch_size
        batch_df = pd.DataFrame(data=rows, columns=colnames)
        df = pd.concat([df, batch_df], ignore_index=True)
    print('DF successfully created!\n')
    return df
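
A small variant of the same idea, shown here only as a sketch: collecting the chunks in a list and calling pd.concat once at the end avoids re-copying the growing data frame on every iteration, which helps when the number of batches is large.

def df_create_from_batch_v2(cursor, batch_size):
    '''Same extraction, but concatenate all chunks in a single pass at the end'''
    colnames = ['id', 'merchant', 'status', 'transaction_date', 'amount_usd']
    cursor.execute(query)
    batches = []
    while True:
        rows = cursor.fetchmany(batch_size)
        if not rows:
            break
        # some ETL on a chunk of data of batch_size
        batches.append(pd.DataFrame(data=rows, columns=colnames))
    if not batches:
        return pd.DataFrame(columns=colnames)
    return pd.concat(batches, ignore_index=True)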

Python generators are powerful!

We can even yield each chunk from the data frame we create and do something with it, e.g. upload the transformed data to cloud storage. In this case, we won't need to load the whole dataset into memory. Consider the example below. It extracts data from our database in chunks to load into Pandas, then applies some ETL and uploads each chunk to AWS S3. Now our df_create_from_batch is a generator yielding each chunk, and we can simply pass this generator to our new function upload_chunks(chunk_gen, ...) to upload data into cloud storage:

import boto3
import json
from datetime import datetime
import pytz
s3 = boto3.client('s3')

def upload_chunks(chunk_gen, s3_bucket, s3_file_prefix):
    '''
    Perform Multi-part upload to AWS S3 datalake:
    '''
    try:
        cnt = 0
        logs = []
        for chunk in chunk_gen:
            # each chunk is a Pandas data frame, serialise it to JSON records
            part = chunk.to_json(orient='records').encode('utf8')
            key = s3_file_prefix + file_key()
            s3.put_object(Body=part, Bucket=s3_bucket, Key=key)
            logs.append(f'aws s3 cp s3://{s3_bucket}/{key} ./ ')
            cnt += 1

        print(f'upload_chunks: Uploaded {cnt} chunks.')
        print('\n'.join(str(i) for i in logs))
    except Exception as e:
        print(e)

def file_key():
    '''
    Get a file suffix, i.e. /data_pipe_1/2023/12/11/09/5234023930
    '''
    suffix = datetime.utcnow().replace(tzinfo=pytz.utc).strftime('%Y/%m/%d/%H/%M%S%f')
    return f'{suffix}'

def df_create_from_batch(cursor, batch_size):
    print('Creating pandas DF chunks using generator...')
    colnames = ['id',
                'merchant',
                'status',
                'transaction_date',
                'amount_usd']

    # execute a database query to extract data
    cursor.execute(query)
    while True:
        rows = cursor.fetchmany(batch_size)
        if not rows:
            break
        # some ETL on a chunk of data of batch_size
        batch_df = pd.DataFrame(data=rows, columns=colnames)
        yield batch_df
    print('All DF chunks successfully created!\n')

s3_upload_scope = df_create_from_batch(cursor, 10000)
upload_chunks(s3_upload_scope, config.get('s3_bucket'), pipe['name'])

cursor.fetchmany(batch_size) and cursor.fetchall() are typical cursor patterns in all modern databases and data warehouse solutions, including Snowflake, Postgres and MySQL. Consider the MySQL example below, which uses the same approach:

import sys
import logging
import pymysql

conn = pymysql.connect(host=rds_host,
                       user=user_name,
                       passwd=password,
                       db=db,
                       connect_timeout=5,
                       charset='utf8mb4',
                       cursorclass=pymysql.cursors.DictCursor
                       )
logging.info("SUCCESS: Connection to RDS MySQL instance succeeded")

def get_sb_data(sql, param):
    '''
    Creates an extract generator and yields row by row from the MySQL db
    '''
    try:

        with conn.cursor() as cur:
            query = sql.format(param)
            cur.execute(query)
            logging.debug('---')
            logging.debug(f'query: {query}')
            for row in cur.fetchall():
                # some ETL
                logging.debug(f'row: {row}')
                yield row

    except pymysql.MySQLError as e:
        logging.error("ERROR: Unexpected MySQL error: Could not connect to MySQL instance.")
        logging.error(e)
        sys.exit()
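
Postgres works the same way; as a sketch, with psycopg2 we can even use a server-side (named) cursor so the full result set never has to sit in the client's memory (the connection details below are placeholders):

import psycopg2

conn = psycopg2.connect(host=pg_host,
                        user=user_name,
                        password=password,
                        dbname=db)

def get_pg_data(sql, batch_size=1000):
    '''Creates an extract generator and yields row by row from Postgres'''
    # a named cursor keeps the result set on the server side
    with conn.cursor(name='etl_stream') as cur:
        cur.itersize = batch_size
        cur.execute(sql)
        for row in cur:
            # some ETL
            yield row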

Modern DWH solutions work in much the same way. For instance, in Snowflake we can do the same and then load the result into a data frame.

import snowflake.connector

...
with conn.cursor() as cur:
    cur.execute(query)
    for row in cur.fetchall():
        # some ETL
        logging.debug(f'row: {row}')
        yield row

I previously wrote about it in one of my stories here [3]:

Advanced ETL Techniques for Beginners

Python generators are very useful for scenarios like this one but we can do even more.

We can chain generators!

Let's imagine that cursor.fetchmany() doesn't exist or won't work for some reason, but we still need to process data in chunks of a given size. We can create our own bespoke chunk generator and pass it to a second generator that applies some ETL logic per chunk. Consider the Python snippet below; it chains two generators. This particular technique is useful when we need to extract data in chunks using a predefined set of ids in a SQL query's WHERE clause, e.g. running get_sb_data(sql, ids_str) with a stringified ids parameter.

def chunk_gen(itemList, chunks):
    '''Read data in chunks and yield each chunk'''
    for i in range(0, len(itemList), chunks):
        yield itemList[i:i + chunks]

def sb_batch_extract(idList_gen):
    '''Reads a data generator, i.e. a list of ids, and works with each batch
    to extract data from the database
    '''
    try:
        step = 1
        while True:
            ids = next(idList_gen)
            logging.debug(f'> Step {step} processing ids: {ids}')
            ids_str = ','.join(str(i) for i in ids)
            out = get_sb_data(sql, ids_str)
            logging.debug(f'> Step {step} ids used to produce: {out}')
            step += 1
            yield out
    except StopIteration:
        # the id generator is exhausted, we are done
        pass
    except Exception as e:
        print(e)
    finally:
        del idList_gen

#  Step 1: Generate user id list as a generator object:
idList_gen = chunk_gen([col[0] for col in get_ids()], 250)
# Step 2: Extract in chunks from the database:
extract = sb_batch_extract(idList_gen)
# Step 3: Materialise the batches (each batch is itself a row generator):
actual = [batch for batch in extract]
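
A quick way to sanity-check the chunking generator on its own, before wiring it to the database:

ids = list(range(7))
print([chunk for chunk in chunk_gen(ids, 3)])
# [[0, 1, 2], [3, 4, 5], [6]]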

Modern data warehouses and Pandas

Pandas became incredibly popular due to its robust set of data transformation features, including data profiling, cleansing, data frame manipulations and a vast range of connectivity options. It is reasonable to expect it to be integrated into many data warehouse SDKs, and indeed it is supported by many market leaders. For instance, we can use Pandas with the Google Cloud BigQuery library to load data into DWH tables using data frames. We can create a Cloud Function or an AWS Lambda microservice with the latest versions of google-cloud-bigquery and Pandas using Docker. Consider the example below; it provides a function that loads data into a BigQuery table using Pandas:

import json
import pandas
from google.cloud import bigquery

# client = bigquery.Client() and create_schema_from_yaml() are defined elsewhere

def _load_table_from_dataframe(table_schema, table_name, dataset_id):
    '''This function loads data using Pandas.
    Source data file format must be outer array JSON:
    [
    {"id":"1"},
    {"id":"2"}
    ]
    '''
    # Use CS.get_bucket to load data from Google Cloud Storage:
    # blob = CS.get_bucket(bucket_name).blob(file_name)
    # body = json.loads(blob.download_as_string())
    # For now we supply data in blob:
    blob = """
            [
    {"id":"1","first_name":"John","last_name":"Doe","dob":"1968-01-22","addresses":[{"status":"current","address":"123 First Avenue","city":"Seattle","state":"WA","zip":"11111","numberOfYears":"1"},{"status":"previous","address":"456 Main Street","city":"Portland","state":"OR","zip":"22222","numberOfYears":"5"}]},
    {"id":"2","first_name":"John","last_name":"Doe","dob":"1968-01-22","addresses":[{"status":"current","address":"123 First Avenue","city":"Seattle","state":"WA","zip":"11111","numberOfYears":"1"},{"status":"previous","address":"456 Main Street","city":"Portland","state":"OR","zip":"22222","numberOfYears":"5"}]}
    ]
    """
    body = json.loads(blob)
    print(pandas.__version__)

    table_id = client.dataset(dataset_id).table(table_name)
    job_config = bigquery.LoadJobConfig()
    schema = create_schema_from_yaml(table_schema) 
    job_config.schema = schema

    df = pandas.DataFrame(
        body,
        # In the loaded table, the column order reflects the order of the
        # columns in the DataFrame.
        columns=["id", "first_name", "last_name", "dob", "addresses"],
    )
    df['addresses'] = df.addresses.astype(str)
    df = df[['id', 'first_name', 'last_name', 'dob', 'addresses']]

    print(df)

    load_job = client.load_table_from_dataframe(
        df,
        table_id,
        job_config=job_config,
    )

    load_job.result()
    print("Job finished.")

Snowflake DWH has similar features in its Python connector, and we can load data from data frames like so:

import pandas as pd
import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas
...
conn = snowflake.connector.connect(
    user='USER',
    # password='****',
    private_key=pkb,
    account='account_test',
    session_parameters={
        'QUERY_TAG': 'Testing',
    },
    warehouse="TEST_WH",
    database="DB",
    schema="SCHEMA"
)

with conn.cursor() as cur:

    out_df = pd.DataFrame(['{"1": "1", "name": "2", "fields": [{"1": "1", "2": "1"}]}'], columns=["SRC"])
    print('creating df...')
    print(out_df.head())
    table = '''TEST_SRC'''
    print(f'Writing df into {table}')
    success, nchunks, nrows, _ = write_pandas(conn, out_df, table)
    print(f'Successfully loaded {success} :: N rows = {nrows} :: _ {_}')
    print('Getting new count')
    sql = '''select * from DB.SCHEMA.TEST_SRC'''
    cur.execute(sql)
    print('New count is:')
    for df in cur.fetch_pandas_batches():
        # my_dataframe_processing_function(df)
        print(df.shape[0])

Snowflake also offers an improved and extended library called Snowpark for processing and loading data with data frames, including Pandas ones. When we load data into our data warehouse solution this way, we can unit test the logic the same way we would for any other Python app. Consider the example below; it uses the create_dataframe method:

import snowflake.snowpark as snowpark
from snowflake.snowpark.session import Session

...

connection_parameters = {
    "user": 'USER',
    # "password": '****',
    "private_key": pkb,
    "account": 'account_test',
    "session_parameters": {
        'QUERY_TAG': 'Testing',
    },
    "warehouse": "TEST_WH",
    "database": "DB",
    "schema": "SCHEMA"
}
new_session = Session.builder.configs(connection_parameters).create()
df1 = new_session.create_dataframe([1, 2, 3, 4]).to_df("a")
df1.show()
new_session.close()

We can load the whole table into a data frame like so:

df1=new_session.table("DB.SCHEMA.TABLE")

Or we can save a data frame as a new table, or append it to an existing one:

df1_schema = new_session.table("DB.SCHEMA.TABLE").schema
# StructType([StructField('SRC', VariantType(), nullable=True)])
df1 = new_session.create_dataframe(
    ['{"1": "1", "name": "2", "fields": [{"1": "1", "2": "1"}]}'],
    schema=df1_schema)
df1.write.mode("overwrite").save_as_table("DB.SCHEMA.TABLE")
df1.write.mode("append").save_as_table("DB.SCHEMA.TABLE")

After we run this, we can see that it is basically translated into this SQL:

INSERT  INTO DB.SCHEMA.TABLE
SELECT "SRC" 
FROM ( 
  SELECT $1 AS "SRC" 
  FROM  VALUES 
('{"1": "1", "name": "2", "fields": [{"1": "1", "2": "1"}]}' :: STRING))

We can also create Snowflake views, flatten data, filter, cast and chain transformations using data frames in Snowflake [4], which makes it an extremely powerful ETL technique. We can apply unit tests with ease when we create views and tables using Snowpark:

df.create_or_replace_view(f"{database}.{schema}.{view_name}")
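
For example, the transformation logic itself can be unit tested with pytest on plain in-memory data frames before anything touches the warehouse. A minimal sketch, with a hypothetical add_amount_gbp transformation:

import pandas as pd
import pandas.testing as pdt

def add_amount_gbp(df, fx_rate):
    '''Hypothetical transformation we want to test before loading to the DWH'''
    out = df.copy()
    out['amount_gbp'] = out['amount_usd'] * fx_rate
    return out

def test_add_amount_gbp():
    src = pd.DataFrame({'id': [1, 2], 'amount_usd': [10.0, 20.0]})
    expected = pd.DataFrame({'id': [1, 2],
                             'amount_usd': [10.0, 20.0],
                             'amount_gbp': [8.0, 16.0]})
    pdt.assert_frame_equal(add_amount_gbp(src, 0.8), expected)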

Conclusion

The Pandas library is a great tool for data manipulation, exploratory analysis and data loading. It became incredibly popular due to its robust set of data transformation features, including data profiling, cleansing, data frame manipulations and a vast range of connectivity options. Although there are plenty of articles explaining Pandas basics, not many of them show how to load data efficiently, and those that do cover large datasets rarely go into the details or further exploratory data analysis. Consider the example below: this is what I keep seeing on the internet again and again:

batchsize = 10 ** 5
with pd.read_csv(filename, chunksize=batchsize) as reader:
    for batch in reader:
        etl(batch)

With Pandas we can do so much more ETL-wise. Modern data warehouse solutions [5] have it built in and fully enabled in their libraries and SDKs, which makes it a powerful tool for data transformation and loading. I think that being able to process data in data frame chunks, coupled with modern DWH capabilities, makes data ingestion a lot more functional and easier to unit test. I hope you find this article useful. Please let me know what you think about Pandas and its role in modern ETL process design.

Modern Data Warehousing

Recommended read

[1] https://towardsdatascience.com/python-for-data-engineers-f3d5db59b6dd

[2] https://towardsdatascience.com/data-pipeline-design-patterns-100afa4b93e3

[3] https://towardsdatascience.com/advanced-etl-techniques-for-beginners-03c404f0f0ac

[4] https://docs.snowflake.com/en/developer-guide/snowpark/python/working-with-dataframes

[5] https://towardsdatascience.com/modern-data-warehousing-2b1b0486ce4a

[6] https://pandas.pydata.org/

[7] https://www.dataquest.io/blog/pandas-big-data/

Tags: Big Data Data Engineering Data Science Etl Pandas
