Usage

This chapter describes how to use Bolson.

Requirements

To use Bolson, you need the following:

  • A source of JSON data that acts as a TCP server. Bolson operates as long as the connection is alive.
  • An Apache Pulsar broker.
  • An Apache Arrow Schema
    • Unless fixed schema parsing implementations are used, such as specific FPGA implementations.

Arrow Schema

For Bolson to read Arrow Schemas, they need to be serialized to a file using Arrow's built-in schema serialization facility.

An example of how to define and serialize a schema in Python:

import pyarrow

schema = pyarrow.schema([pyarrow.field("field", pyarrow.uint64())])
pyarrow.output_stream("example.as").write(schema.serialize())

JSON data source

If you do not have a JSON data source that can act as a TCP server, it is possible to generate random JSON data using a companion project of Bolson named Illex.

Illex is used throughout this example.

The requirements for a JSON data source acting as a TCP are simple. When Bolson connects, the source can start sending data, without any additional protocol, with the exception that each JSON object must be terminated by the newline character '\n'.

Pulsar broker

If you do not have a Pulsar broker, it can easily be spawned locally using Docker:

docker run -it --rm -p 6650:6650 -p 8080:8080 apachepulsar/pulsar bin/pulsar standalone

Subcommands

Bolson knows two subcommands, stream and bench.

  • Stream: Convert JSONs and publish them to Pulsar in a streaming fashion.
  • Bench: Run micro-benchmarks of specific components of Bolson.

Stream

Produce Pulsar messages from a JSON TCP stream.
Usage: bolson stream [OPTIONS] [input]

Positionals:
  input TEXT:FILE                                 Serialized Arrow schema file for records to convert to.

Options:
  -h,--help                                       Print this help message and exit
  --latency TEXT                                  Enable batch latency measurements and write to supplied file.
  --metrics TEXT                                  Write metrics to supplied file.
  --max-rows UINT=1024                            Maximum number of rows per RecordBatch.
  --max-ipc UINT=5232640                          Maximum size of IPC messages in bytes.
  --threads UINT=1                                Number of threads to use for conversion.
  -p,--parser ENUM:value in {arrow->0,opae-battery->1,opae-trip->2} OR {0,1,2}=0
                                                  Parser implementation. OPAE parsers have fixed schema and ignore schema supplied to -i.
  -i,--input TEXT:FILE                            Serialized Arrow schema file for records to convert to.
  --arrow-buf-cap UINT=16777216                   Arrow input buffer capacity.
  --arrow-seq-col=0                               Arrow parser, retain ordering information by adding a sequence number column.
  --battery-afu-id TEXT                           OPAE "battery status" AFU ID. If not supplied, it is derived from number of parser instances.
  --battery-num-parsers UINT=8                    OPAE "battery status" number of parser instances.
  --battery-seq-col=0                             OPAE "battery status" parser, retain ordering information by adding a sequence number column.
  --trip-afu-id TEXT                              OPAE "trip report" AFU ID. If not supplied, it is derived from number of parser instances.
  --trip-num-parsers UINT=4                       OPAE "trip report" number of parser instances.
  -u,--pulsar-url TEXT=pulsar://localhost:6650/   Pulsar broker service URL.
  -t,--pulsar-topic TEXT=non-persistent://public/default/bolson
                                                  Pulsar topic.
  --pulsar-max-msg-size UINT=5232640
  --pulsar-producers UINT=1                       Number of concurrent Pulsar producers.
  --pulsar-batch                                  Enable batching Pulsar producer(s).
  --pulsar-batch-max-messages UINT=1000           Pulsar batching max. messages.
  --pulsar-batch-max-bytes UINT=131072            Pulsar batching max. bytes.
  --pulsar-batch-max-delay UINT=10                Pulsar batching max. delay (ms).
  --host TEXT=localhost                           JSON source TCP server hostname.
  --port UINT=10197                               JSON source TCP server port.

Bench

Run micro-benchmarks on isolated pipeline stages.
Usage: bolson bench [OPTIONS] SUBCOMMAND

Options:
  -h,--help                                       Print this help message and exit

Subcommands:
  client                                          Run TCP client interface microbenchmark.
  convert                                         Run JSON to Arrow IPC convert microbenchmark.
  queue                                           Run queue microbenchmark.
  pulsar                                          Run Pulsar publishing microbenchmark.