The Singer specification was a pretty good idea. It makes bringing sources and destinations of data together really easy. Just find a suitable tap for your source data and a suitable target for where you’d like to store it.
The Singer approach does come with a certain amount of overhead though, since you are effectively passing all data through plain text streams.
When the amount of data is large, this can lead to long-running pipelines.
That can be the case when you need to perform an initial import of historical data, when you are restricted from performing rapid incremental updates because you don't want to hit your production database during business hours, or when you simply lack a good replication key to do incremental updates with.
At Lanefinder we have a couple of special jobs which regularly import collections of large CSV files. The files are delivered to us at a regular cadence by an external party. Typically each job needs to import a handful of CSV files, each around 500 MB in size. One job in particular ends up as a table of around 2 million rows with around 150 columns, and each run of the job processes that full amount of data.
We developed our own tap-csv to deal with how the data is structured across the files and to add incremental sync support. If the pipeline fails midway for some reason, it can resume from the file and line number where it was interrupted.
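To illustrate the idea (with hypothetical bookmark keys and logic, not the actual tap-csv implementation), this kind of resumable sync boils down to keeping the current file name and line number in the stream state and skipping past them on the next run:

# Hypothetical sketch of file-and-line based resumable sync.
# The bookmark keys ("file", "line") are illustrative, not the real tap-csv state.
import csv


def read_records(files, state):
    """Yield CSV rows from pathlib.Path objects in `files`, skipping rows
    already covered by the `state` bookmark. Assumes files are always
    processed in sorted order by name."""
    last_file = state.get("file")
    last_line = state.get("line", 0)
    for path in sorted(files, key=lambda p: p.name):
        if last_file and path.name < last_file:
            continue  # this file was fully processed in a previous run
        skip = last_line if path.name == last_file else 0
        with path.open(newline="") as fh:
            for lineno, row in enumerate(csv.DictReader(fh), start=1):
                if lineno <= skip:
                    continue
                yield row
                # advance the bookmark so a crash can resume from here
                state["file"] = path.name
                state["line"] = lineno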
This worked for a while but we ran into issues when we migrated our pipelines to a new Kubernetes cluster with a lot less CPU and memory at our disposal. And then we learned about the new BATCH message which had been added to the Meltano SDK.
BATCH messages
BATCH messages are an extension of the Singer spec. You can read more about them in the Meltano SDK documentation. This is how the message looks:
{ "type": "BATCH", "stream": "stream name goes here", "encoding": { "format": "jsonl", "compression": "gzip" }, "manifest": [ "file://local/path/to/batch/file/1", "s3://remote/path/to/batch/file/2" ] }
Instead of sending individual RECORD messages over the stream, a list of file paths is sent. The paths are arbitrary and can be local or remote; it is up to the target to handle them. Each file contains records (note: not RECORD messages, but the raw records themselves). Both the tap and the target need to know how to handle BATCH messages.
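To make that concrete, here is a minimal sketch (not the Meltano SDK's actual implementation) of how a target could expand a BATCH message with jsonl/gzip encoding and local file:// paths back into records:

# Minimal sketch: turn a BATCH message (jsonl + gzip, local file:// paths)
# back into individual records. Remote schemes like s3:// need their own handling.
import gzip
import json


def records_from_batch_message(message: dict):
    assert message["type"] == "BATCH"
    assert message["encoding"] == {"format": "jsonl", "compression": "gzip"}
    for uri in message["manifest"]:
        path = uri[len("file://"):]  # strip the scheme, keep the local path
        with gzip.open(path, "rt", encoding="utf-8") as fh:
            for line in fh:
                yield json.loads(line)  # one raw record per line, no RECORD envelope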
If your stream sends many RECORDs, this approach can be quite a bit faster, as you will see.
Developing our own target
To make use of BATCH messages, both the tap and the target need to be BATCH aware. We already had our own tap built with the SDK, so adding BATCH support was almost as easy as upgrading the version of the SDK we were building with (we also needed to add config options). But we still needed to add BATCH support to our target.
Up until this point we had been using a modified version of pipelinewise-target-bigquery. That worked well for our purposes, but this was the perfect excuse to take more control.
Our new target youcruit-target-bigquery is built with the SDK and supports BATCH messages. It is also less code, because the SDK handles most of the Singer-related bits, and we get stream maps and flattening from the SDK instead of having to build them ourselves.
Target side batching
The best way to load data into BigQuery (at least if you consider cost as well) is to use LoadJobs. And in our experience the most dependable way to use LoadJobs is to upload an AVRO file. AVRO files are a bit like JSONL files, except they are binary and the schema of the data is embedded at the start of the file.
The target does the following when it receives RECORD messages:
- collects a configurable number of records (ten thousand by default)
- converts the records into an AVRO file
- uploads the AVRO file to BigQuery as part of a LoadJob
- waits for the load job to complete
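A rough sketch of that record path, assuming fastavro and google-cloud-bigquery with a hand-written AVRO schema (the real target derives the schema from the Singer SCHEMA message and does considerably more bookkeeping), could look like this:

# Hedged sketch of the RECORD path: serialize one batch of records to AVRO
# in memory and load it into BigQuery as a LoadJob.
import io

from fastavro import parse_schema, writer
from google.cloud import bigquery

# Stand-in schema; the real target builds this from the stream's SCHEMA message.
AVRO_SCHEMA = parse_schema({
    "type": "record",
    "name": "example_stream",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "name", "type": ["null", "string"], "default": None},
    ],
})


def load_record_batch(client: bigquery.Client, table_id: str, records: list) -> None:
    """Convert one batch of records to AVRO and load it into table_id."""
    buffer = io.BytesIO()
    writer(buffer, AVRO_SCHEMA, records)  # write an AVRO file into memory
    buffer.seek(0)
    job = client.load_table_from_file(
        buffer,
        table_id,
        job_config=bigquery.LoadJobConfig(
            source_format=bigquery.SourceFormat.AVRO,
            write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
        ),
    )
    job.result()  # block until BigQuery has imported the batch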
So in effect, the target already does batching. When it receives a BATCH message, it will instead:
- converts each file specified in the message manifest to an AVRO file
- uploads all AVRO files to BigQuery as separate LoadJobs
- waits for all load jobs to complete
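Again as a hedged sketch (not the actual target code), the BATCH path starts one LoadJob per converted file up front and only then waits for all of them:

# Hedged sketch of the BATCH path: one LoadJob per AVRO file, all started
# before any of them are awaited.
from google.cloud import bigquery


def load_batch_files(client: bigquery.Client, table_id: str, avro_files: list) -> None:
    """avro_files are local AVRO files converted from a BATCH manifest."""
    jobs = []
    for path in avro_files:
        with open(path, "rb") as fh:
            jobs.append(
                client.load_table_from_file(
                    fh,
                    table_id,
                    job_config=bigquery.LoadJobConfig(
                        source_format=bigquery.SourceFormat.AVRO,
                        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
                    ),
                )
            )
    for job in jobs:
        job.result()  # wait for every load job to complete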
If there is any performance difference, it should come down to the overhead of transmitting many individual RECORD messages versus a handful of BATCH messages.
Performance
To demonstrate the performance improvement, I am running youcruit-target-bigquery together with tap-infinity, which I implemented just for this blog post. Just take my word for it that we have seen a similar improvement with our CSV tap and real CSV files.
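For reference, tap-infinity just emits synthetic data: row_count records with column_count columns each (both configurable, as in the config below). A stand-in generator, not the actual tap code, would look roughly like this:

# Stand-in for the kind of data tap-infinity emits: row_count records,
# column_count columns each. Column names and values here are made up.
def generate_rows(row_count: int, column_count: int):
    for i in range(row_count):
        yield {f"column_{c}": f"row {i} column {c}" for c in range(column_count)}


# The 100K x 150 dataset used in the first test:
# records = list(generate_rows(100_000, 150))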
This is the meltano configuration used for the test:
version: 1
send_anonymous_usage_stats: false
plugins:
  extractors:
  - name: tap-infinity
    namespace: tap_infinity
    pip_url: git+https://github.com/spacecowboy/tap-infinity.git
    executable: tap-infinity
    capabilities:
    - state
    - catalog
    - discover
    - batch
    - about
    - stream-maps
    - stream-flattening
    settings:
    - name: row_count
    - name: column_count
    - name: batch_size
    - name: batch_config
    - name: stream_maps
    - name: stream_map_config
    - name: flattening_enabled
    - name: flattening_max_depth
  - name: tap-infinity-batchmsg
    namespace: tap_infinity
    pip_url: git+https://github.com/spacecowboy/tap-infinity.git
    executable: tap-infinity
    capabilities:
    - state
    - catalog
    - discover
    - batch
    - about
    - stream-maps
    - stream-flattening
    settings:
    - name: row_count
    - name: column_count
    - name: batch_size
    - name: batch_config
    - name: stream_maps
    - name: stream_map_config
    - name: flattening_enabled
    - name: flattening_max_depth
  loaders:
  - name: target-bigquery
    namespace: bigquery
    pip_url: git+https://github.com/youcruit/youcruit-target-bigquery.git
    settings:
    - name: project_id
      description: Google project id
    - name: dataset
      description: Dataset to load data into
    - name: location
      value: US
      description: Dataset location
    - name: table_prefix
      value: ''
      description: Optional prefix to add to table names
    - name: max_batch_age
      value: 5.0
      description: Time in minutes between batches
    - name: batch_size
      value: 10000
      description: Maximum size of batches when records are streamed in. BATCH messages are not affected by this property.
    - name: add_record_metadata
      value: true
      description: Add Singer Data Capture (SDC) metadata to records
    dialect: bigquery
environments:
- name: dev
  config:
    plugins:
      extractors:
      - name: tap-infinity
        config:
          row_count: 100000
          column_count: 150
      - name: tap-infinity-batchmsg
        config:
          row_count: 100000
          column_count: 150
          batch_size: 10000
          batch_config:
            encoding:
              format: jsonl
              compression: gzip
            storage:
              root: file://
              prefix: ''
      loaders:
      - name: target-bigquery
        config:
          project_id: YOUR_PROJECT_ID_HERE
          location: US
          dataset: YOUR_DATASET_HERE
          table_prefix: ${MELTANO_EXTRACT__LOAD_SCHEMA}_
          batch_size: 10000
          max_batch_age: 5.0
The project is using the following poetry dependencies:
[tool.poetry.dependencies]
python = ">=3.7,<3.11"
meltano = "2.7.1"
Measuring is done using the time command.
100K RECORD messages
Using the config as above, i.e. 100 000 rows of data with 150 columns, and the following command:
$ time poetry run meltano run tap-infinity target-bigquery

real    12m50.760s
user    17m35.626s
sys     0m11.039s
We get a wall time (real) of 771 seconds. But also note that it used even more CPU time (user) than that, due to multi-threading.
10 BATCH messages, 10K records per batch
Still using the same config but switching to BATCH messages:
$ time poetry run meltano run tap-infinity-batchmsg target-bigquery

real    3m58.986s
user    1m56.397s
sys     0m4.697s
We get 239 seconds wall time. That's a 3.23x speedup just from reducing the number of messages sent. It also used only about 50% CPU time, so most of the time was spent waiting.
Thanks to some handy info prints in the target, I can mention that each batch took around 2 seconds to convert to an AVRO file, and then the target waited around 20 seconds for Google to import the batch into BigQuery. The same is true for the previous run, since the target does its own batching of RECORD messages.
But there's more. Because BATCH messages are faster to process, we can afford to increase the size of the batches, where before it would have been unsafe to wait that long between incremental state updates.
1 BATCH message, 100K records per batch
Let’s update the config to the following:
- name: tap-infinity-batchmsg
  config:
    row_count: 100000
    column_count: 150
    batch_size: 100000
    batch_config:
      encoding:
        format: jsonl
        compression: gzip
      storage:
        root: file://
        prefix: ''
With this, we upload all 100K records in a single batch.
$ time poetry run meltano run tap-infinity-batchmsg target-bigquery

real    2m32.084s
user    1m54.461s
sys     0m4.322s
We get 152 seconds. That's a 5.07x speedup compared to streaming records. The target took 20 seconds to convert this single massive batch to AVRO (scaling linearly, as expected) but only 38 seconds to import it into BigQuery.
But what if we increase the amount of data and the size of the batch?
1 BATCH message, 1M records per batch
We’re using the following configuration:
- name: tap-infinity-batchmsg
  config:
    row_count: 1000000
    column_count: 150
    batch_size: 1000000
    batch_config:
      encoding:
        format: jsonl
        compression: gzip
      storage:
        root: file://
        prefix: ''
$ time poetry run meltano run tap-infinity-batchmsg target-bigquery

real    20m55.127s
user    18m14.307s
sys     0m22.536s
We get 1255 seconds. That's a 6.14x speedup compared to streaming records (extrapolating the 771-second run for 100K streamed records to 1M records). The target took about 3 minutes converting the batch to AVRO, then waited 2 minutes 27 seconds for BigQuery to load the data.
Conclusion
BATCH messages can be a real performance boost if your pipelines have a lot of data to import. And the Meltano SDK makes it really easy to implement your own tap or target so you can take advantage of them.
The suitable batch size can only be determined by you, your data, and your environment. Running locally for an initial data import, to set an initial state for our regular cloud-based pipelines, I had no problem running with batch sizes of 10 million. And that made the wait a lot shorter!
Meet the Author
Jonas Kalderstam – Senior Developer (Lanefinder)
A short bio: I have a PhD in machine learning and make things out of code both at work and at home, sometimes to the frustration of my wife and children. Contributing to Free Software and Open Source is important to me, and I have maintained an RSS reader called Feeder for Android since 2014.