Why our E&L processes never break. Tested, versioned and robust ELT

The two types of tests we run inside our data pipelines every day and on every change to ensure great quality of our data.

Introduction

Either you or they, someone will break it. That’s how it always has been right? Data pipelines break, either because you’ve changed something on the code, or because a new and different piece of data arrived you did not account for. 

At Meltano we’re big on data. We’re wrestling with all the data we can get, internal, external, Google Analytics, Slack, telemetry. But we’re a small team, to stay lean and be able to move fast, we’re employing super-robust data pipelines through extensive data testing.

In particular we use two strategies, powered by Meltano, that ensure our pipelines keep on churning out good data. 

Let’s walk through the two most common problems with data pipelines and then see how we at Meltano use Meltano to solve these problems.

Testing should be easy to do, running often, at least at every commit

To not break your data pipelines with code changes, you do need to be able to run tests often. But running tests is a problem in most data ingestion frameworks, because they are cumbersome. Usually you’ll end up with a manual process that involves the data engineer testing against live data.

Meltano is there to make testing easy, both while working on the code, as well as inside a CI/CD pipeline. We’re employing modular code pieces and environments to make this a very smooth process.

Let us take a look at how we’re automating running tests every day.

(1) Prevent new data from breaking your data pipelines

To prevent data changes in a data source to break our data pipelines we’re running “limited data tests”. These tests retrieve just a small amount of data from the sources, not the total amount used in the actual ingestion runs. However they do query the live sources and not some mock/ test data.

If the extraction and loading process works, we know that 

  • no database schema broke,
  • no weird long strings entered our database to break some field
  • there wasn’t a schema change at the source we did not account for. 
  • and much more…

We’re also able to upgrade our extractors to new versions. We utilize the limited data tests again, upgrade our tap on a branch and running the test before merging into the main branch. 

To make this easy to work with, we’re using Meltano environments and environment variables. The following lines works inside our GitHub Actions CI pipelines, set our environment to “cicd” and set an environment variable to “limit the data retrieval to data from 1 day ago”:

env:
  MELTANO_ENVIRONMENT: cicd
  CI_BRANCH: 'b${{ github.SHA }}'
...
- run: echo "DEFAULT_START_DATE=$(date +'%Y-%m-%d' -d '1 day ago')" >> $GITHUB_ENV

We are choosing to retrieve one day of data, but depending on how long your data sources and the speed your tests need, you can choose more or less data.

As you can see we’re also setting a “CI_BRANCH” variable. We’re loading all the test data into our snowflake database. But to not mess with our production data, we’re loading into a separate schema:

environments:
  - name: cicd
    config:
plugins:
  extractors:
    - name: tap-cloudwatch
      config:
        start_date: ${DEFAULT_START_DATE}
    - name: tap-slack
      config:
        start_date: ${DEFAULT_START_DATE}
    - name: tap-slack-public
      config:
        start_date: ${DEFAULT_START_DATE}


[...]
loaders:
  - name: target-snowflake
    config:
      dbname: CICD_RAW
      user: CICD
      role: CICD
      warehouse: CICD
      default_target_schema: ${CI_BRANCH}_${MELTANO_EXTRACT__LOAD_SCHEMA

And that is it for the first part of tests. Using them, we can cover a lot of ground including all infrastructure changes and changes in the incoming data. 

(2) Utilize dbt tests to verify logic changes

When we’re changing logic inside the extract and load processes, like adding columns, renaming, adding new data sources, the above tests are fine to start with, but more important are the downstream dependencies.

At Meltano we use dbt to transform our data, so we’re chaining up our dbt tests behind our ingestion tests. The same story goes for changes in our dbt code, we want to make sure it works with newly ingested data.

After running the “small data ingestion tests” we run all our dbt model tests based on this small dataset.

transform_tests:
[…]

  env:
    MELTANO_ENVIRONMENT: cicd
    MELTANO_SEND_ANONYMOUS_USAGE_STATS: 'false'
    CI_BRANCH: 'b${{ github.SHA }}'

[…]

# Run Test
  - run: meltano install utility dbt-snowflake
  - run: meltano run dbt-snowflake:run dbt-snowflake:test

To make sure all of this happens in isolation, we again use the dedicated Meltano environment “cicd” that pushes all the data into a separate Snowflake schema as shown above.

Why and how to implement tests

Testing doesn’t slow data teams down. Our data team becomes more productive by using tests. 

Inside our squared repository, we simply chain those tests up behind each other, as it’s just ingesting from 10 sources (easily manageable with one data engineer with all these tests), but feel free to mix and match these two types of tests to suit your setup.

If your dbt runs take 1hr+ on a daily data set, but ingestion is quick, it makes sense to separate those two and ingest way less data for the dbt runs.

The whole purpose of having tests is to run them frequently. And you do that first and foremost by making them easy to run and fast to finish.

You can run these kinds of tests with most data ingestion frameworks, but Meltano is built to be used in this way, built to make the data engineers’ life joyful.

If you adopt these testing practices, you’re a whole lot closer to data pipelines that simply do not break, period.

Intrigued?

You haven’t seen nothing yet!