
6x more speed for your data pipelines with BATCH messages
The Singer specification was a pretty good idea. It makes bringing sources and destinations of data together really easy. Just find a suitable tap for your source data and a suitable target for where you’d like to store it.
The Singer way does come with a certain amount of overhead though since you are effectively passing all data via regular text streams.
When the amount of data is large, this can lead to long-running pipelines. That can happen when you need to perform an initial import of historical data, when you are restricted from performing rapid incremental updates because you don’t want to hit your production database during business hours, or when you simply lack a good replication key to perform incremental updates with.
At Lanefinder we have a couple of special jobs which regularly import collections of large CSV files. The files are delivered to us at a regular cadence by an external party. Typically each job needs to import a handful of CSV files, each around 500 MB in size. One job in particular ends up as a table of around 2 million rows and around 150 columns, and every run of that job processes that full amount of data.
We developed our own tap-csv to deal with how the data is structured across the files and to add incremental sync support. If the pipeline fails midway for some reason, it can resume from the file and line number where it was interrupted.
This worked for a while, but we ran into issues when we migrated our pipelines to a new Kubernetes cluster with a lot less CPU and memory at our disposal. And then we learned about the new BATCH message which had been added to the Meltano SDK.
BATCH messages
BATCH messages are an extension of the Singer spec. You can read more about them in the Meltano SDK documentation. This is how the message looks:
{
  "type": "BATCH",
  "stream": "stream name goes here",
  "encoding": {
    "format": "jsonl",
    "compression": "gzip"
  },
  "manifest": [
    "file://local/path/to/batch/file/1",
    "s3://remote/path/to/batch/file/2"
  ]
}
Instead of sending individual RECORD messages over the stream, a list of file paths is sent. The paths are arbitrary and can be local or remote; it is up to the target to handle them. Each file contains records (note: not RECORD messages, but the raw records within). Both the tap and target need to know how to handle BATCH messages.
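To make that concrete, here is a minimal sketch (plain Python, standard library only) of how a target could read one gzip-compressed JSONL file listed in the manifest. The file path is made up for illustration, with the file:// scheme already stripped:

import gzip
import json

# Hypothetical local path taken from a BATCH manifest (file:// scheme stripped).
path = "/tmp/batch-file-1.jsonl.gz"

# Each line in the file is one raw record, not a Singer RECORD message.
with gzip.open(path, "rt", encoding="utf-8") as fh:
    for line in fh:
        record = json.loads(line)
        # hand the record off to whatever loading logic the target uses
        print(record)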
If your stream sends many RECORD messages, this approach can be quite a bit faster, as you will see.
Developing our own target
To make use of BATCH messages, both the tap and the target need to be BATCH aware. We already had our own tap built with the SDK, so adding BATCH support was almost as easy as upgrading the version of the SDK we were building with (we also needed to add config options). But we needed to add BATCH support to our target.
Up until this point we had been using a modified version of pipelinewise-target-bigquery. That worked well for our purposes but now was the perfect excuse to take more control.
Our new target youcruit-target-bigquery is built with the SDK and supports BATCH messages. It is also less code, because we can leverage the SDK to handle most Singer-related bits and use its stream maps and flattening instead of having to build that ourselves.
Target-side batching
The best way to load data into BigQuery (at least if you also consider cost) is to use LoadJobs. And in our experience the most dependable way to use LoadJobs is to upload an AVRO file. An AVRO file is much like a JSONL file, except that it starts with a definition of the data’s schema.
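For illustration, here is a minimal sketch of writing records to an AVRO file, assuming the fastavro library; the two-column schema is made up, and a real target would derive it from the stream’s Singer SCHEMA message:

from fastavro import parse_schema, writer

# Made-up two-column schema for illustration; a real target would derive it
# from the stream's Singer SCHEMA message.
schema = parse_schema({
    "type": "record",
    "name": "example_stream",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "name", "type": ["null", "string"]},
    ],
})

records = [
    {"id": 1, "name": "first row"},
    {"id": 2, "name": None},
]

# Unlike plain JSONL, the resulting file carries its schema up front.
with open("example_stream.avro", "wb") as out:
    writer(out, schema, records)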
The target does the following when it receives RECORD messages (see the sketch after this list):
- collects a configurable number of records, ten thousand by default
- converts those records into an AVRO file
- uploads the AVRO file as part of a LoadJob to BigQuery
- waits for the load job to complete
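Put together, the RECORD path looks roughly like this sketch, assuming the google-cloud-bigquery client library. The table name and the flush_batch helper are made up for illustration and are not the target’s actual API:

import io

from fastavro import parse_schema, writer
from google.cloud import bigquery

client = bigquery.Client()

# Same made-up schema as in the AVRO sketch above.
AVRO_SCHEMA = parse_schema({
    "type": "record",
    "name": "example_stream",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "name", "type": ["null", "string"]},
    ],
})

def flush_batch(records, table_id):
    """Convert one batch of records to AVRO and load it into BigQuery."""
    buf = io.BytesIO()
    writer(buf, AVRO_SCHEMA, records)
    buf.seek(0)
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.AVRO,
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    )
    job = client.load_table_from_file(buf, table_id, job_config=job_config)
    job.result()  # block until the load job completes

# Toy stand-in for records parsed from incoming RECORD messages.
incoming_records = ({"id": i, "name": f"row {i}"} for i in range(25_000))

batch = []
for record in incoming_records:
    batch.append(record)
    if len(batch) >= 10_000:  # the default batch_size
        flush_batch(batch, "your_project.your_dataset.example_stream")
        batch = []
if batch:  # flush the final partial batch
    flush_batch(batch, "your_project.your_dataset.example_stream")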
So, in effect, the target already does batching. When it receives a BATCH message it will instead (sketched below):
- convert each file listed in the message manifest to an AVRO file
- upload all AVRO files as separate LoadJobs to BigQuery
- wait for all jobs to complete
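A corresponding sketch for the BATCH path might look like this, again with made-up paths and helper names and reusing the AVRO_SCHEMA from the previous sketch. The point is that manifest files can be converted and loaded as independent jobs that are only waited on at the end:

import gzip
import io
import json

from fastavro import writer
from google.cloud import bigquery

client = bigquery.Client()

def start_load_job(manifest_path, table_id, avro_schema):
    """Convert one gzip JSONL manifest file to AVRO and start a load job without waiting."""
    buf = io.BytesIO()
    with gzip.open(manifest_path, "rt", encoding="utf-8") as fh:
        writer(buf, avro_schema, (json.loads(line) for line in fh))
    buf.seek(0)
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.AVRO,
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    )
    return client.load_table_from_file(buf, table_id, job_config=job_config)

# Hypothetical manifest from a BATCH message (only local paths in this sketch),
# using the AVRO_SCHEMA defined in the previous sketch.
manifest = ["/tmp/batch-0001.jsonl.gz", "/tmp/batch-0002.jsonl.gz"]
jobs = [
    start_load_job(path, "your_project.your_dataset.example_stream", AVRO_SCHEMA)
    for path in manifest
]
for job in jobs:
    job.result()  # wait for all load jobs to complete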
If there is any performance difference, it should come down to the overhead of transmitting many individual RECORD messages versus a handful of BATCH messages.
Performance
To demonstrate the performance improvement, I am running youcruit-target-bigquery together with tap-infinity, which I implemented just for this blog post. Just take my word for it that we have seen a similar improvement with our CSV tap and real CSV files.
This is the meltano configuration used for the test:
version: 1
send_anonymous_usage_stats: false
plugins:
  extractors:
  - name: tap-infinity
    namespace: tap_infinity
    pip_url: git+https://github.com/spacecowboy/tap-infinity.git
    executable: tap-infinity
    capabilities:
    - state
    - catalog
    - discover
    - batch
    - about
    - stream-maps
    - stream-flattening
    settings:
    - name: row_count
    - name: column_count
    - name: batch_size
    - name: batch_config
    - name: stream_maps
    - name: stream_map_config
    - name: flattening_enabled
    - name: flattening_max_depth
  - name: tap-infinity-batchmsg
    namespace: tap_infinity
    pip_url: git+https://github.com/spacecowboy/tap-infinity.git
    executable: tap-infinity
    capabilities:
    - state
    - catalog
    - discover
    - batch
    - about
    - stream-maps
    - stream-flattening
    settings:
    - name: row_count
    - name: column_count
    - name: batch_size
    - name: batch_config
    - name: stream_maps
    - name: stream_map_config
    - name: flattening_enabled
    - name: flattening_max_depth
  loaders:
  - name: target-bigquery
    namespace: bigquery
    pip_url: git+https://github.com/youcruit/youcruit-target-bigquery.git
    settings:
    - name: project_id
      description: Google project id
    - name: dataset
      description: Dataset to load data into
    - name: location
      value: US
      description: Dataset location
    - name: table_prefix
      value: ''
      description: Optional prefix to add to table names
    - name: max_batch_age
      value: 5.0
      description: Time in minutes between batches
    - name: batch_size
      value: 10000
      description: Maximum size of batches when records are streamed in. BATCH messages
        are not affected by this property.
    - name: add_record_metadata
      value: true
      description: Add Singer Data Capture (SDC) metadata to records
    dialect: bigquery
environments:
- name: dev
  config:
    plugins:
      extractors:
      - name: tap-infinity
        config:
          row_count: 100000
          column_count: 150
      - name: tap-infinity-batchmsg
        config:
          row_count: 100000
          column_count: 150
          batch_size: 10000
          batch_config:
            encoding:
              format: jsonl
              compression: gzip
            storage:
              root: file://
              prefix: ''
      loaders:
      - name: target-bigquery
        config:
          project_id: YOUR_PROJECT_ID_HERE
          location: US
          dataset: YOUR_DATASET_HERE
          table_prefix: ${MELTANO_EXTRACT__LOAD_SCHEMA}_
          batch_size: 10000
          max_batch_age: 5.0
The project uses the following Poetry dependencies:
[tool.poetry.dependencies]
python = ">=3.7,<3.11"
meltano = "2.7.1"
Measuring is done using the time command.
100K RECORD messages
Using the config as above, i.e. 100,000 rows of data with 150 columns, and the following command:
$ time poetry run meltano run tap-infinity target-bigquery
real 12m50.760s
user 17m35.626s
sys 0m11.039s
We get a wall time (real) of 771 seconds. But also note that it used more CPU time (user) than that, due to multi-threading.
10 BATCH messages, 10K records per batch
Still using the same config but switching to BATCH messages:
$ time poetry run meltano run tap-infinity-batchmsg target-bigquery
real 3m58.986s
user 1m56.397s
sys 0m4.697s
We get 239 seconds wall time. That’s a 3.23x speedup just from reducing the number of messages sent. But it also used only about 50% CPU time, so most of that time was spent waiting.
Thanks to some handy info prints in the target, I can mention that each batch took around 2 seconds to convert to an AVRO file, and then around 20 seconds waiting for Google to import the batch into BigQuery. The same is true for the previous run, since the target does its own batching for RECORD messages.
But there’s more. Because BATCH messages are faster to process, we can afford to increase the size of the batches, where before it would have been unsafe to wait that long between incremental state updates.
1 BATCH message, 100K records per batch
Let’s update the config to the following:
- name: tap-infinity-batchmsg
  config:
    row_count: 100000
    column_count: 150
    batch_size: 100000
    batch_config:
      encoding:
        format: jsonl
        compression: gzip
      storage:
        root: file://
        prefix: ''
This way we upload all 100K records in a single batch.
$ time poetry run meltano run tap-infinity-batchmsg target-bigquery
real 2m32.084s
user 1m54.461s
sys 0m4.322s
We get 152 seconds. That’s a 5.07x speedup compared to streaming records. The target took 20 seconds converting this single massive batch to AVRO (scaling linearly as expected) but only 38 seconds importing it into BigQuery.
But what if we increase the amount of data and the size of the batch message?
1 BATCH message, 1M records per batch
We’re using the following configuration:
- name: tap-infinity-batchmsg
  config:
    row_count: 1000000
    column_count: 150
    batch_size: 1000000
    batch_config:
      encoding:
        format: jsonl
        compression: gzip
      storage:
        root: file://
        prefix: ''
$ time poetry run meltano run tap-infinity-batchmsg target-bigquery
real 20m55.127s
user 18m14.307s
sys 0m22.536s
We get 1255 seconds. That’s a 6.14x speedup compared to streaming records: the 100K-row RECORD run took 771 seconds, so streaming 1M rows would take roughly 7710 seconds if it scaled linearly. The target took about 3 minutes converting the batch to AVRO and then waited 2 minutes 27 seconds for BigQuery to load the data.
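For reference, the speedup figures above are simply wall-time ratios, with the 1M-row run compared against the 100K-row RECORD run scaled linearly:

record_100k = 771   # seconds, 100K rows via RECORD messages
batch_10x10k = 239  # seconds, 10 BATCH messages of 10K rows
batch_1x100k = 152  # seconds, 1 BATCH message of 100K rows
batch_1x1m = 1255   # seconds, 1 BATCH message of 1M rows

print(record_100k / batch_10x10k)     # ~3.23x
print(record_100k / batch_1x100k)     # ~5.07x
print(record_100k * 10 / batch_1x1m)  # ~6.14x, baseline scaled from 100K to 1M rows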
Conclusion
BATCH messages can be a real performance boost if your pipelines have a lot of data to import. And the Meltano SDK makes it really easy to implement your own tap or target so you can take advantage of it.
The suitable size of the batches can only be determined by you, your data, and your environment. Running locally for an initial data import, to set an initial state for our regular cloud-based pipelines, I had no problem running with batch sizes of 10 million. And that made the wait a lot shorter!
Meet the Author
Jonas Kalderstam – Senior Developer (Lanefinder)
A short bio: I have a PhD in machine learning and make things out of code both at work and at home, sometimes to the frustration of my wife and children. Contributing to Free Software and Open Source is important to me, and I have maintained an RSS reader called Feeder for Android since 2014.