Over the last few months, we’ve launched two long-awaited features: one that unlocks the future of the DataOps platform infrastructure, and another that uses that feature to dramatically improve the entire Singer ecosystem. The first is composable pipelines, implemented via `meltano run`; the second is Singer-based stream maps, implemented via the mapper plugin in Meltano.
We wanted to give a peek behind the curtain and talk a bit about what it looked like to build `meltano run`, add the stream maps feature to it, and work towards truly composable pipelines for the infrastructure of the modern data stack.
Getting here has been an iterative process and, like many large feature builds, there were some speed bumps along the way. The ideas behind composable pipelines (issue, issue) and inline stream maps (issue, issue) had actually been floating around Meltano for quite a long time.
Eventually, we consolidated all the issues into one large Epic to track the build. This epic also serves as a great anthropological development record. One thing that was apparent early on was that we couldn’t jump straight into a feature build: there was some tech debt and housecleaning we had to address first. In the end, though, we ended up with a very traditional feature build:
- We cleaned up some tech debt and laid the groundwork
- We had to decide how to splice in the new behavior
- We had to come up with an implementation
- We had to cut it down to a reasonably sized first version
- Then we started iterating on it to make it awesome!
When I joined in the summer of 2021, only a subset of the Meltano code base related to handling sub-processes and IO was “asyncable” (i.e. it already used asyncio or was async agnostic). The hook architecture (source), a core component and pattern in Meltano, was not. The hook pattern is how we tell plugins to do things like:
- Trigger before/after configuration tasks – like creating a basic airflow config (source)
- Trigger before/after invocation tasks – like performing a state lookup or setting up bookmark writers (source)
- And more! (source search)
If you’re executing a task with many steps and plugins, we don’t want you to have to wait for all the behind-the-scenes work to get completed slowly…sequentially. So one of the first steps on the road to composable pipelines was actually to convert hooks to play nice with asyncio (issue).
Making the hooks async was a bit infectious: the change to use asyncio had to travel back up the stack to callers, so essentially just about everything had to become async or async agnostic.
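To make the shape of that change concrete, here’s a minimal sketch of hooks being awaited rather than called synchronously. The names (`Plugin`, `add_hook`, `trigger_hooks`) are illustrative stand-ins, not Meltano’s actual hook API:

```python
import asyncio


class Plugin:
    """Minimal sketch of async hook triggering (hypothetical API)."""

    def __init__(self):
        self._hooks = {}

    def add_hook(self, name, coro_func):
        self._hooks.setdefault(name, []).append(coro_func)

    async def trigger_hooks(self, name):
        # Hooks are now awaited, so callers can schedule them alongside
        # other async work instead of blocking on each one in turn.
        for hook in self._hooks.get(name, []):
            await hook(self)


async def write_default_config(plugin):
    # Stand-in for real async work, e.g. writing a basic airflow config.
    await asyncio.sleep(0)
    plugin.configured = True
```

A caller registers the hook and awaits `trigger_hooks(...)` from inside its own coroutine, which is exactly why the change ripples upward through the call stack.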
`PluginInvoker.prepared` (source) became async because our `with self.plugin.trigger_hooks("configure", self, session)` calls were now async. Suddenly, frequently used context managers like `plugin_invoker.prepared` had to become async context managers. That wasn’t always a big deal, since some calling code was already async, but much of the usage of the `prepared` context manager, for example, was in tests, which would also need to change.
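The sync-to-async context manager conversion looks roughly like this. This is an illustrative sketch only, not Meltano’s actual `PluginInvoker` implementation:

```python
import asyncio
from contextlib import asynccontextmanager


class PluginInvoker:
    """Illustrative sketch of the sync-to-async context manager change."""

    def __init__(self):
        self.ready = False

    async def prepare(self):
        # Formerly synchronous; now awaits things like async hook triggers.
        await asyncio.sleep(0)
        self.ready = True

    async def cleanup(self):
        self.ready = False

    @asynccontextmanager
    async def prepared(self):
        # Callers change from `with invoker.prepared():` to
        # `async with invoker.prepared():`, which means the calling
        # function must itself become a coroutine.
        await self.prepare()
        try:
            yield self
        finally:
            await self.cleanup()
```

That one keyword change (`async with`) is what forces every call site, including tests, to become async as well.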
Limiting scope and the importance of tests were really the key tenets that we had to observe at this stage. Enough so that we actually reiterated them for reviewers on the MR after a discussion:
We’ll limit the scope of the refactor as much as possible to only change behaviors to be async, nothing’s gaining functionality unless it’s required. So most of our code changes will be in test files. Tests will be critical to pick up subtle changes in behavior, so reviews should be extra critical of our test coverage and any changes to tests themselves.
The MR for this change covers some of the nitty-gritty, as well as my personal process approaching this as a newbie to the codebase. But with that work out of the way, we could tackle building `meltano run`.
Alter a Feature, Add a Feature?
The `meltano elt` command is focused and designed to work on singular extract-load flows with optional transforms. IO was tightly coupled and inflexible (source), and expected an EL pair of Singer plugins (source). Similarly, error handling was complicated, but only had to worry about discrete EL pairs (source).
Reworking this to allow arbitrary plugins/commands AND n+1 inline stream maps would have meant a complete rewrite of this code: code that users and thousands of pipelines already rely on. A large rewrite like this always presents risk, and even with extensive tests and QC, bugs could slip into the wild and impact production workloads. Definitely something we wanted to avoid.
In a traditional SaaS app, we could feature-flag our changes and roll them out incrementally, in phases, to user cohorts. However, being a self-hosted open-source product, that approach is more difficult for us.
We did want to avoid impacting production workloads though, and since we thought that the ability to chain multiple commands in series presented a new way of thinking about Meltano anyway, we opted to wrap this new feature in a brand-new command called `meltano run`.
That would allow us to:
- Iterate on the requirements, adding features over time.
- Leave `meltano elt` as is for the moment, and avoid any risk to existing pipelines.
- Develop `meltano run` from a clean slate, both behind the scenes and in its behavior (i.e., it didn’t have to match ELT’s behavior exactly).
It took a bit to arrive at an architecture, and we had our share of bikeshedding (naming things is always hard!). Ultimately, though, the architecture we settled on was one of “composable blocks” (issue). The idea is that, like Lego bricks, various “simple” block types can interact and be combined to construct more complex things. An early proposal looked something like this:
- `Block` a unit of work
| - `SimpleBlock` an indivisible unit of work
| - `CommandBlock` a Meltano plugin's command like `dbt:run`, `dbt:docs` or `dbt:test`
| - `CLIBlock` an arbitrary CLI command (hopefully we don't need this but listing here for completeness)
| - `IOBlock` a consumer or publisher of IO
| - `PublisherBlock` a publisher of IO
| - `ConsumerBlock` a consumer of IO
| - `TranslatorBlock` a consumer and publisher of IO
| - `ComplexBlock` or `BlockSet` a potentially divisible unit of work
| - `ExtractLoadBlock` - an ordered set of `IOBlocks`, each feeding its STDOUT to the next's STDIN.
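In Python terms, that proposal can be sketched with abstract base classes. The class names below follow the proposal, but the bodies are hypothetical simplifications, not Meltano’s actual implementation:

```python
import asyncio
from abc import ABC, abstractmethod


class Block(ABC):
    """A unit of work."""

    @abstractmethod
    async def run(self) -> None:
        ...


class IOBlock(Block):
    """A consumer and/or publisher of IO (a tap, mapper, or target)."""

    consumes = False  # True for targets and mappers
    produces = False  # True for taps and mappers


class ExtractLoadBlock(Block):
    """A potentially divisible unit of work: an ordered set of IOBlocks.

    A real implementation wires each block's STDOUT to the next block's
    STDIN; this sketch just runs them in order.
    """

    def __init__(self, *blocks: IOBlock):
        self.blocks = blocks

    async def run(self) -> None:
        for block in self.blocks:
            await block.run()
```

The appeal of this shape is that anything operating on a `Block` (a runner, a CLI command) doesn’t need to care whether it’s holding a single plugin or a whole extract-load chain.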
With that “block” concept in place, we were off to the races to build an MVP of the implementation that was usable via tests. That would give us an idea of whether the architecture was one that we actually liked in practice and would cover our needs. This was one of those times when we all liked the approach in theory – but a small MVP and some code would let us really know if we were on the right track.
Building a Feature MVP
In practice, we only required a small subset of the “blocks” we came up with as concrete implementations to start with (MR), and so far, those have served us well. Specifically, we implemented these three:
- `IOBlock`s (source): components that produce output (a tap), require input (a target), or have both input AND output (a mapper plugin).
- `PluginCommandBlock`s (source): plugin-based components that expect to be forked/executed in their own process, but don’t strictly require input from other blocks and don’t expect their output to be consumed.
- `ExtractLoadBlock`s (source): a complex block set, made up of any number of properly ordered `IOBlock`s (e.g. a tap → a mapper → a target).
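The heart of an `ExtractLoadBlock` set is the stdout-to-stdin wiring between neighboring blocks. Here’s a heavily simplified sketch of that idea using asyncio subprocesses; the real implementation also handles logging, buffering, state, and failure cleanup, and these function names are hypothetical:

```python
import asyncio


async def proxy(src_stdout, dst_stdin):
    """Forward lines from one block's stdout to the next block's stdin."""
    while True:
        line = await src_stdout.readline()
        if not line:
            break
        dst_stdin.write(line)
        await dst_stdin.drain()
    dst_stdin.close()  # EOF tells the downstream block to finish


async def run_chain(*commands):
    """Spawn each command (e.g. a tap, mappers, a target) in its own
    process and chain them left to right. Returns the final stdout."""
    procs = [
        await asyncio.create_subprocess_shell(
            cmd,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
        )
        for cmd in commands
    ]
    procs[0].stdin.close()  # the first block reads nothing
    links = [
        asyncio.create_task(proxy(a.stdout, b.stdin))
        for a, b in zip(procs, procs[1:])
    ]
    # Read the last block's output while the proxies run, to avoid
    # deadlocking on a full pipe buffer.
    output, *_ = await asyncio.gather(procs[-1].stdout.read(), *links)
    await asyncio.gather(*(p.wait() for p in procs))
    return output
```

Because the chain is just “any number of properly ordered IO handles,” inserting a mapper between a tap and a target is only a matter of adding one more process to the list.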
To support those blocks and our desired behavior, we also added a few key interface implementations and base classes:
- Since we knew inline stream map transforms were coming up next (and were a big reason for `meltano run` in the first place), the `ExtractLoadBlocks` runner implementation was somewhat experimental and forward looking. It included scaffolding and stubs to eventually support multiple `IOBlock`s between a Producer/Consumer pair. We included those on day 1 to ensure that nothing was dependent on tight “EL” pairings to function.
- The `InvokerBase` base class (source), for creating `IOBlock`s built on top of existing Meltano plugins. It implements the low-level details of proxying IO, spawning the plugin process, and binding stdin/stdout: all the complicated things that `meltano elt` had to orchestrate and manage itself.
- `SingerBlock` (source), which wraps Singer plugins using an `InvokerBase` to implement the `IOBlock` interface, which in turn can be used in complex block sets like `ExtractLoadBlock`s.
- `InvokerCommand` (source), which wraps Singer plugins using an `InvokerBase` to implement the `PluginCommandBlock` interface.
Coming from a Go background, I have a bad habit of wanting to make Python look like Go, and so I initially had an awkward attempt at defining our interface spec using `typing.Protocol` instead of abstract base classes (MR discussion). But once we settled on using ABCs, we built the first working version of `meltano run`.
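For illustration, here’s the difference between the two interface styles. The method names below are hypothetical, not Meltano’s real interface:

```python
from abc import ABC, abstractmethod
from typing import Protocol, runtime_checkable


# Go-style structural typing: anything with matching methods satisfies
# the protocol, no inheritance required.
@runtime_checkable
class IOBlockProtocol(Protocol):
    def stdout_link(self) -> None: ...
    def stdin_link(self) -> None: ...


# The ABC style we settled on: explicit inheritance, with missing
# abstract methods caught at instantiation time.
class IOBlockBase(ABC):
    @abstractmethod
    def stdout_link(self) -> None:
        ...

    @abstractmethod
    def stdin_link(self) -> None:
        ...


class SingerTap(IOBlockBase):
    def stdout_link(self) -> None:
        pass

    def stdin_link(self) -> None:
        pass
```

The ABC route gives up Go-style duck typing but fails loudly and early when an implementation is incomplete, which is a better fit for a codebase where the interfaces are still evolving.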
It’s hard to see how we could have broken up this build further and still kept it viable, but we ended up with two large change-sets (MR which merged into MR) and a decent backlog of features we’d need to add later to make `meltano run` feature complete (issue). Huge kudos go out to the entire engineering team for helping review these over multiple iterations (no small ask).

In hindsight, we should have tried to figure out how to break these up into smaller, more manageable merge-able chunks. At that point, though, we finally had a working version of `meltano run` that you could use to run multiple `meltano elt`-like tasks sequentially.
That is, these two commands:

`meltano elt tap-gitlab target-postgres --transform run`

`meltano elt tap-salesforce target-mysql`

could now be rewritten as a single `meltano run` command:

`meltano run tap-gitlab target-postgres dbt:run tap-salesforce target-mysql`
Mappers, Taking the New Architecture for a Spin
Since the start of this work, the idea was always that Meltano would gain more flexibility by gaining the ability to compose/chain multiple tasks and the ability to alter/manipulate data streams in flight via inline stream maps. In particular, the ability to run inline stream maps was one of the big reasons why we implemented `meltano run` rather than refactoring `meltano elt`. Previously, this feature was only available in taps built with the Meltano SDK or via the Pipelinewise implementation.
With the first `meltano run` iteration live, it was time to take the new architecture for a spin and see if the `ExtractLoadBlock`s could start to support arbitrary `IOBlock`s in their execution chain.
Spoiler alert: It could, and now it does! (MR)
`ELBExecutionManager` (source), the class that actually manages the execution of an `ExtractLoadBlock` set, contained the bulk of the changes related to supporting more than just pairs of `IOBlock`s.
At the core of this class is `ELBExecutionManager._wait_for_process_completion` (source), which is effectively a recursive loop calling back to itself until all blocks in an `ExtractLoadBlock` set have completed.
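The shape of that loop, heavily simplified, might look like the sketch below. The real method also manages upstream/downstream shutdown ordering and error messaging; this is just the recursive wait pattern, under assumed names:

```python
import asyncio


async def wait_for_completion(tasks):
    """Wait on a set of block tasks, recursing until all are done.

    Simplified sketch inspired by _wait_for_process_completion: on the
    first failure it cancels the remaining blocks and re-raises.
    """
    if not tasks:
        return
    done, pending = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_COMPLETED
    )
    for task in done:
        exc = task.exception()
        if exc is not None:
            for p in pending:
                p.cancel()
            raise exc
    # Call back into ourselves until every block has completed.
    await wait_for_completion(pending)
```

Waking up on `FIRST_COMPLETED` each time is what lets the manager react to any block in the set finishing (or failing) without caring how many blocks there are, which is exactly what breaking the tight “EL pair” assumption required.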
It took a few attempts to get to a revision I was happy with, and that I thought was simple enough for reviewers to digest. I didn’t want them to have to resort to a whiteboard and flow chart to figure out what was going on, which is what I had to do for the first version, and posted to Slack late one night:
The actual stream maps feature itself took a bit longer to bake and ended up giving us a few additional headaches. It actually pushed us to adopt ADRs, but that’s a whole other blog post.
Come Build the Infrastructure for the Modern Data Stack with Us
Since launching `meltano run` and adding support for mappers, we’ve also started to close the gap with `meltano elt` and bring the two to feature parity. One of the big missing pieces was support for incremental jobs (issue), which we shipped in Meltano v1.97.0. But there is still lots left to do, both in `meltano run` and elsewhere. Personally, I’m really looking forward to when Meltano gets a true plugin architecture; it’s going to make for some really cool `meltano run` possibilities!
If this sort of stuff interests you, come join us, either as a contributor or as a team member!