Motivation

Pinot is a low-latency, real-time analytics database used for serving online and offline analytic queries at LinkedIn. Unlike other infrastructure systems (e.g. key-value stores), estimating the capacity of a Pinot cluster is challenging because it depends heavily on the query patterns of the upstream clients, including the amount of data queried by users and the additional aggregations (group-by dimensions) the queries require.

Pinot deployments are either single tenant (supporting a single table per broker and server tenant) or multi tenant. We also have hybrid scenarios where brokers are shared across tables but the server components are dedicated to single tables. Many production use cases are currently single tenant because they have very strict latency SLAs. To use resources more efficiently, the Pinot team is considering supporting multi-tenancy with appropriate isolation of queries.

Currently, when users need to provision a new table, or want to understand whether the Pinot clusters allocated for their data can scale to new or additional demands of their service, they contact the Pinot administrators, who set up a benchmark cluster with data for the use case and sample queries from the users. There are a few issues with this approach:

  • The process is manual and hence quite time consuming.
  • The query pattern or the data size of a table changes over time, so estimates provided at onboarding time become stale and teams have to revisit the capacity numbers.

We have also encountered situations where the need to determine whether a Pinot cluster can scale appropriately has led Pinot users to run tests against live/production clusters. This happens either because users need to exercise the full stack end to end or because of incorrect planning on their side. Such tests have a noticeable impact on live traffic, which underscores the importance of providing a framework that helps users better manage the tests behind these estimates.

In the rest of the document we provide details of a framework that will help automate the benchmarking/profiling process.

Basic requirements

Must have

  • End user has to provide a table name, table type (offline, realtime, hybrid), benchmarking mode (either mirroring an existing use case (mirror mode) or generating a new use case (non-mirror mode)), tag name and table retention time to set up the use case in the perf cluster. For non-mirror mode, the user also needs to provide the table schema and table config.
    • Table name
    • Table type (offline, realtime, hybrid)
    • Version id (to differentiate different versions of the same table, alpha-numeric, optional)
    • Benchmarking mode (mirror, non-mirror mode)
    • Tag name (group/user ldap)
    • Table retention time (e.g. 30 days)
    • Table schema, table config (non-mirror mode)
  • End user has to provide data and queries before running a benchmark.
  • End user has to set a target QPS and a duration for each benchmark run.
  • Multiple users are able to benchmark their own individual tables concurrently.
  • All the hardware machines are standardized.
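
The required inputs above can be captured in a small request object that is validated before any cluster work starts. The following is only a sketch: the class name, field names and error messages are assumptions made for illustration and are not part of the design.

```python
from dataclasses import dataclass
from typing import Optional

TABLE_TYPES = {"offline", "realtime", "hybrid"}
MODES = {"mirror", "non-mirror"}


@dataclass
class BenchmarkTableRequest:
    """Hypothetical container for the inputs listed under "Must have"."""
    table_name: str
    table_type: str                 # offline, realtime or hybrid
    mode: str                       # mirror or non-mirror
    tag_name: str                   # group/user ldap
    retention_days: int             # e.g. 30
    version_id: Optional[str] = None
    schema: Optional[dict] = None        # only needed in non-mirror mode
    table_config: Optional[dict] = None  # only needed in non-mirror mode

    def validate(self) -> list:
        """Return a list of human-readable problems; empty means valid."""
        errors = []
        if not self.table_name:
            errors.append("table name is required")
        if self.table_type not in TABLE_TYPES:
            errors.append("table type must be offline, realtime or hybrid")
        if self.mode not in MODES:
            errors.append("mode must be mirror or non-mirror")
        if not self.tag_name:
            errors.append("tag name is required")
        if self.retention_days <= 0:
            errors.append("retention must be a positive number of days")
        if self.version_id is not None and not self.version_id.isalnum():
            errors.append("version id must be alphanumeric")
        if self.mode == "non-mirror" and (self.schema is None or self.table_config is None):
            errors.append("non-mirror mode requires a schema and a table config")
        return errors
```

Returning a list of problems instead of raising on the first one lets a UI show all missing inputs at once.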

Good to have

  • Multiple tables can be put under the same tenant name (multi-tenant use case) without manual intervention. Currently the tenant tag includes the table name, so setting up a multi-tenant environment in the perf cluster requires manual intervention.
  • Benchmarking runs can be scheduled. Currently a benchmark cannot be run without end users providing queries.

Architecture Design

This service consists of three major parts: cluster management, data preparation and benchmark execution.

Cluster management

A perf use case needs to be set up in the perf cluster before benchmarking can run. This involves several essential actions, such as uploading the table schema, modifying table configs, assigning broker and server hosts, and managing retention.

Data preparation

Data needs to be uploaded to the perf-test cluster. Before segments are uploaded, the Pinot segment metadata needs to be modified.

Queries for benchmarking also need to be uploaded and adjusted, e.g. by appending the suffix “Test” to table names in the queries.

Benchmark Execution

This part runs the actual benchmark against the perf cluster. A run can be started only when the data and the queries are ready. The end user then sets the target QPS and duration, and clicks the “Run benchmarking” button to trigger the tool. During the run, the UI shows a countdown, and a report is provided at the end.

Workflow

The workflow for users to use pinot bench service would be as follows:


  1. Create use case in perf cluster
    1. Add/update schema
    2. Add/update table config
  2. Assign brokers and servers
    1. Modify Helix tags for broker & server hosts
  3. Load data
    1. Download from production controller (mirror mode)
    2. Hadoop job (done by users)
    3. Kafka
  4. Load queries (provided by users)
    1. Existing log
    2. Upload from users
  5. Run benchmarking and gather stats
    1. QPS
    2. Latency
    3. # doc scanned
    4. CPU utilization
    5. GC
    6. Memory usage
    7. # of threads
  6. Save logs & clean up
    1. Save to local
    2. Delete queries
    3. Delete table, segments
    4. Release broker & server hosts
    5. Delete schema

Service Interaction Diagram

The diagram below shows how the Pinot bench service interacts with the production cluster and the perf cluster. All three run in the same colocation. The Pinot bench service sets up perf tables, modifies table settings, and runs benchmarks against the perf cluster. The production cluster provides existing configs and data for tables in mirror mode. The perf cluster is where the perf tables are actually created. The Pinot bench service sends queries to the perf cluster and receives stats back from it.

Components

Hosts

To run benchmarks, we need to prepare some hosts as candidate brokers and servers. They can be tagged as “pinot-bench-candidate” in Helix. Once the user/engineer has decided how many brokers and servers are needed (based on the number of hosts in the existing production use case, or on the needs of a new table), the required brokers and servers are picked from these candidate hosts and re-tagged to “pinot-bench-{ldap}-{tableName}-{versionId}”. Once the benchmark finishes or the retention time is reached, the hosts in use are recycled back to the candidate list.
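
The tagging lifecycle described above (candidate, in use, recycled) can be sketched as follows. This is an illustration only: a plain dict of host name to tag stands in for Helix, the function names are hypothetical, and omitting the trailing version segment when no version id is given is an assumption.

```python
CANDIDATE_TAG = "pinot-bench-candidate"


def bench_tag(ldap, table_name, version_id=None):
    """Build the tag "pinot-bench-{ldap}-{tableName}-{versionId}".

    Assumption: when no version id is given, the last segment is omitted.
    """
    parts = ["pinot-bench", ldap, table_name]
    if version_id:
        parts.append(version_id)
    return "-".join(parts)


def allocate(host_tags, count, tag):
    """Pick `count` candidate hosts and re-tag them for one use case.

    `host_tags` maps host name to its current tag and is updated in place;
    a real implementation would update the Helix instance tags instead.
    """
    candidates = sorted(h for h, t in host_tags.items() if t == CANDIDATE_TAG)
    if len(candidates) < count:
        raise RuntimeError(
            f"need {count} candidate hosts, only {len(candidates)} available")
    chosen = candidates[:count]
    for host in chosen:
        host_tags[host] = tag
    return chosen


def recycle(host_tags, tag):
    """Return every host carrying `tag` to the candidate pool."""
    released = sorted(h for h, t in host_tags.items() if t == tag)
    for host in released:
        host_tags[host] = CANDIDATE_TAG
    return released
```

Failing fast when the pool is too small avoids starting a benchmark on fewer hosts than the user asked for, which would skew the capacity estimate.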

The pinot-benchmarking tool and pinot-controller can be deployed on the same host.

Table config change

The following changes are needed for use cases in mirror mode. The end user can also customize these settings.

  • Rename the table with the suffix “Test”
  • Set replication to “1”
  • Set numInstancesPerPartition to the number of hosts specified in the params (offline table, if a replica group config is applied)
  • Set replicasPerPartition to “1” (realtime table)
  • Set Kafka auto offset to “largest”
  • Change tenant tags to “pinot-bench-{ldap}-{tableName}-{versionId}”
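
As a rough illustration, the mirror-mode changes above could be applied to a production table config as shown below. The function name is hypothetical, and the JSON key layout (segmentsConfig, replicaGroupStrategyConfig, tableIndexConfig.streamConfigs, and the Kafka offset key) is an assumption about the table config format that should be checked against the Pinot version in use. The sketch also assumes tableName holds the raw table name without a type suffix.

```python
import copy


def to_perf_table_config(prod_config, tenant_tag, num_server_hosts):
    """Return a modified copy of a production table config for the perf cluster.

    The key names used here are assumptions about the config layout.
    """
    cfg = copy.deepcopy(prod_config)  # never mutate the production config

    # Rename the table with the suffix "Test".
    cfg["tableName"] = cfg["tableName"] + "Test"

    segments = cfg.setdefault("segmentsConfig", {})
    segments["replication"] = "1"

    if cfg.get("tableType", "").upper() == "REALTIME":
        segments["replicasPerPartition"] = "1"
        stream = cfg.setdefault("tableIndexConfig", {}).setdefault("streamConfigs", {})
        stream["stream.kafka.consumer.prop.auto.offset.reset"] = "largest"
    else:
        # Only touch the replica group settings if the table already uses them.
        replica_group = segments.get("replicaGroupStrategyConfig")
        if replica_group is not None:
            replica_group["numInstancesPerPartition"] = num_server_hosts

    # Point both broker and server tenants at the benchmark tag.
    cfg["tenants"] = {"broker": tenant_tag, "server": tenant_tag}
    return cfg
```

Working on a deep copy keeps the fetched production config available for comparison when the user later customizes individual settings.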

Data

Since hybrid tables consist of offline and real-time parts, we describe each part separately.

Data for offline table

If the table comes from an existing production table, the pinot-benchmarking service can have the perf benchmarking controller download the existing segments from the production controller.

Data for real-time table

For real-time data bootstrap, users can specify the Kafka settings and let the use case run for a while so that real-time segments are generated in the perf-test cluster. The options for setting up a real-time table are:

  • Consume from latest. This is the default if nothing is specified. A real-time table with this setting consumes the latest Kafka data once the real-time table is created. Users have to wait several days for the real-time data to be ready for serving queries.
  • Consume from oldest (or consume from 3 days ago). Real-time segments are generated quickly, so the table is set up faster. However, this setting makes all real-time segments have the same number of rows. For cases where real-time segments are cut by time, the result may not match production. In addition, this setting requires the segment retention manager to kick in and purge out-of-date data every time a new real-time table is set up, which also requires a code change on the Pinot open source side.
  • (Future work) Get ingestion rate from Pinot server metrics. We fetch the ingestion rate 4 times a day from an existing production real-time table, so that we know the estimated rate and can provide a better number for the real-time Kafka settings.
  • (Future work) Bootstrap data from the production real-time table. This requires copying all the real-time segment metadata and the actual data from the production table, and it also requires code changes on the Pinot open source side. However, the copied data becomes useless as soon as any Kafka setting needs to be modified, and the behavior falls back to the first option, i.e. waiting several days for data to be ready.

Query

For an existing use case, queries can be downloaded from production hosts. The table names in these queries need the suffix “Test” appended so that they match the perf table name.

Users are required to upload their own queries.
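
The table-name adjustment mentioned in this section can be sketched with a regular expression. This is a simplified illustration, not a SQL parser: the function name is hypothetical, and it assumes the table name appears directly after a FROM keyword and is not quoted.

```python
import re


def rename_table_in_query(query, table_name, suffix="Test"):
    """Append `suffix` to `table_name` where it follows FROM.

    The FROM keyword is matched case-insensitively; the table name is
    matched exactly. The trailing word boundary keeps longer names such
    as "myTableOther" untouched and makes the rewrite idempotent.
    """
    pattern = re.compile(r"((?i:\bFROM)\s+)" + re.escape(table_name) + r"\b")
    return pattern.sub(lambda m: m.group(1) + table_name + suffix, query)
```

For example, rename_table_in_query("SELECT COUNT(*) FROM myTable", "myTable") yields a query against myTableTest, while occurrences of the name inside string literals in the WHERE clause are left alone.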

Query Execution

Once the end user clicks the “Run benchmarking” button, the pinot-bench service starts sending queries to the Pinot perf cluster. The service collects the following stats, which show the performance of the queries and the health of the cluster:

  1. QPS
  2. Latency
  3. # doc scanned
  4. CPU utilization
  5. GC
  6. Memory usage
  7. # of threads

Once the benchmark tool starts running, the information above needs to be collected from Pinot brokers and servers. QPS, latency, and number of documents scanned show the direct performance of the cluster, whereas CPU, memory, thread usage, and GC show its health status.

The diagram below shows the interaction between the Pinot benchmark service, the perf Pinot broker, and the perf Pinot server:

Stat items 1 to 3 can be gathered from the query responses. The remaining items require a new API on the Pinot broker and Pinot server that returns the actual stats. Requests for these stats can therefore be sent at a different rate, e.g. 1 request per second.
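
As an illustration of how the first three stats could be derived from query responses, the sketch below aggregates a list of response dicts. The field names timeUsedMs and numDocsScanned are assumed to be present in each broker response, and the function names and the choice of nearest-rank percentiles are assumptions made for this example.

```python
import math


def percentile(sorted_values, pct):
    """Nearest-rank percentile of an already sorted, non-empty list."""
    rank = max(1, math.ceil(pct / 100 * len(sorted_values)))
    return sorted_values[rank - 1]


def summarize(responses, duration_seconds):
    """Aggregate QPS, latency and docs scanned from query responses.

    Each response is assumed to be a dict with "timeUsedMs" and
    "numDocsScanned" entries.
    """
    if not responses:
        raise ValueError("no responses to summarize")
    latencies = sorted(r["timeUsedMs"] for r in responses)
    docs = [r["numDocsScanned"] for r in responses]
    return {
        "qps": len(responses) / duration_seconds,
        "latency_p50_ms": percentile(latencies, 50),
        "latency_p99_ms": percentile(latencies, 99),
        "avg_docs_scanned": sum(docs) / len(docs),
    }
```

The host-level stats (CPU, GC, memory, threads) are not covered here because they depend on the new broker and server API, whose shape is not yet defined.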

When the benchmark finishes, an analytic report is generated and saved to the cluster, and an email is sent to notify the end user that the report is ready. The end user can then go to the service and call the API (described in the API Design section below) to download the report.

Concurrency

Sometimes more than one user will use this tool at the same time. To prevent two different runs from affecting each other, there are two options:

  • The Pinot benchmark tool assigns different hosts to different use cases.
  • A singleton in the benchmarking tool guarantees that only one run happens at any time.

In the first phase of the Pinot benchmarking service, we adopt the first approach: assigning different hosts to different use cases by including the table name in the tenant name (the tenant name is defined as “pinot-bench-{ldap}-{tableName}-{versionId}”). This means that a multi-tenant cluster cannot be set up without manual intervention. We will come up with a better approach to support automatic multi-tenant setup in the future.

Clean up

Similar to segment retention, table retention defines how long the test table lasts. Table retention should be stored in the table custom config.

Clean up should be done after the test table reaches its retention. A background thread periodically checks the retention of all test tables. Once a table meets the cleanup criteria, the following steps are performed:

  • Analytic reports get deleted.
  • All the segments get removed (at least in the 1st phase).
  • Schema, table config are deleted.
  • Broker, server hosts get recycled.
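
The periodic retention check can be sketched as a pure function that the background thread calls on each pass. The record layout (name, created_at, retention_days) and the function name are assumptions for illustration; in the design the retention value would be read from the table custom config.

```python
from datetime import datetime, timedelta


def tables_due_for_cleanup(tables, now):
    """Return the names of test tables whose retention has been reached.

    Each table is a dict with "name", "created_at" (datetime) and
    "retention_days" (int). For every returned table the service would
    then delete reports, segments, schema and table config, and recycle
    the broker and server hosts.
    """
    due = []
    for table in tables:
        expires_at = table["created_at"] + timedelta(days=table["retention_days"])
        if now >= expires_at:
            due.append(table["name"])
    return due
```

Passing the current time in as an argument keeps the check deterministic and easy to test without waiting for real time to pass.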

Since there is no concept of table retention in the existing Pinot open source code base and there is no such requirement for now, this cleanup check is done in the Pinot benchmark service instead of in the perf controller.

Note: the retention of historical reports can be longer than the table retention, and it can be a constant for all tables.

API Design

To handle the whole workflow, the pinot-benchmarking tool needs the following APIs:

List all the test tables

List the existing tables in the pinot-bench cluster.

GET /tables/{tableName}

Create test table

Create a new test table in the pinot-bench cluster and assign broker and server hosts. For a mirror-mode table, fetch the schema and table config from the production cluster and modify the existing config. If the table already exists, throw an exception that suggests adding a version id.

POST /tables/{tableName}?type=offline/realtime/hybrid&mode=mirror/non-mirror&broker_host=M&server_host=N(&version_id=K)
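
A client could assemble the create-table request URL along these lines. The helper name and the base URL in the usage note are hypothetical; the parameter names follow the endpoint above.

```python
from urllib.parse import quote, urlencode


def create_table_url(base_url, table_name, table_type, mode,
                     broker_hosts, server_hosts, version_id=None):
    """Build the URL for the create-test-table call.

    `version_id` is optional and only added when provided.
    """
    params = {
        "type": table_type,          # offline, realtime or hybrid
        "mode": mode,                # mirror or non-mirror
        "broker_host": broker_hosts,
        "server_host": server_hosts,
    }
    if version_id is not None:
        params["version_id"] = version_id
    return f"{base_url}/tables/{quote(table_name, safe='')}?{urlencode(params)}"
```

For example, with an assumed service address of http://localhost:9000, calling create_table_url("http://localhost:9000", "myTable", "offline", "mirror", 1, 2, "v1") produces a URL that can be sent as a POST request.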

Schema changes

Schema changes, such as adding columns or changing a column to multi-value.

Add schema:

POST /schemas/{tableName}

Update schema:

PUT /schemas/{tableName}

Table config changes

Table config changes, such as tenant tags, replica group, retention, Kafka settings, etc. These affect the number of hosts assigned.

Add table. Users are required to enter the index settings, segment settings, quota settings, and the numbers of brokers and servers.

POST /tables/{tableName}

Update table:

PUT /tables/{tableName}

Load data

Download data from the production controller, modify the metadata, and upload it to the pinot-bench controller (one-off).

POST /data/{tableName}

Load data from PBNJ (Cron Job)

Load queries

Load queries. Append the suffix “Test” to table names, and store the queries on the pinot-bench host.

Load queries from user.

POST /queries/{tableName}?source=upload

Run benchmarking

Run benchmarking (multiple times) with a target QPS and duration. Save stats to generate the analytics report.

Trigger a benchmark; a report id is returned immediately:

POST /benchmark/{tableName}?qps=M&duration=N
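
To make the qps and duration parameters concrete, the following sketch paces queries at a fixed rate for a fixed duration. It is a single-threaded illustration with hypothetical function names: because each query is sent synchronously, the achieved rate drops below the target whenever a query takes longer than the send interval, so a real load generator would need concurrent senders.

```python
import time


def send_offsets(target_qps, duration_seconds):
    """Offsets in seconds from the run start at which each query is due."""
    if target_qps <= 0 or duration_seconds <= 0:
        raise ValueError("target QPS and duration must be positive")
    total = int(target_qps * duration_seconds)
    interval = 1.0 / target_qps
    return [i * interval for i in range(total)]


def run_benchmark(queries, target_qps, duration_seconds, send,
                  clock=time.monotonic, sleep=time.sleep):
    """Send queries round-robin at the target rate and collect the results.

    `send` is a caller-supplied function that executes one query and
    returns its response; `clock` and `sleep` are injectable for tests.
    """
    start = clock()
    results = []
    for i, offset in enumerate(send_offsets(target_qps, duration_seconds)):
        delay = start + offset - clock()
        if delay > 0:
            sleep(delay)  # wait until this query is due
        results.append(send(queries[i % len(queries)]))
    return results
```

Scheduling against absolute offsets from the start time, rather than sleeping a fixed interval after each query, prevents small delays from accumulating over a long run.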

Download report

Once benchmarking has finished, download the report from the pinot-bench host based on the reportId:

Download report:

GET /benchmark/{tableName}/report/{reportId}

Clean up

Clean up, including query cleanup, segment / table / schema cleanup, and host recycling.

POST /cleanup/{tableName}

Rollout plan

Create new module for pinot benchmark as a service.

API implementation:

  • Implement all the designed APIs above.
  • Code change in actual modules, like pinot-broker, pinot-server.

Set up perf cluster and assign several hosts for testing.

End-to-end testing.






