6x more speed for your data pipelines with BATCH messages

The Singer specification was a pretty good idea. It makes bringing sources and destinations of data together really easy. Just find a suitable tap for your source data and a suitable target for where you’d like to store it.

The Singer way does come with a certain amount of overhead, though, since you are effectively passing all data via regular text streams.
When the amount of data is large, this can lead to long-running pipelines.

This can be the case when you need to perform an initial import of historical data, when you are restricted from performing rapid incremental updates because you don’t want to hit your production database during business hours, or when you simply lack a good replication key to perform incremental updates with.

At Lanefinder we have a couple of special jobs which regularly import collections of large CSV files. The files are delivered to us at a regular cadence by an external party. Typically each job needs to import a handful of CSV files, each around 500MB in size. One job in particular ends up as a table of around 2 million rows with around 150 columns, and every run of that job processes that full amount of data.

We developed our own tap-csv to deal with how the data is structured across the files and to add incremental sync support. If the pipeline fails midway for some reason, it can resume from the file and line number where it was interrupted.
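To make the resume logic concrete, here is a minimal sketch of the idea (hypothetical names, not the actual tap code): keep a (file, line) bookmark in the Singer state and skip everything up to that bookmark when the pipeline restarts.

import csv
from pathlib import Path


def read_rows(files, state: dict):
    """Yield CSV rows, resuming from the (file, line) bookmark stored in `state`.

    A simplified sketch: the real tap is built with the Meltano SDK and emits
    this bookmark as Singer STATE so an interrupted pipeline can pick up here.
    """
    bookmark_file = state.get("file")
    bookmark_line = state.get("line", 0)

    for path in sorted(files):
        if bookmark_file and path.name < bookmark_file:
            continue  # file was fully imported in a previous run
        with path.open(newline="") as handle:
            for line_number, row in enumerate(csv.DictReader(handle), start=1):
                if path.name == bookmark_file and line_number <= bookmark_line:
                    continue  # row was already imported before the interruption
                yield row
                # Advance the bookmark after every emitted row.
                state["file"] = path.name
                state["line"] = line_number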

This worked for a while but we ran into issues when we migrated our pipelines to a new Kubernetes cluster with a lot less CPU and memory at our disposal. And then we learned about the new BATCH message which had been added to the Meltano SDK.

BATCH messages

BATCH messages are an extension of the Singer spec. You can read more about them in the Meltano SDK documentation. This is how the message looks:

{
  "type": "BATCH",
  "stream": "stream name goes here",
  "encoding": {
    "format": "jsonl",
    "compression": "gzip"
  },
  "manifest": [
    "file://local/path/to/batch/file/1",
    "s3://remote/path/to/batch/file/2"
  ]
}


Instead of sending individual RECORD messages over the stream, a list of file paths is sent. The paths are arbitrary and can be local or remote; it is up to the target to handle them. Each file contains records (note: not RECORD messages, but the raw records themselves). Both the tap and the target need to know how to handle BATCH messages.

If your stream sends many RECORDs, this can be quite a bit faster, as you will see.
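To give a feel for the receiving side, here is a minimal sketch (not taken from any real target) of how the files in a manifest could be consumed, assuming the jsonl + gzip encoding and local file:// paths from the example above:

import gzip
import json


def records_from_batch(message: dict):
    """Yield raw records from the files listed in a BATCH message manifest.

    A simplified sketch: only local file:// paths and the jsonl + gzip
    encoding are handled; a real target would support more schemes and formats.
    """
    encoding = message["encoding"]
    assert encoding["format"] == "jsonl" and encoding["compression"] == "gzip"

    for uri in message["manifest"]:
        path = uri.replace("file://", "", 1)
        with gzip.open(path, mode="rt") as handle:
            for line in handle:
                # Each line is a raw record, not a full RECORD message.
                yield json.loads(line)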

Developing our own target

To make use of BATCH messages, both the tap and the target need to be BATCH aware. We already had our own tap built with the SDK, so adding BATCH support was almost as easy as upgrading the version of the SDK we were building with (we also needed to add config options). But we needed to add BATCH support to our target.

Up until this point we had been using a modified version of pipelinewise-target-bigquery. That worked well for our purposes, but this was the perfect excuse to take more control.

Our new target youcruit-target-bigquery is built with the SDK and supports BATCH messages. It is also less code, because we can leverage the SDK to handle most of the Singer-related bits, and we get stream maps and flattening from the SDK instead of having to build them ourselves.
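For context, a sink built with the SDK has roughly the shape sketched below. This is not the actual youcruit-target-bigquery code, just an illustration: the SDK parses the Singer messages, buffers RECORD messages, and hands each buffered batch to process_batch().

from singer_sdk.sinks import BatchSink
from singer_sdk.target_base import Target


class BigQuerySink(BatchSink):
    """Sketch of an SDK-based sink (not the real implementation)."""

    max_size = 10_000  # mirrors our batch_size setting for streamed RECORDs

    def process_batch(self, context: dict) -> None:
        records = context["records"]  # the records buffered by the SDK
        # Convert `records` to an AVRO file and run a BigQuery LoadJob here.
        ...


class TargetBigQuery(Target):
    """Sketch of the target class wiring in the sink."""

    name = "target-bigquery"
    default_sink_class = BigQuerySink
    # config_jsonschema omitted for brevity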

Target side batching

The best way to load data into BigQuery (at least if you consider cost as well) is to use LoadJobs. And in our experience the most dependable way to use LoadJobs is to upload an AVRO file. AVRO files are much like JSONL files, except the file starts with a header that defines the schema of the data.
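The snippet below is a rough sketch of that approach using fastavro and the google-cloud-bigquery client; load_records, avro_schema and table_id are hypothetical names for illustration, not our actual target code.

import io

import fastavro
from google.cloud import bigquery


def load_records(records, avro_schema: dict, table_id: str) -> None:
    """Write records to an in-memory AVRO file and load it with a LoadJob.

    A minimal sketch: `avro_schema` is an Avro schema matching the records
    and `table_id` is a "project.dataset.table" string.
    """
    buffer = io.BytesIO()
    fastavro.writer(buffer, fastavro.parse_schema(avro_schema), records)
    buffer.seek(0)

    client = bigquery.Client()
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.AVRO,
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    )
    job = client.load_table_from_file(buffer, table_id, job_config=job_config)
    job.result()  # block until the load job has completed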

The target does the following when it receives RECORD messages:

  • collects a configurable number of records, by default ten thousand
  • converts the records into an AVRO file
  • uploads the AVRO file as part of a LoadJob to BigQuery
  • waits for the load job to complete

So in effect, the target already does batching. When it receives a BATCH message, it will instead:

  • convert each file specified in the message manifest to an AVRO file
  • upload all AVRO files as separate LoadJobs to BigQuery
  • wait for all jobs to complete

If there is any performance difference, it should come down to the overhead of transmitting many individual RECORD messages versus a handful of BATCH messages.
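Here is a rough sketch of that BATCH path, under the same assumptions as the earlier snippets (local gzipped JSONL files, a hypothetical avro_schema and table_id):

import gzip
import io
import json

import fastavro
from google.cloud import bigquery


def load_batch_message(message: dict, avro_schema: dict, table_id: str) -> None:
    """Handle a BATCH message: one LoadJob per manifest file, all awaited at the end."""
    client = bigquery.Client()
    job_config = bigquery.LoadJobConfig(source_format=bigquery.SourceFormat.AVRO)
    parsed_schema = fastavro.parse_schema(avro_schema)

    jobs = []
    for uri in message["manifest"]:
        # Convert one manifest file (gzipped JSONL, as in the example message) to AVRO.
        with gzip.open(uri.replace("file://", "", 1), mode="rt") as handle:
            records = [json.loads(line) for line in handle]
        buffer = io.BytesIO()
        fastavro.writer(buffer, parsed_schema, records)
        buffer.seek(0)
        # Start the LoadJob for this file but do not wait for it yet.
        jobs.append(client.load_table_from_file(buffer, table_id, job_config=job_config))

    for job in jobs:
        job.result()  # block until every load job has completed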

Performance

To demonstrate the performance improvement, I am running youcruit-target-bigquery together with tap-infinity, which I implemented just for this blog post. Just take my word for it that we have seen a similar improvement with our CSV tap and real CSV files.

This is the meltano configuration used for the test:

version: 1
send_anonymous_usage_stats: false
plugins:
  extractors:
  - name: tap-infinity
    namespace: tap_infinity
    pip_url: git+https://github.com/spacecowboy/tap-infinity.git
    executable: tap-infinity
    capabilities:
    - state
    - catalog
    - discover
    - batch
    - about
    - stream-maps
    - stream-flattening
    settings:
    - name: row_count
    - name: column_count
    - name: batch_size
    - name: batch_config
    - name: stream_maps
    - name: stream_map_config
    - name: flattening_enabled
    - name: flattening_max_depth
  - name: tap-infinity-batchmsg
    namespace: tap_infinity
    pip_url: git+https://github.com/spacecowboy/tap-infinity.git
    executable: tap-infinity
    capabilities:
    - state
    - catalog
    - discover
    - batch
    - about
    - stream-maps
    - stream-flattening
    settings:
    - name: row_count
    - name: column_count
    - name: batch_size
    - name: batch_config
    - name: stream_maps
    - name: stream_map_config
    - name: flattening_enabled
    - name: flattening_max_depth
  loaders:
  - name: target-bigquery
    namespace: bigquery
    pip_url: git+https://github.com/youcruit/youcruit-target-bigquery.git
    settings:
    - name: project_id
      description: Google project id
    - name: dataset
      description: Dataset to load data into
    - name: location
      value: US
      description: Dataset location
    - name: table_prefix
      value: ''
      description: Optional prefix to add to table names
    - name: max_batch_age
      value: 5.0
      description: Time in minutes between batches
    - name: batch_size
      value: 10000
      description: Maximum size of batches when records are streamed in. BATCH messages
        are not affected by this property.
    - name: add_record_metadata
      value: true
      description: Add Singer Data Capture (SDC) metadata to records
    dialect: bigquery
environments:
- name: dev
  config:
    plugins:
      extractors:
      - name: tap-infinity
        config:
          row_count: 100000
          column_count: 150
      - name: tap-infinity-batchmsg
        config:
          row_count: 100000
          column_count: 150
          batch_size: 10000
          batch_config:
            encoding:
              format: jsonl
              compression: gzip
            storage:
              root: file://
              prefix: ''
      loaders:
      - name: target-bigquery
        config:
          project_id: YOUR_PROJECT_ID_HERE
          location: US
          dataset: YOUR_DATASET_HERE
          table_prefix: ${MELTANO_EXTRACT__LOAD_SCHEMA}_
          batch_size: 10000
          max_batch_age: 5.0


The project is using the following poetry dependencies:

[tool.poetry.dependencies]
python = ">=3.7,<3.11"
meltano = "2.7.1"


Measuring is done using the time command:

100K RECORD messages

Using the config as above, i.e. 100 000 rows of data with 150 columns, and the following command:

$ time poetry run meltano run tap-infinity target-bigquery

real    12m50.760s
user    17m35.626s
sys     0m11.039s


We get a wall time (real) of 771 seconds. Note that it used even more CPU time (user) than that, due to multi-threading.

10 BATCH messages, 10K records per batch

Still using the same config but switching to BATCH messages:

$ time poetry run meltano run tap-infinity-batchmsg target-bigquery

real    3m58.986s
user    1m56.397s
sys     0m4.697s


We get 239 seconds of wall time. That’s a 3.23x speedup just from reducing the number of messages sent. But it also used only around 50% CPU, so most of the wall time was spent waiting.

Thanks to some handy info prints in the target I can mention that each batch took around 2 seconds to convert to an AVRO file, then it waited around 20 seconds for Google to import the batch into BigQuery. The same is true for the previous run, since the target does its own batching for RECORD messages.


But there’s more. Because BATCH messages are faster to process, we can afford to increase the size of the batches, where before it would have been unsafe to wait that long between incremental state updates.

1 BATCH message, 100K records per batch

Let’s update the config to the following:

      - name: tap-infinity-batchmsg
        config:
          row_count: 100000
          column_count: 150
          batch_size: 100000
          batch_config:
            encoding:
              format: jsonl
              compression: gzip
            storage:
              root: file://
              prefix: ''


Now we upload all 100K records in a single batch.

$ time poetry run meltano run tap-infinity-batchmsg target-bigquery

real    2m32.084s
user    1m54.461s
sys     0m4.322s


We get 152 seconds. That’s a 5.07x speedup compared to streaming records. The target took 20 seconds converting this single massive batch to AVRO (scaling linearly as expected) but only 38 seconds importing it into BigQuery.

But what if we increase the amount of data and the size of the batch message?

1 BATCH message, 1M records per batch


We’re using the following configuration:

      - name: tap-infinity-batchmsg
        config:
          row_count: 1000000
          column_count: 150
          batch_size: 1000000
          batch_config:
            encoding:
              format: jsonl
              compression: gzip
            storage:
              root: file://
              prefix: ''

$ time poetry run meltano run tap-infinity-batchmsg target-bigquery

real    20m55.127s
user    18m14.307s
sys     0m22.536s


We get 1255 seconds. That’s a 6.14x speedup compared to streaming records, extrapolating linearly from the 100K-row RECORD run (ten times 771 seconds). The target took about 3 minutes converting the batch to AVRO, then waited 2 minutes and 27 seconds for BigQuery to load the data.

Conclusion

BATCH messages can be a real performance boost if your pipelines have a lot of data to import. And the Meltano SDK makes it really easy to implement your own tap or target so you can take advantage of them.

The suitable batch size can only be determined by you, your data, and your environment. Running locally for an initial data import, to set an initial state for our regular cloud-based pipelines, I had no problem running with batch sizes of 10 million. And that made the wait a lot shorter!

Meet the Author

Jonas Kalderstam – Senior Developer (Lanefinder)

A short bio: I have a PhD in machine learning and make things out of code both at work and at home, sometimes to the frustration of my wife and children. Contributing to Free Software and Open Source is important to me, and I have maintained an RSS reader for Android called Feeder since 2014.
