Building Durable Data Pipelines

Data durability is a well-known pain point in data pipeline design. Data availability and data quality issues can lead to a significant increase in time spent on non-value-added tasks. In this story, I would like to speak about data engineering design patterns that ensure data is always there. We will look at techniques that help us build a sustainable data transformation process where data is always delivered on time and our data pipeline can be described as robust, durable and maybe even self-fixing.
If a data pipeline fails, employees will most likely have to perform a set of manual tasks, including unnecessary data sourcing, aggregation and processing, to get to the desired outcome.
Data durability is a known risk factor in data engineering, yet, in my opinion, it is one of the least discussed topics online at the moment. However, just because you don't see the problem doesn't mean it isn't there. Data engineers might not speak of it often, but the issue exists, seeding fear among data practitioners and turning data pipeline design into a real challenge.
Data availability and data quality issues might lead to further delays in data delivery and other reporting failures. According to a McKinsey report, the time employees spend on non-value-adding tasks can increase drastically due to these factors:

This would typically include data investigations that shouldn't be required: extra data sourcing, data cleansing, reconciliation and aggregation, resulting in lots of manual work.
These manual tasks are absolutely unnecessary.
So how do we build robust, durable and self-fixing pipelines?
What is a data pipeline?
There is a data pipeline whenever there is data processing between points A and B. One can be considered the source and the other the destination:

Consider this example below. In its logical structure, we have various data sources and stages to transform data to make it ready for analytics:

This is a common data engineering task I do daily – identifying the optimal way to deliver data from point A to point B.
Optimal doesn't mean the best. The "best" simply doesn't exist in data engineering, and "the optimal" depends on many business and functional requirements, e.g. data freshness, real-time reporting, self-fixing capabilities, security, the data skills of our users, etc.
In one of my previous stories, I summarized some advanced data pipeline design patterns with examples [1].
A typical data pipeline design process might be the following (a short sketch follows the list):
- Describe the data pipeline use case and scenario
- Understand data sources, data volumes and data freshness
- Choose a data ingestion technique
- Outline data processing
- Configure data storage
- Configure data tests
- Run unit tests for ETL tasks
- Set up data governance
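To make these steps a bit more concrete, here is a minimal, hypothetical Python sketch of how they can hang together. The function names, fields and the analytics.transactions table are placeholders, not a real framework.
from typing import Iterable, List

# A minimal, hypothetical pipeline skeleton mapping the design steps above to code.
def ingest(source: List[dict]) -> Iterable[dict]:
    # Choose a data ingestion technique: here we simply iterate over a batch
    yield from source

def transform(records: Iterable[dict]) -> Iterable[dict]:
    # Outline data processing: mask PII before loading
    for r in records:
        yield {**r, "user_id": "****"}

def test_data(records: List[dict]) -> None:
    # Configure data tests: a simple row condition
    assert all(r.get("transaction_id") is not None for r in records), "NULL transaction_id found"

def load(records: List[dict], table: str) -> None:
    # Configure data storage: print instead of writing to a real warehouse
    print(f"loading {len(records)} rows into {table}")

source = [{"transaction_id": 1, "user_id": "John", "total_cost": 10.99}]
records = list(transform(ingest(source)))
test_data(records)
load(records, "analytics.transactions")
Each of these steps can later become a separate task in an orchestrator such as Apache Airflow.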
Data ingestion and data loading
Data engineers create robust data loading systems that process data efficiently, don't fail, don't consume too much memory, handle various data formats and scale well.
Imagine that our source dataset is huge and our memory resources are limited.
How do we load it into our data warehouse efficiently?
Loading data in chunks or streaming it record by record helps to optimize memory usage. Consider the example below, where we perform an ETL task on our data before loading it into the DWH. We use re.finditer, which acts like a generator and reads our data in chunks, so we can run the required ETL without consuming too much memory.
import io
import re

def etl(item):
    # Mask the PII value before loading
    print(f'Transforming item: {item}')
    return item.replace("John", '****')

# Helper function to split our text file into chunks
# using a separator
def splitStr(string, sep=r"\s+"):
    if sep == '':
        return (c for c in string)
    else:
        return (_.group(1) for _ in re.finditer(f'(?:^|{sep})((?:(?!{sep}).)*)', string))

# Text file loaded as a blob
blob = """transaction_id,user_id,total_cost,dt
1,John,10.99,2023-04-15
2,Mary, 4.99,2023-04-12
"""

# data = blob.split("\n")  # We wouldn't want to do this on large datasets
# as it would require loading the big data file as a whole in the first place,
# consuming lots of memory.
# We would want to use our generator helper function
# and process data in chunks instead.
data = splitStr(blob, sep='\n')
data_str = "\n".join(etl(item) for item in data)
print('New file contents:')
print(data_str)

data_file = io.BytesIO(data_str.encode())
print(data_file)
Output:
python example.py
Transforming item: transaction_id,user_id,total_cost,dt
Transforming item: 1,John,10.99,2023-04-15
Transforming item: 2,Mary, 4.99,2023-04-12
Transforming item:
New file contents:
transaction_id,user_id,total_cost,dt
1,****,10.99,2023-04-15
2,Mary, 4.99,2023-04-12
<_io.BytesIO object at 0x103672980>
The data ingestion step is crucial because, if it is poorly managed, loading errors become difficult to spot. Whether the destination is a data warehouse, a database or a data lake, we want to check for loading errors and catch them early.
Ideally, we would also want our data ingestion pipeline not to rely on fixed source schemas and to handle potential data drift issues such as schema changes, which happen way too often in the data engineering world. Some advanced ETL techniques can be found here [2]:
Advanced ETL Techniques for Beginners
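As a simple illustration of catching loading errors and unexpected schema changes early, here is a minimal Python sketch. The expected column set, the dead-letter handling and the sample blob are made-up examples, not tied to any particular warehouse.
import csv
import io

# Validate each record during ingestion and push anything unexpected into a
# dead-letter list, so loading errors and schema drift are caught early
# instead of breaking the pipeline downstream.
EXPECTED_COLUMNS = {"transaction_id", "user_id", "total_cost", "dt"}

def ingest(raw: str):
    good, dead_letter = [], []
    for row in csv.DictReader(io.StringIO(raw)):
        if set(row) == EXPECTED_COLUMNS and row["transaction_id"]:
            good.append(row)
        else:
            dead_letter.append(row)  # investigate later, don't fail the whole run
    return good, dead_letter

blob = """transaction_id,user_id,total_cost,dt,new_unexpected_field
1,John,10.99,2023-04-15,foo
"""
good, dead_letter = ingest(blob)
print(len(good), len(dead_letter))  # 0 1 -> schema drift detected, run continues
This doesn't replace warehouse-side checks; it simply surfaces drift at ingestion time instead of during reporting.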
What is data drift?
A typical data drift scenario refers to unexpected and unintended changes in a data source that require a lot of time to fix downstream data processing, typically hours of debugging in our data pipelines. It happens when a field or even a single record changes to the extent that it breaks the pipeline. A pipeline that breaks this way is a great example of poor design: unplanned changes cause other bugs and failures and can lead to further delays in data processing and reporting.
Choosing the right data processing technique helps to mitigate this risk factor. Consider ELT instead of ETL if you are loading data into a data warehouse. In this case, fields that don't matter much won't cause a pipeline failure: they will simply arrive as NULL, and the data will still be transformed and delivered to the analytics database for further analysis.
This can be achieved by loading data "as is" in its original format and catching potential data issues later. The example below shows a simple table created in Snowflake where data is loaded into a VARIANT column and can be transformed or checked for quality later:
create or replace table raw_test.schema_test.transactions_test (
RAW_DATA VARIANT
,TIMESTAMP TIMESTAMP
);
insert into raw_test.schema_test.transactions_test (
RAW_DATA
,TIMESTAMP
)
select
'{
"data": {
"amount": "2",
"created_at": "2022-07-21T05:08:27.057Z",
"currency": "GBP",
"id": "some_id_string_1",
"updated_at": "2022-07-21T05:09:02.214Z"
},
"metadata": {
"operation": "update"
}
}'
:: VARIANT
as raw_data
, TO_CHAR(timestampadd(days,-1, current_timestamp()), 'YYYY-MM-DD HH24:MI:SS.FF3') --'2024-03-01 20:01:35.017'
:: timestamp_ntz
as timestamp
Once the data is in our data warehouse, we can process that JSON using built-in JSON functions like so:
select
JSON_EXTRACT_PATH_TEXT(raw_data,'data.id') as id
from
raw_test.schema_test.transactions_test
Sometimes we need to check the TIMESTAMP format using TRY_TO_TIMESTAMP instead of TO_TIMESTAMP. This is called safe casting. If we use TO_TIMESTAMP and the timestamp format is wrong, the view or the SQL operation will fail, whereas TRY_TO_TIMESTAMP simply returns NULL.
select TRY_TO_TIMESTAMP(upper(JSON_EXTRACT_PATH_TEXT(raw_data,'data.updated_at'))
, 'YYYY-MM-DD"T"HH24:MI:SS.FF6"Z"') as updated_at
from
raw_test.schema_test.transactions_test
Configuring data tests
Data quality is crucial for all production- and analytics-grade pipelines. For any dataset we transform and create, we would check whether our records meet a certain set of quality criteria, e.g. format, missing values, etc. These are called data quality checks using row conditions [3].
Consider the Google Cloud BigQuery example below. It shows how we can test whether our data meets the quality requirements.
with checks as (
select
count( transaction_id ) as t_cnt
, count(distinct transaction_id) as t_cntd
, count(distinct (case when payment_date is null then transaction_id end)) as pmnt_date_null
from
production.user_transaction
)
, row_conditions as (
select if(t_cnt = 0,'Data for yesterday missing; ', NULL) as alert from checks
union all
select if(t_cnt != t_cntd,'Duplicate transactions found; ', NULL) from checks
union all
select if(pmnt_date_null != 0, cast(pmnt_date_null as string )||' NULL payment_date found', NULL) from checks
)
, alerts as (
select
array_to_string(
array_agg(alert IGNORE NULLS)
,'.; ') as stringify_alert_list
, array_length(array_agg(alert IGNORE NULLS)) as issues_found
from
row_conditions
)
select
alerts.issues_found,
if(alerts.issues_found is null, 'all good'
, ERROR(FORMAT('ATTENTION: production.user_transaction has potential data quality issues for yesterday: %t. Check dataChecks.check_user_transaction_failed_v for more info.'
, stringify_alert_list)))
from
alerts
;
This is a simple and reliable data quality framework we can build in five minutes.
If you are a dbt user, data quality checks can be run using the dbt test command, provided our data tests are defined in the model .yml file:
# dbt-runner/my_project/models/example/schema.yml
version: 2

models:
  - name: table_a
    description: "A starter dbt model"
    columns:
      - name: id
        description: "The primary key for this table"
        tests:
          - unique
          - not_null

  - name: table_b
    description: "A starter dbt model"
    database: |
      {%- if target.name == "dev" -%} raw_dev2
      {%- elif target.name == "prod" -%} raw_prod
      {%- else -%} invalid_database
      {%- endif -%}
    schema: events
    columns:
      - name: id
        description: "The primary key for this table"
        tests:
          - unique
          - not_null

unit_tests:
  - name: test_table_b
    description: "Expect a column to be created and table_b must return only one row id = 1."
    model: table_b
    given:
      - input: ref('table_a')
        rows:
          - {id: 1}
          - {id: 2}
          - {id: 3}
          - {id: null}
    expect:
      rows:
        - {id: 1}
I wrote about it here [4]:
Data tests and data unit testing are different.
Data tests check our data for quality using row conditions, while data unit testing is used to ensure that the data transformation logic itself is correct.
Using incremental strategies
Working with SQL, we can use incremental updates for our datasets to mitigate data pipeline outages, improve query performance and reduce run time. If a data transformation task fails, we can always re-run it and it will pick up only the records that haven't been processed yet.
It improves query performance simply because it doesn't scan the whole source dataset and all of its partitions. Consider the dbt example below. This template demonstrates the use of the Snowflake adapter to incrementally update a table. If we run the task with a full refresh, it will pull data from the original source table containing all historic data. Otherwise, it will use a view with the most recent data, keeping only the latest record per id based on updated_at:
-- depends_on: {{ ref('transactions_view') }}
{{
config(
materialized='incremental',
unique_key='id',
on_schema_change='append_new_columns',
incremental_strategy='merge',
cluster_by = ['updated_at'],
incremental_predicates = [
"DBT_INTERNAL_SOURCE.updated_at > dateadd(day, -30, current_date)"
],
schema='schema_test',
alias='transactions',
query_tag = 'transactions',
tags=["incremental", "transactions", "unit_testing"]
)
}}
select
*
{% if is_incremental() %}
from {{ref('transactions_view')}}
{% else %}
from {{ref('transactions_all_time_data')}}
{% endif %}
where
id is not null
qualify
RANK() OVER (PARTITION BY id ORDER BY updated_at::TIMESTAMP DESC ) = 1
So after we compile it, the SQL will look like this:
merge into base_test.schema_test.transactions as DBT_INTERNAL_DEST
using base_test.schema_test.transactions__dbt_tmp as DBT_INTERNAL_SOURCE
on (
DBT_INTERNAL_SOURCE.updated_at > dateadd(day, -30, current_date)
)
and (
DBT_INTERNAL_SOURCE.id = DBT_INTERNAL_DEST.id
)
when matched then update set
"ID" = DBT_INTERNAL_SOURCE."ID",
...
"AMOUNT" = DBT_INTERNAL_SOURCE."AMOUNT", ...
when not matched then insert
("ID", ..., "UPDATED_AT")
values
("ID", ..., "UPDATED_AT")
;
Using MERGE to incrementally update datasets is usually considered an ETL best practice. It can be applied in both data warehouse and data lake platforms. More on MERGE and a good example can be found in my story here [6]:
Testing for side effects
A side effect in Python can be considered any change a function makes other than its return value. Data transformation tasks can have multiple side effects. For instance, our data pipeline can be described as a series of jobs which are essentially Python functions. This is a very typical pattern for pipelines built using Apache Airflow. Each task might have various side effects, e.g. sending an email or inserting new values into the database.
Data pipelines are typically defined as a series of ETL tasks performed by a job runner. A job runner can be anything – a containerized application with a robust API or even a serverless Lambda function. In one of my previous stories, I wrote about how to deploy a dbt runner service using AWS Lambda [5].
Database Data Transformation for Data Engineers
That's right. AWS Lambda and dbt!
Why do side effects matter?
Consider this simple example below. It illustrates the nature of side effects.
# An example with side effect
def adder(data, element):
    data.append(element)
    return data

list_of_numbers = [1, 2, 3]
print(adder(list_of_numbers, 4))  # Output: [1, 2, 3, 4]
print(list_of_numbers)            # Output: [1, 2, 3, 4]
In this function, or task, we have a side effect: the function returns a value, but it also changes the original list_of_numbers. This is a simple, typical example of a Python side effect and a coding pattern we should try to avoid.
In unit testing, we can control side effects simply by using unittest.mock. In the example below, we replace our adder function, which produces a side effect, with a Mock object that runs the same logic without the side effect:
from unittest.mock import Mock

# Original function that we will mock
def adder(data, element):
    data.append(element)
    return data

# Mock object
adder = Mock(side_effect=lambda data, element: data + [element])

# Test with the mock object
list_of_numbers = [1, 2, 3]
print(adder(list_of_numbers, 4))  # Output: [1, 2, 3, 4]
print(list_of_numbers)            # Output: [1, 2, 3]
unittest.mock.patch allows us to control the mock object, mock some behavior during the test and then automatically remove it after the test is finished:
from unittest.mock import patch

def adder(data, element):
    data.append(element)
    return data

def test_adder():
    with patch('__main__.adder', side_effect=lambda data, element: data + [element]):
        list_of_numbers = [1, 2, 3]
        print(adder(list_of_numbers, 4))  # Output: [1, 2, 3, 4]
        print(list_of_numbers)            # Output: [1, 2, 3]

test_adder()
More examples of how to unit test data pipelines using mock.patch
can be found in my story here [7]:
So the best practice is to design our data pipelines in such a way that all the steps they consist of are independent and can be re-run without unintended side effects. This pattern helps a lot with recovery. In maths, it's called idempotence: we can run the task multiple times and the result won't change beyond the initial application.
Building idempotent data pipelines is key
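To illustrate, here is a minimal sketch of an idempotent loading task using a delete-insert pattern. The transactions table, its columns and the load_partition helper are hypothetical, and SQLite stands in for the data warehouse: re-running the task for the same date always leaves the table in the same state.
import sqlite3

# A minimal sketch of an idempotent loading task (hypothetical table and
# column names). Deleting the partition before inserting it again makes
# the task safe to retry.
def load_partition(conn, ds, rows):
    with conn:  # one transaction: either both statements succeed or neither does
        conn.execute("DELETE FROM transactions WHERE dt = ?", (ds,))
        conn.executemany(
            "INSERT INTO transactions (transaction_id, user_id, total_cost, dt) VALUES (?, ?, ?, ?)",
            rows,
        )

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE transactions (transaction_id INT, user_id TEXT, total_cost REAL, dt TEXT)")

rows = [(1, "****", 10.99, "2023-04-15")]
load_partition(conn, "2023-04-15", rows)
load_partition(conn, "2023-04-15", rows)  # re-run: no duplicates, same end state
print(conn.execute("SELECT COUNT(*) FROM transactions").fetchone())  # (1,)
The same idea translates to warehouse SQL: a MERGE, or a delete-insert scoped to the partition being processed, keeps re-runs safe.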
Global variables and Python code optimization
Code scalability and readability depend a lot on how we use global variables. Heavy use of globals typically makes it hard to implement further code changes without unintended side effects and makes the code more challenging to test. As a rule of thumb, we should avoid massive use of global variables to keep our application code easy to understand and all interactions between its parts easy to explain.
Further Python techniques to improve our data pipeline code include the following (a short sketch follows below):
- Chunking
- Streaming
- Efficient data loading and lazy evaluation
- Using generators
- Memory optimization
I previously wrote about it here too [7].
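As an illustration of some of these techniques, here is a short, hypothetical sketch that combines generators and lazy evaluation and passes every dependency in explicitly instead of relying on global variables. The file name and functions are made up for this example.
from typing import Iterable, Iterator

# Lazy, generator-based processing with no global state.
def read_records(path: str) -> Iterator[str]:
    # Lazily yield one line at a time instead of loading the whole file
    with open(path) as f:
        for line in f:
            yield line.rstrip("\n")

def mask_user(records: Iterable[str], user: str) -> Iterator[str]:
    # Pure transformation: no globals, no mutation of the input
    for record in records:
        yield record.replace(user, "****")

def run_pipeline(path: str, user: str) -> int:
    # Everything the pipeline needs is passed in explicitly,
    # which keeps each function easy to unit test
    processed = 0
    for record in mask_user(read_records(path), user):
        processed += 1  # in a real task we would load the record into the DWH
    return processed

# Usage (assuming a local file exists):
# print(run_pipeline("transactions.csv", "John"))
Because read_records yields one line at a time, memory usage stays flat regardless of file size, and because nothing depends on global state, each function can be tested in isolation.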
Data pipelines must be fixed automatically
Conclusion
In this story, I summarized the techniques and best practice tips to build sustainable and durable data pipelines where data is processed without side effects. If a data pipeline fails, employees will most likely have to perform a set of manual tasks, including unnecessary data sourcing, aggregation and processing, to get their reporting done. This is often a tedious exercise that requires hours of work and can lead to delays and unintended costs. I sketched a few examples of how to improve data processing with incremental strategies and data testing, and tried to define a framework for a robust and durable data pipeline design process. In an ideal world, ETL pipelines would fix themselves. Self-fixing data pipelines exist; we just need to orchestrate a data quality process to ensure data is always available for business stakeholders.
Recommended read
[1] https://towardsdatascience.com/data-pipeline-design-patterns-100afa4b93e3
[2] https://towardsdatascience.com/advanced-etl-techniques-for-beginners-03c404f0f0ac
[3] https://towardsdatascience.com/automated-emails-and-data-quality-checks-for-your-data-1de86ed47cf0
[4] https://towardsdatascience.com/building-a-data-warehouse-9696b238b2da
[5] https://towardsdatascience.com/database-data-transformation-for-data-engineers-6404ed8e6000
[6] https://towardsdatascience.com/advanced-sql-techniques-for-beginners-211851a28488
[7] https://towardsdatascience.com/python-for-data-engineers-f3d5db59b6dd