Introduction

Bolson is a tool that parses JSONs, converts them to records in an Apache Arrow formatted RecordBatch, serializes the batch to an Arrow IPC message, and publishes the message to a Pulsar topic.

Usage

This chapter describes how to use Bolson.

Requirements

To use Bolson, you need the following:

  • A source of JSON data that acts as a TCP server. Bolson operates as long as the connection is alive.
  • An Apache Pulsar broker.
  • An Apache Arrow Schema.
    • Not needed when a parser implementation with a fixed schema is used, such as specific FPGA implementations.

Arrow Schema

For Bolson to read Arrow Schemas, they need to be serialized to a file using Arrow's built-in schema serialization facility.

An example of how to define and serialize a schema in Python:

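import pyarrow

# Define a schema with a single unsigned 64-bit integer field named "field".
schema = pyarrow.schema([pyarrow.field("field", pyarrow.uint64())])

# Serialize the schema and write it to a file that Bolson can read.
with pyarrow.output_stream("example.as") as out:
    out.write(schema.serialize())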

JSON data source

If you do not have a JSON data source that can act as a TCP server, it is possible to generate random JSON data using a companion project of Bolson named Illex.

Illex is used throughout this example.

The requirements for a JSON data source acting as a TCP server are simple. When Bolson connects, the source can start sending data without any additional protocol. The only requirement is that each JSON object is terminated by the newline character '\n'.
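As an illustration of these requirements, the following is a minimal sketch of a data source written with the Python standard library. It is not part of Bolson or Illex. The function name serve_json_once and the choice to serve a single connection are assumptions made for this example.

```python
import json
import socket
import threading


def serve_json_once(records, host="127.0.0.1", port=0):
    """Accept one connection and send every record as newline-terminated JSON.

    Hypothetical helper for illustration only. Port 0 lets the operating
    system pick a free port. Returns the chosen port and the server thread.
    """
    server = socket.create_server((host, port))
    chosen_port = server.getsockname()[1]

    def run():
        with server:
            conn, _ = server.accept()
            with conn:
                for record in records:
                    # Each JSON object must be terminated by '\n'.
                    line = json.dumps(record) + "\n"
                    conn.sendall(line.encode("utf-8"))

    thread = threading.Thread(target=run, daemon=True)
    thread.start()
    return chosen_port, thread
```

To let Bolson connect to such a source, the port it listens on has to match the --port option of the stream subcommand, which defaults to 10197.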

Pulsar broker

If you do not have a Pulsar broker, it can easily be spawned locally using Docker:

docker run -it --rm -p 6650:6650 -p 8080:8080 apachepulsar/pulsar bin/pulsar standalone

Subcommands

Bolson knows two subcommands, stream and bench.

  • Stream: Convert JSONs and publish them to Pulsar in a streaming fashion.
  • Bench: Run micro-benchmarks of specific components of Bolson.

Stream

Produce Pulsar messages from a JSON TCP stream.
Usage: bolson stream [OPTIONS] [input]

Positionals:
  input TEXT:FILE                                 Serialized Arrow schema file for records to convert to.

Options:
  -h,--help                                       Print this help message and exit
  --latency TEXT                                  Enable batch latency measurements and write to supplied file.
  --metrics TEXT                                  Write metrics to supplied file.
  --max-rows UINT=1024                            Maximum number of rows per RecordBatch.
  --max-ipc UINT=5232640                          Maximum size of IPC messages in bytes.
  --threads UINT=1                                Number of threads to use for conversion.
  -p,--parser ENUM:value in {arrow->0,opae-battery->1,opae-trip->2} OR {0,1,2}=0
                                                  Parser implementation. OPAE parsers have fixed schema and ignore schema supplied to -i.
  -i,--input TEXT:FILE                            Serialized Arrow schema file for records to convert to.
  --arrow-buf-cap UINT=16777216                   Arrow input buffer capacity.
  --arrow-seq-col=0                               Arrow parser, retain ordering information by adding a sequence number column.
  --battery-afu-id TEXT                           OPAE "battery status" AFU ID. If not supplied, it is derived from number of parser instances.
  --battery-num-parsers UINT=8                    OPAE "battery status" number of parser instances.
  --battery-seq-col=0                             OPAE "battery status" parser, retain ordering information by adding a sequence number column.
  --trip-afu-id TEXT                              OPAE "trip report" AFU ID. If not supplied, it is derived from number of parser instances.
  --trip-num-parsers UINT=4                       OPAE "trip report" number of parser instances.
  -u,--pulsar-url TEXT=pulsar://localhost:6650/   Pulsar broker service URL.
  -t,--pulsar-topic TEXT=non-persistent://public/default/bolson
                                                  Pulsar topic.
  --pulsar-max-msg-size UINT=5232640
  --pulsar-producers UINT=1                       Number of concurrent Pulsar producers.
  --pulsar-batch                                  Enable batching Pulsar producer(s).
  --pulsar-batch-max-messages UINT=1000           Pulsar batching max. messages.
  --pulsar-batch-max-bytes UINT=131072            Pulsar batching max. bytes.
  --pulsar-batch-max-delay UINT=10                Pulsar batching max. delay (ms).
  --host TEXT=localhost                           JSON source TCP server hostname.
  --port UINT=10197                               JSON source TCP server port.

Bench

Run micro-benchmarks on isolated pipeline stages.
Usage: bolson bench [OPTIONS] SUBCOMMAND

Options:
  -h,--help                                       Print this help message and exit

Subcommands:
  client                                          Run TCP client interface microbenchmark.
  convert                                         Run JSON to Arrow IPC convert microbenchmark.
  queue                                           Run queue microbenchmark.
  pulsar                                          Run Pulsar publishing microbenchmark.

Stream mode

Micro-benchmarks

Parsing

Pulsar producer

FPGA accelerated parsing

By default, Bolson parses and converts JSONs using Arrow's built-in JSON parser. Bolson can also run with FPGA-accelerated parsing enabled for specific, hard-coded schemas.

Fletcher/Intel OPAE

Prerequisites

  • A system set up according to the setup instructions for Fletcher OPAE.
  • The correct bitstream for a specific Arrow schema & parser implementation.

Flash the bitstream

Make sure to first flash the bitstream. From the Fletcher OPAE guide:

Start the FPGA development environment container for the Intel Acceleration Stack (IAS).

cd path/to/bitstream
docker run -it --rm --privileged -v `pwd`:/src:ro ias:1.2.1

From the IAS container, program the FPGA with the bitstream and exit the container:

fpgaconf bitstream.gbs
exit

Enable huge pages

The current implementation of Fletcher OPAE based accelerators requires huge pages to be enabled.

On a CentOS system, they can be enabled by root users as follows:

sudo su
echo 32 | tee /sys/kernel/mm/hugepages/hugepages-1048576kB/nr_hugepages
exit

Run Bolson with FPGA-accelerated parser implementation

Run Bolson with the -p or --parser option followed by the name of the implementation to select which FPGA-accelerated parser implementation to use for the respective subcommand (stream or bench convert).

To see which implementations are available, run Bolson with <subcommand> --help.

Example:

bolson bench convert path/to/schema.as -p opae-battery

Design

This chapter describes the high-level architecture of Bolson.

Overview

Bolson consists of three major components:

  • Client
    • TCP client, connects to a data source acting as a TCP server. Places received TCP packets in buffers for conversion.
  • Converter
    • Parses JSON objects, converts them to Arrow RecordBatches, and serializes the RecordBatches as Arrow IPC messages. These IPC messages are pushed into a queue.
  • Publisher
    • Takes IPC messages from the queue and publishes them to a Pulsar topic.

An overview of the architecture of Bolson is shown below:

Client

The client connects to a server through a TCP socket. As long as the connection exists, Bolson continues to operate. Upon the arrival of a TPDU, its payload is stored in one of the B TCP buffers. Buffers are selected in a round-robin fashion, taking the next buffer that is empty and not locked. If no buffers are available, no TPDUs are received until a buffer becomes available.
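The round-robin selection can be sketched as follows. This is an illustration in Python, not the actual C++ implementation. The function name next_free_buffer and the representation of a buffer as a dictionary with "locked" and "size" entries are assumptions.

```python
def next_free_buffer(buffers, start):
    """Return the index of the next buffer that is empty and not locked.

    Hypothetical sketch. The search starts at index `start` and wraps
    around once. Returns None if no buffer is available, in which case
    the client would have to wait before receiving more data.
    """
    count = len(buffers)
    for step in range(count):
        index = (start + step) % count
        candidate = buffers[index]
        if not candidate["locked"] and candidate["size"] == 0:
            return index
    return None
```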

Whenever a buffer is filled with data, before unlocking the buffer for conversion, the client scans the buffer in reverse for the newline character that must follow every JSON object. Any bytes left over after the last newline are carried over to the next buffer, such that a buffer always contains a whole number of JSONs. This assumes that the JSON data source always places a newline character '\n' after every JSON.
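The reverse scan and carry-over can be sketched in a few lines of Python. This is a simplified illustration and not the actual implementation. The function name split_at_last_newline is an assumption, and the sketch prepends the carried-over bytes to the newly received data instead of managing separate buffers.

```python
def split_at_last_newline(carry, received):
    """Split data into complete JSON lines and leftover bytes.

    Hypothetical sketch. `carry` holds the bytes left over from the
    previous buffer and `received` holds newly received bytes. Everything
    up to and including the last newline is ready for conversion. The
    remainder has to be carried over to the next buffer.
    """
    data = carry + received
    end = data.rfind(b"\n")  # Scan from the back for the last newline.
    if end == -1:
        # No complete JSON yet, so carry everything over.
        return b"", data
    return data[: end + 1], data[end + 1 :]
```

For example, if a buffer ends halfway through a third JSON object, only the first two objects are handed to the converter, and the partial third object is completed by the next received data.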

Converter

The converter takes the contents of a TCP buffer and parses the JSONs contained within. JSONs are expected to be separated by newlines, i.e. each JSON must end with the newline character '\n'. The JSON data is converted to an Arrow RecordBatch (simply called batch from here on). This batch is serialized as an Arrow IPC message and pushed onto a concurrent queue.

Converters may be implemented as developers see fit, but the baseline software converter is a concurrent converter that can use multiple C threads to convert the data contained in the TCP buffers.

An overview of a converter thread is as follows:

  • Parse
    • To parse JSONs, Bolson uses the Apache Arrow JSON parsing functionality, which, at the time of writing, uses RapidJSON under the hood.
    • Bolson also currently knows two FPGA-accelerated parser implementations that are described in the following sections.
  • Resize
    • Because the size of a serialized batch can exceed the maximum size of a Pulsar message, it is necessary for converters to resize batches if they exceed the user-defined limit of a number of rows or a number of bytes. This is a zero-copy operation.
  • Serialize
    • This step serializes the resized batches to Arrow IPC messages and pushes the IPC messages into the concurrent queue.
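The resize step can be illustrated by computing the row ranges into which a batch is split. The sketch below only covers the row limit (compare the --max-rows option) and ignores the byte limit. The function name plan_slices is an assumption. With Arrow, such (offset, length) pairs can be applied by slicing the batch, which does not copy the underlying data.

```python
def plan_slices(num_rows, max_rows):
    """Return (offset, length) pairs that split num_rows into chunks.

    Hypothetical sketch of the row-limit part of the resize step. Every
    chunk holds at most max_rows rows. Only the last chunk may be smaller.
    """
    if max_rows <= 0:
        raise ValueError("max_rows must be positive")
    return [
        (offset, min(max_rows, num_rows - offset))
        for offset in range(0, num_rows, max_rows)
    ]
```

With the default limit of 1024 rows, a batch of 2500 rows would be split into three slices that are each serialized as a separate IPC message.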

Converter threads

The converter setup discussed so far converts the JSONs in one buffer to one Arrow RecordBatch, which is then potentially split up over K messages. The internal thread that manages this type of conversion is called the one-to-one converter thread.

Another setup is possible, where a converter thread takes data from all input buffers at the same time and converts it to one RecordBatch before resizing. This thread is internally called the all-to-one converter thread. It is spawned when a parser implementation provides multiple input buffers but notifies the converter that it should only spawn one thread.

The all-to-one approach is currently only used for an FPGA implementation in which the circuitry that transfers data from the hardware parsers to host memory is relatively large in terms of area, but provides much higher throughput than a single parser can deliver. This circuitry is therefore shared by multiple hardware parsers. As a result, the data from each parser working on its own input buffer ends up in the same RecordBatch, making this an "all-to-one" parser implementation that requires an all-to-one converter thread.
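The difference between the two converter thread types can be sketched as follows. This is a conceptual illustration in Python only. Plain lists of parsed JSON objects stand in for Arrow RecordBatches, and the function names are assumptions.

```python
import json


def parse_buffer(buffer):
    """Parse all newline-terminated JSON objects in one buffer."""
    return [json.loads(line) for line in buffer.splitlines() if line.strip()]


def one_to_one(buffers):
    """One-to-one: every input buffer results in its own batch."""
    return [parse_buffer(buffer) for buffer in buffers]


def all_to_one(buffers):
    """All-to-one: the records of all input buffers end up in one batch."""
    batch = []
    for buffer in buffers:
        batch.extend(parse_buffer(buffer))
    return batch
```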

Publisher

The publisher component pops Arrow IPC messages from the concurrent queue and publishes these Arrow IPC messages to a Pulsar topic. The implementation uses the Pulsar C++ client library.

To increase the throughput, it is possible to spawn P Pulsar producer threads to publish IPC messages from the queue concurrently.

If IPC messages are small, but many of them arrive every second, it is possible to reduce the overhead by having the Pulsar producers apply batching. Note that this always comes at the cost of increased latency.
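The interaction between the queue and multiple producer threads can be sketched with the Python standard library. This is not the actual implementation, which uses the Pulsar C++ client library. The function name run_publishers and the publish callback are assumptions that stand in for a real Pulsar producer, and a None sentinel is used here to stop the threads.

```python
import queue
import threading


def run_publishers(messages, num_producers, publish):
    """Publish all messages using num_producers concurrent threads.

    Hypothetical sketch. `publish` is a callable that sends one message,
    standing in for a Pulsar producer.
    """
    ipc_queue = queue.Queue()
    for message in messages:
        ipc_queue.put(message)
    # One sentinel per producer thread signals that no more messages follow.
    for _ in range(num_producers):
        ipc_queue.put(None)

    def worker():
        while True:
            message = ipc_queue.get()
            if message is None:
                break
            publish(message)

    threads = [threading.Thread(target=worker) for _ in range(num_producers)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
```

Because the threads pop from the queue concurrently, the order in which messages are published is not guaranteed to match the order in which they were queued.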

Detailed documentation / Doxygen

Detailed documentation of the sources can be generated by running Doxygen from the root of the repository. The output can then be accessed by opening: doc/html/index.html

Hardware accelerated parsing

Battery status

Trip report