ATDD Data Pipeline Design
Introduction
In this post I want to explore a data engineering framework that would allow us to achieve the following goals:
- Acceptance Test Driven Development (ATDD)
- Isolated code changes
- Shared documentation
There are a few assumptions that the proposed framework is designed around:
- data pipelines that consist of a mix of SQL and code;
- there are multiple sources of data that are initially made available in a single datastore (data lake);
- we are building a target data layer which will be available to the business.
This can be represented by this flow diagram:
application
-> data ingestion | service
-> data lake (nested) | datastore
-> data extraction | batch
-> staging (unnest) | datastore
-> data transformation | batch
-> data warehouse | datastore
Our focus will be on the design of the data transformation step in a way that enables teams to embrace Agile practices.
To make the execution of the transformation layer simple, we introduce a structured staging area that consists of relational projections of the data lake.
Data Landscape
Database
This framework targets any data warehousing tool and is technology agnostic. It has been implemented in BigQuery at MADE.com.
Data Lake
We use the term data lake to refer to all the data that comes from applications in raw format. Usually this will be JSON files in cloud storage or data streamed as JSON into the database.
We expect the data lake to consist of tables without nested data. If you have nested data, you will need to add a pre-processing stage to flatten it. We will refer to this flat data layer as the data lake going forward.
This separation will allow you to:
- simplify the queries that contain business logic;
- test your queries on database engines that don't currently support nested data;
- reduce the size of the queries thanks to the additional separation layer;
- move data preparation logic, such as duplicate removal, into the pre-processing layer, which benefits all the queries in the data transformation layer (a minimal flattening sketch follows this list).
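Here is a minimal sketch of such a pre-processing step, assuming the jsonlite, dplyr and tidyr packages; the nested shape (refunds embedded in customers) is purely illustrative:

# flatten.R -- illustrative pre-processing step
library(jsonlite)
library(dplyr)
library(tidyr)

# a nested data lake record: customers with embedded refunds
nested <- fromJSON('[
  {"id": 1, "refunds": [{"id": 1, "amount": 50.00}, {"id": 2, "amount": 100.00}]},
  {"id": 3, "refunds": [{"id": 4, "amount": 10.00}]}
]')

# relational projections for the flat data lake layer
customers <- nested %>% select(id)
refunds   <- nested %>%
  rename(customer_id = id) %>%
  unnest(refunds)   # columns: customer_id, id, amount

From this point on, queries in the transformation layer only deal with flat relational tables.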
Work granularity
The most important element of the framework is the definition of the unit of work.
We will require that each Story target one, and only one, field in the data warehouse.
This will allow stakeholders and developers to focus on requirements and deliver the most important fields incrementally.
This will also allow developers to organise their code around tables and fields in a way that changes can be integrated quickly.
Capturing requirements
Like many other product owners, we found spreadsheets to be the best way to capture requirements for data.
Here is an example of a requirements table and the corresponding test data:
Mock data for story acceptance
We can provide examples of the mock data that capture a shared view between users, the product owner and developers.
| datalake.customers.id | datalake.refunds.id | datalake.refunds.customer_id | datalake.refunds.amount | scenario |
|---|---|---|---|---|
| 1 | 1 | 1 | 50.00 | two refund records for customer 1 |
| 1 | 2 | 1 | 100.00 | |
| 2 | | | | no refunds for customer 2 |
| 3 | 4 | 3 | 10.00 | duplicate refund |
| 3 | 4 | 3 | 10.00 | |
| | 5 | 100 | 20.00 | orphan refund record |
Now we can spec the attr_customers_all_refunds staging output:
| customer_id | amount | comment |
|---|---|---|
| 1 | 150.00 | sum of the amount for two records |
| 3 | 10.00 | duplicate is removed |
| 100 | 20.00 | orphan record is included |
Requirements captured like this in a spreadsheet can be expressed with formulas, which in turn serve as data lineage documentation.
These two tables can easily be converted into JSON files with mock datalake.refunds data for ingestion into the database to test the query:
[
  {
    "id": 1,
    "customer_id": 1,
    "amount": 50.00
  },
  {
    "id": 2,
    "customer_id": 1,
    "amount": 100.00
  },
  {
    "id": 4,
    "customer_id": 3,
    "amount": 10.00
  },
  {
    "id": 4,
    "customer_id": 3,
    "amount": 10.00
  },
  {
    "id": 5,
    "customer_id": 100,
    "amount": 20.00
  }
]
Expected staging output in attr_customers_all_refunds:
[
  {
    "customer_id": 1,
    "amount": 150.00
  },
  {
    "customer_id": 3,
    "amount": 10.00
  },
  {
    "customer_id": 100,
    "amount": 20.00
  }
]
Mock data should be saved as pretty JSON to make it readable, and turned into line-delimited JSON in code if needed.
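For example, assuming the jsonlite package, this conversion can live in the test harness (file names are illustrative):

# to_ndjson.R -- illustrative helper
library(jsonlite)

# read the pretty, human-readable mock file...
refunds <- fromJSON("tests/data/all/refunds.json")

# ...and write it back out as newline-delimited JSON, ready for ingestion
stream_out(refunds, con = file("refunds.ndjson"), verbose = FALSE)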
Now the developer can add SQL that conforms to the acceptance above, e.g.:
-- attr_customers_all_refunds
SELECT
  customer_id,
  SUM(amount) AS amount
FROM
  (
    SELECT
      r.customer_id,
      r.amount,
      ROW_NUMBER() OVER (PARTITION BY id) rn
    FROM
      `datalake.refunds` r
  )
WHERE
  -- this removes duplicate refund records
  rn = 1
GROUP BY
  -- getting data to the granularity of
  -- the relevant primary key
  customer_id
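The acceptance examples translate directly into an automated test. Below is a minimal sketch of what such a test could look like, assuming the testthat, jsonlite and DBI packages and a DBI connection con to a disposable test dataset that understands the schema names used in the query; helper names and file paths are illustrative:

# tests/test.R -- illustrative acceptance test
library(testthat)
library(jsonlite)
library(DBI)

test_that("attr_customers_all_refunds matches the acceptance examples", {
  # ingest the mock data lake table from the pretty JSON fixture
  refunds <- fromJSON("tests/data/all/refunds.json")
  dbWriteTable(con, Id(schema = "datalake", table = "refunds"),
               refunds, overwrite = TRUE)

  # run the query under test against the mock data
  actual <- dbGetQuery(con, paste(readLines("all.sql"), collapse = "\n"))

  # expected staging output, copied from the acceptance document
  expected <- data.frame(
    customer_id = c(1, 3, 100),
    amount      = c(150.00, 10.00, 20.00)
  )

  expect_equal(actual[order(actual$customer_id), ], expected, ignore_attr = TRUE)
})

Because each query has its own small fixture, tests like this run in seconds and give the Story a clear definition of done.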
Organising things
Code
A simple and logical structure of the code will be driven by either individual fields or data lake tables.
Let's explore an example for the customers table. We will build the target customers table from orders and refunds.
.
└─customers
│ pipeline.R
└───pk
│ fields.R
└───orders
│ │ first.sql
│ │ first.R
│ │ last.sql
│ │ last.R
│ └───tests
│ │ test.R
│ └───data
│ └───first
│ │ orders.json
│ │ customers.json
│ │ products.json
│ └───last
│ orders.json
│ customers.json
│ products.json
└───refunds
│ all.sql
│ all.R
└───tests
│ test.R
└───data
└───all
refunds.json
customers.json
Each set of fields here will have its own test data, with the minimum set of fields and rows required to cover the examples from the acceptance documentation.
This organisation of the code base will allow you to:
- Map your folder structure to your target data
- Write independent SQL queries with unit tests
Staging tables
For each set of fields we will create a staging table with the granularity of the target table.
Here are the tables that will be created for the customers example:
- attr_customers_pk
- attr_customers_first_sale_order
- attr_customers_last_sale_order
- attr_customers_all_refunds
In the first iteration, these tables can be combined into the target table with a simple query:
-- customers.sql
SELECT
pk.id AS id,
first_sale_order.date AS first_sale_order_date,
last_sale_order.amount AS last_sale_order_amount,
refunds.amount AS refund_amount
FROM
attr_customers_pk pk
LEFT JOIN
attr_customers_first_sale_order first_sale_order
ON pk.id = first_sale_order.customer_id
LEFT JOIN
attr_customers_last_sale_order last_sale_order
ON pk.id = last_sale_order.customer_id
LEFT JOIN
attr_customers_all_refunds refunds
ON pk.id = refunds.customer_id
Assembly of the target table can be generalised through a SQL builder, or a process where each set of fields is added through a temporary table.
Imagine defining the target table like this:
# target.yaml
customers:
  - table: attr_customers_pk
    query: customers/pk.sql
    key: id
    fields:
      id: id
  - table: attr_customers_first_sale_order
    query: customers/sale_order/first/fields.sql
    key: customer_id
    fields:
      date: first_sale_order_date
  - table: attr_customers_last_sale_order
    query: customers/sale_order/last/fields.sql
    key: customer_id
    fields:
      amount: last_sale_order_amount
  - table: attr_customers_all_refunds
    query: customers/refunds/all/fields.sql
    key: customer_id
    fields:
      amount: refund_amount
and with a single line of code you build your target table:
assemble_table("customers/target.yaml")
Orchestration, optimisation and validation are hidden in the assemble_table function. Every time this function is improved, the benefits are passed on to all the transformation pipelines that you maintain.
Note that the location and names of the staging tables for individual attributes are not important, as long as the engine can map them to the corresponding source query. For example, BigQuery stores the results of every query execution in a temporary table for 24 hours. The transformation engine can leverage this feature and combine the target table from temporary tables with random names.
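To make this concrete, here is a minimal sketch of what an assemble_table function could look like, assuming the yaml package and the target.yaml structure above. It only generates the combining query; orchestration, optimisation and validation are left out:

# assemble.R -- illustrative SQL builder
library(yaml)

assemble_table <- function(spec_path) {
  spec   <- read_yaml(spec_path)
  target <- names(spec)[1]            # e.g. "customers"
  parts  <- spec[[target]]

  # the first entry provides the primary key table
  pk       <- parts[[1]]
  pk_field <- names(pk$fields)[1]
  selects  <- sprintf("pk.%s AS %s", names(pk$fields), unlist(pk$fields))
  joins    <- character(0)

  for (i in seq_along(parts)[-1]) {
    part  <- parts[[i]]
    alias <- sprintf("t%d", i)
    selects <- c(selects,
                 sprintf("%s.%s AS %s", alias, names(part$fields), unlist(part$fields)))
    joins   <- c(joins,
                 sprintf("LEFT JOIN %s %s ON pk.%s = %s.%s",
                         part$table, alias, pk_field, alias, part$key))
  }

  # return the combining query, equivalent to customers.sql above
  sprintf("SELECT\n  %s\nFROM\n  %s pk\n%s",
          paste(selects, collapse = ",\n  "),
          pk$table,
          paste(joins, collapse = "\n"))
}

A production version would also run the per-attribute queries (or reuse the temporary tables mentioned above) and validate the keys, but the contract stays the same: one YAML file per target table.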
Managing schema changes
Adding fields
For each new field you will need to assess whether you can add it to an existing transformation query without changing the granularity or the subset of the data.
If that is the case, you can extend the existing acceptance case and update the query accordingly.
Otherwise, you will need to build a new acceptance document and organise the new code as described above.
Removing fields
We propose an approach where fields are never hard-removed from the target tables as part of story development.
Fields can be removed after a period of time, once dependencies are cleared from other projects that potentially rely on the data point in question.
Mostly, field removal will be triggered when the quality of a data source is so low that having no data is better than having garbage data.
In this case you can simply default all the values in the field to (obsolete), or another single value that fits your convention.
Name changes
A name change should be executed as a combination of adding a field and removing a field.
For example, let's say you have an amount field in your table, and you want to move to a more specific name, amount_ex_vat.
It is possible that some services have a breaking dependency on the amount field.
Resolving all dependencies is impossible within a single release, unless you have one code base for all projects. Instead, we will update the metadata of the amount field so it is obvious that amount_ex_vat should be used going forward. You can also set and communicate a target date for the complete removal of the amount field, at which point dependent projects take responsibility for any outage caused by the removal of the field.
Continuous integration
The code and test structure can be executed independently, which means you can run all the tests in parallel on small datasets; this should result in very fast execution. Commonly used CI tools like Jenkins can help achieve this.
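As a minimal sketch of such a CI entry point, assuming the testthat package and the folder layout above, each tests folder can be discovered and handed to the CI tool as an independent job:

# run_tests.R -- illustrative CI entry point
library(testthat)

# each field folder carries its own tests and fixtures, so a CI tool can
# fan these out as independent, parallel jobs
test_dirs <- list.dirs("customers", recursive = TRUE)
test_dirs <- test_dirs[basename(test_dirs) == "tests"]

for (dir in test_dirs) {
  test_dir(dir, reporter = "summary")
}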