Version information

Druid integration was introduced in Hive 2.2.0 (HIVE-14217). It is compatible with Druid 0.9.1.1, the latest stable release of Druid to date.

Introduction

This page documents the work done for the integration between Druid and Hive, which was started in HIVE-14217.

Objectives

Our main goal is to be able to index data from Hive into Druid, and to be able to query Druid datasources from Hive. Completing this work will bring benefits to the Druid and Hive systems alike:

  • Efficient execution of OLAP queries in Hive. Druid is a system especially well tailored towards the execution of OLAP queries on event data. Hive will be able to take advantage of its efficiency for the execution of this type of query.
  • Introducing a SQL interface on top of Druid. Druid queries are expressed in JSON, and Druid is queried through a REST API over HTTP. Once a user has declared a Hive table that is stored in Druid, we will be able to transparently generate Druid JSON queries from the input Hive SQL queries.
  • Being able to execute complex operations on Druid data. There are multiple operations that Druid does not support natively yet, e.g. joins. Putting Hive on top of Druid will enable the execution of more complex queries on Druid data sources. 
  • Indexing complex query results in Druid using Hive. Currently, indexing in Druid is usually done through MapReduce jobs. We will enable Hive to index the results of a given query directly into Druid, e.g., as a new table or a materialized view (HIVE-10459), and start querying and using that dataset immediately.

The initial implementation, started in HIVE-14217, focuses on 1) enabling the discovery of data that is already stored in Druid from Hive, and 2) being able to query that data, trying to make use of Druid's advanced querying capabilities. For instance, we have put special emphasis on pushing as much computation as possible to Druid, and on recognizing the types of queries for which Druid is especially efficient, e.g. timeseries or topN queries.

Future work after this first step is completed is listed in HIVE-14473. It includes, among others, the possibility to use Create Table As Select (CTAS) statements to create datasources in Druid from Hive (HIVE-14474). If you want to collaborate on this effort, a list of remaining issues can be found at the end of this document.

Preliminaries

Before going into further detail, we introduce some background that the reader needs to be aware of in order to understand this document.

Druid

Druid is an open-source analytics data store designed for business intelligence (OLAP) queries on event data. Druid provides low latency (real-time) data ingestion, flexible data exploration, and fast data aggregation. Existing Druid deployments have scaled to trillions of events and petabytes of data. Druid is most commonly used to power user-facing analytic applications. You can find more information about Druid on the project website.

Storage Handlers

You can find an overview of Hive storage handlers in the StorageHandlers page of this wiki; the integration of Druid with Hive depends upon that framework.

Usage

For the running examples, we use the wikiticker dataset included in the quickstart tutorial of Druid.

Create tables linked to Druid datasources

Assume that we have already indexed the wikiticker dataset mentioned previously, and the address of the Druid broker is 10.5.0.10:8082.

First, you need to set the Hive property hive.druid.broker.address.default in your configuration to point to the broker address:
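For instance, pointing it at the broker address mentioned above (a minimal sketch; the property can also be set in hive-site.xml):

    SET hive.druid.broker.address.default=10.5.0.10:8082;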

Then, to create a table that we can query from Hive, we execute the following statement in Hive:
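A sketch of the statement follows; the table name druid_table_1 is illustrative, while the storage handler class and the druid.datasource property are those provided by the integration. Note that no columns are listed, as the schema is discovered from Druid:

    CREATE EXTERNAL TABLE druid_table_1
    STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
    TBLPROPERTIES ("druid.datasource" = "wikiticker");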

Observe that you need to specify the datasource as TBLPROPERTIES using the druid.datasource property. Further, observe that the table needs to be created as EXTERNAL, as the data is stored in Druid. The table is just a logical entity that we will use to express our queries; there is no data movement when we create the table. In fact, what happens under the hood when you execute that statement is that Hive sends a segment metadata query to Druid in order to discover the schema (columns and their types) of the datasource. Retrieval of other useful information, such as statistics, e.g. number of rows, is on our roadmap, but it is not supported yet. Finally, note that if we change the Hive property value for the default broker address, queries on this table will automatically run against the new broker address, as the address is not stored with the table.

If we execute a DESCRIBE statement, we can actually see the information about the table:
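An illustrative, trimmed listing; the exact column set and types depend on how the wikiticker datasource was indexed, and some columns are omitted here:

    hive> DESCRIBE druid_table_1;
    __time      timestamp    from deserializer
    channel     string       from deserializer
    cityName    string       from deserializer
    comment     string       from deserializer
    namespace   string       from deserializer
    page        string       from deserializer
    user        string       from deserializer
    count       bigint       from deserializer
    added       bigint       from deserializer
    deleted     bigint       from deserializer
    delta       bigint       from deserializer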

We can see there are three different groups of columns corresponding to the Druid categories: the timestamp column (__time), which is mandatory in Druid; the dimension columns (whose type is STRING); and the metrics columns (all the rest).

Querying Druid from Hive

Once we have created our first table stored in Druid using the DruidStorageHandler, we are ready to execute our queries against Druid.

When we express a query over a Druid table, Hive tries to rewrite the query to be executed efficiently by pushing as much computation as possible to Druid. This task is accomplished by the cost-based optimizer, built on Apache Calcite, which identifies patterns in the plan and applies rules to rewrite the input query into a new equivalent query with (hopefully) more operations executed in Druid.

In particular, we implemented our extension to the optimizer in HIVE-14217, which builds upon the work initiated in CALCITE-1121, and extends its logic to identify more complex query patterns (timeseries and topN queries), translate filters on the time dimension to Druid intervals, push limit into Druid select queries, etc.

Currently, we support the recognition of timeseries, topN, groupBy, and select queries.

Once we have completed the optimization, the (sub)plan of operators that needs to be executed by Druid is translated into a valid Druid JSON query, and passed as a property to the Hive physical TableScan operator. The Druid query will be executed within the TableScan operator, which will generate the records out of the Druid query results.

We generate a single Hive split with the corresponding Druid query for timeseries, topN, and groupBy queries, from which we generate the records. Thus, the degree of parallelism is 1 in these cases. However, for simple select queries without a limit (although they might still contain filters or projections), we partition the original query into x queries and generate one split for each of them, thus increasing the degree of parallelism for these queries, which usually return a large number of results, to x. We explain more details later on.

 

Note that, depending on the query, it might not be possible to push any computation to Druid. However, our contract is that the query should always be executed. Thus, in those cases, Hive sends a select query to Druid, which basically reads all the segments from Druid, generates records, and then executes the rest of the Hive operations on those records. This is also the approach followed if the cost-based optimizer is disabled (not recommended).

Queries completely executed in Druid

We focus first on queries that can be pushed completely into Druid. In these cases, we end up with a simple plan consisting of a TableScan and a Fetch operator on top. Thus, there is no overhead related to launching containers for the execution.

Select queries

We start with the simplest type of Druid query: select queries. Basically, a select query will be equivalent to a scan operation on the data sources, although operations such as projection, filter, or limit can still be pushed into this type of query.

Consider the following query, a simple select query for 10 rows consisting of all the columns of the table:
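A sketch of such a query against the table created above:

    SELECT * FROM druid_table_1 LIMIT 10;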

The Hive plan for the query will be the following:
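An illustrative sketch of the plan shape, as printed by EXPLAIN (exact output varies across Hive versions):

    STAGE DEPENDENCIES:
      Stage-0 is a root stage

    STAGE PLANS:
      Stage: Stage-0
        Fetch Operator
          limit: -1
          Processor Tree:
            TableScan
              alias: druid_table_1
              properties:
                druid.query.json (the Druid select query, shown formatted below)
                druid.query.type select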

Observe that the Druid query is in the properties attached to the TableScan. For readability, we format it properly:
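A sketch of the generated select query, with an illustrative subset of the wikiticker columns; the very wide date range is one way to represent the unbounded interval:

    {
      "queryType": "select",
      "dataSource": "wikiticker",
      "descending": false,
      "intervals": ["1900-01-01T00:00:00.000/3000-01-01T00:00:00.000"],
      "dimensions": ["channel", "cityName", "page", "user"],
      "metrics": ["count", "added", "deleted", "delta"],
      "granularity": "all",
      "pagingSpec": {"threshold": 10}
    }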

Observe that we get to push the limit into the Druid query (threshold). Observe as well that, as we do not specify a filter on the timestamp dimension of the datasource, we generate an interval that covers the range (−∞,+∞).

 

In Druid, the timestamp column plays a central role. In fact, Druid allows filtering on the time dimension using the intervals property for all those queries. This is very important, as the time intervals determine the nodes that store the Druid data. Thus, specifying a precise range minimizes the number of nodes hit by the broker for a certain query. Inspired by Druid PR-2880, we implemented the extraction of intervals from the filter conditions in the logical plan of a query. For instance, consider the following query:
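A sketch, with an illustrative projection, filtering on the time dimension:

    SELECT `__time`, page, added
    FROM druid_table_1
    WHERE `__time` >= '2010-01-01 00:00:00' AND `__time` <= '2011-01-01 00:00:00';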

The Druid query generated for the SQL query above is the following (we omit the plan, as it is a simple TableScan operator).
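A sketch of that query (projection and paging threshold illustrative); note the intervals property:

    {
      "queryType": "select",
      "dataSource": "wikiticker",
      "descending": false,
      "intervals": ["2010-01-01T00:00:00.000Z/2011-01-01T00:00:00.001Z"],
      "dimensions": ["page"],
      "metrics": ["added"],
      "granularity": "all",
      "pagingSpec": {"threshold": 16384}
    }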

Observe that we correctly infer the interval for the specified dates, 2010-01-01T00:00:00.000Z/2011-01-01T00:00:00.001Z: in Druid the starting date of the interval is included, but the closing date is not. We also support the recognition of multiple interval ranges, for instance in the following SQL query:
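A sketch of a query from which two disjoint intervals would be extracted:

    SELECT `__time`, page, added
    FROM druid_table_1
    WHERE (`__time` >= '2010-01-01 00:00:00' AND `__time` <= '2011-01-01 00:00:00')
       OR (`__time` >= '2012-01-01 00:00:00' AND `__time` <= '2013-01-01 00:00:00');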

Furthermore, we can infer overlapping intervals too. Finally, filters that are not specified on the time dimension will be translated into valid Druid filters and included within the query using the filter property.

Partitioning select queries

We can partition Druid select queries that return large results into multiple subqueries that are executed in parallel against Druid. The parallelization depends on the value for the hive.druid.select.threshold configuration parameter.

In particular, we take the number of rows of the result, obtained using a segment metadata query. The number of splits for the select query is then the number of rows divided by hive.druid.select.threshold; for instance, 1,000,000 rows with a threshold of 100,000 yield 10 splits. We split the query along the time dimension, assuming that the distribution of records across time is uniform (we plan to extend this logic in the future). Thus, we consider the time boundaries in the query in order to know how to split it; if the query is not time bounded, we submit a time boundary query to Druid to obtain them.

Timeseries queries

Timeseries is one of the types of queries that Druid can execute very efficiently. The following SQL query translates directly into a Druid timeseries query:
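A sketch of such a query, aggregating the wikiticker metrics at monthly granularity:

    SELECT floor_month(`__time`), max(delta), sum(added)
    FROM druid_table_1
    GROUP BY floor_month(`__time`);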

Basically, we group by a given time granularity and calculate the aggregation results for each resulting group. In particular, the floor_month function over the timestamp dimension __time represents the Druid month granularity format. Currently, we support floor_year, floor_quarter, floor_month, floor_week, floor_day, floor_hour, floor_minute, and floor_second granularities. In addition, we support two special types of granularities, all and none, which we describe below. We plan to extend our integration work to support other important Druid custom granularity constructs, such as duration and period granularities.

The Hive plan for the query will be the following:
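Again, an illustrative sketch of the plan shape:

    STAGE PLANS:
      Stage: Stage-0
        Fetch Operator
          limit: -1
          Processor Tree:
            TableScan
              alias: druid_table_1
              properties:
                druid.query.json (the Druid timeseries query, shown formatted below)
                druid.query.type timeseries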

Observe that the Druid query is in the properties attached to the TableScan. For readability, we format it properly:
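A sketch of the generated timeseries query; the aggregator output names ($f1, $f2) and aggregator types are illustrative:

    {
      "queryType": "timeseries",
      "dataSource": "wikiticker",
      "descending": false,
      "granularity": "MONTH",
      "aggregations": [
        {"type": "doubleMax", "name": "$f1", "fieldName": "delta"},
        {"type": "doubleSum", "name": "$f2", "fieldName": "added"}
      ],
      "intervals": ["1900-01-01T00:00:00.000/3000-01-01T00:00:00.000"]
    }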

Observe that the granularity for the Druid query is MONTH.

 

Two rather special cases are all and none granularities, which we introduce by example below. Consider the following query:
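A sketch, aggregating over the complete dataset without any grouping:

    SELECT max(delta), sum(added)
    FROM druid_table_1;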

As it will do an aggregation on the complete dataset, it translates into a timeseries query with granularity all. In particular, the equivalent Druid query attached to the TableScan operator is the following:
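A sketch of that query (aggregator names illustrative); note the granularity:

    {
      "queryType": "timeseries",
      "dataSource": "wikiticker",
      "descending": false,
      "granularity": "all",
      "aggregations": [
        {"type": "doubleMax", "name": "$f0", "fieldName": "delta"},
        {"type": "doubleSum", "name": "$f1", "fieldName": "added"}
      ],
      "intervals": ["1900-01-01T00:00:00.000/3000-01-01T00:00:00.000"]
    }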

 

In turn, given the following query:
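A sketch, grouping on the timestamp values themselves:

    SELECT `__time`, max(delta), sum(added)
    FROM druid_table_1
    GROUP BY `__time`;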

It translates into a timeseries query with granularity none, as it only groups events that happened exactly at the same time. The JSON query is as follows: 
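A sketch of that query (aggregator names illustrative):

    {
      "queryType": "timeseries",
      "dataSource": "wikiticker",
      "descending": false,
      "granularity": "none",
      "aggregations": [
        {"type": "doubleMax", "name": "$f1", "fieldName": "delta"},
        {"type": "doubleSum", "name": "$f2", "fieldName": "added"}
      ],
      "intervals": ["1900-01-01T00:00:00.000/3000-01-01T00:00:00.000"]
    }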

TopN queries

TopN is the third type of query we support. These queries return a sorted set of results for the values in a single dimension according to some criteria. For this use case, topN queries are much faster and more resource efficient than groupBy queries, which we introduce below. The semantics of topN queries in SQL is as follows:
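A sketch of such a query, asking for the top 10 channels by maximum delta at monthly granularity:

    SELECT channel, max(delta) AS m, sum(added)
    FROM druid_table_1
    GROUP BY channel, floor_month(`__time`)
    ORDER BY m DESC
    LIMIT 10;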

Basically, the query is asking for the top 10 maximum values of delta for each channel with a monthly granularity. It also asks for the sum on another column. The generated equivalent Druid JSON query is the following:
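A sketch of the generated topN query (aggregator names illustrative):

    {
      "queryType": "topN",
      "dataSource": "wikiticker",
      "granularity": "MONTH",
      "dimension": "channel",
      "metric": "$f2",
      "aggregations": [
        {"type": "doubleMax", "name": "$f2", "fieldName": "delta"},
        {"type": "doubleSum", "name": "$f3", "fieldName": "added"}
      ],
      "intervals": ["1900-01-01T00:00:00.000/3000-01-01T00:00:00.000"],
      "threshold": 10
    }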

Observe that we need to use the metric field to specify the metric on which we would like to execute the top operation. Finally, we can execute the query and inspect its results.

GroupBy queries

The final type of query we currently support is groupBy. This kind of query is more expressive than timeseries and topN queries; however, it is less performant. Thus, we only fall back to groupBy queries when the query cannot be transformed into a timeseries or topN query.

For instance, the following SQL query will generate a Druid groupBy query:
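A sketch; grouping on two dimensions (channel and page) cannot be expressed as a topN query over a single dimension, so a groupBy query is generated:

    SELECT channel, page, max(delta), sum(added)
    FROM druid_table_1
    GROUP BY channel, page;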

Queries across Druid and Hive

Finally, we provide an example of a query that runs across Druid and Hive. In particular, let us create a second table in Hive with some data:
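A sketch with illustrative table and column names; col2 holds values that can match the Druid channel column:

    CREATE TABLE hive_table_1 (col1 INT, col2 STRING);
    INSERT INTO hive_table_1 VALUES (1, '#en.wikipedia'), (2, '#fr.wikipedia');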

Assume we want to execute the following query:
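A sketch of such a query; the subquery a follows the topN pattern shown earlier, while b reads the Hive table:

    SELECT a.channel, b.col1
    FROM (
      SELECT channel, max(delta) AS m
      FROM druid_table_1
      GROUP BY channel, floor_year(`__time`)
      ORDER BY m DESC
      LIMIT 10
    ) a
    JOIN hive_table_1 b ON (a.channel = b.col2);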

The query is a simple join on columns channel and col2. The subquery a is executed completely in Druid as a topN query, and then its results are joined in Hive (running on Tez) with the results of subquery b.

 

Open Issues (JIRA)

The 20 issues listed below (out of 23 open at the time of writing) are all unresolved and have no due date.

  • HIVE-14473: Druid integration II (New Feature, Major, Open). Created Aug 08, 2016, updated Jan 10, 2017. Assignee Jesus Camacho Rodriguez, reporter Jesus Camacho Rodriguez.
  • HIVE-14518: Support 'having' translation for Druid GroupBy queries (Sub-task, Major, Open). Created Aug 11, 2016, updated Aug 11, 2016. Unassigned, reporter Jesus Camacho Rodriguez.
  • HIVE-14543: Create Druid table without specifying data source (Sub-task, Major, Open). Created Aug 16, 2016, updated Oct 21, 2016. Unassigned, reporter Jesus Camacho Rodriguez.
  • HIVE-14587: Support for average post-aggregation in Druid (Sub-task, Major, Open). Created Aug 19, 2016, updated Aug 19, 2016. Unassigned, reporter Jesus Camacho Rodriguez.
  • HIVE-14597: Support for Druid custom granularities (Sub-task, Major, Open). Created Aug 22, 2016, updated Aug 22, 2016. Unassigned, reporter Jesus Camacho Rodriguez.
  • HIVE-14722: Support creating vector row batches from Druid (Sub-task, Major, Open). Created Sep 08, 2016, updated Sep 08, 2016. Unassigned, reporter Jesus Camacho Rodriguez.
  • HIVE-15571: Support Insert into for druid storage handler (New Feature, Major, Open). Created Jan 10, 2017, updated Jan 10, 2017. Assignee slim bouguerra, reporter slim bouguerra.
  • HIVE-15584: Early bail out when we use CTAS and Druid source already exists (Sub-task, Minor, Open). Created Jan 11, 2017, updated Mar 06, 2017. Assignee slim bouguerra, reporter Jesus Camacho Rodriguez.
  • HIVE-15619: Column pruner should handle DruidQuery (Sub-task, Major, Patch Available). Created Jan 13, 2017, updated Mar 09, 2017. Assignee Nishant Bangarwa, reporter Jesus Camacho Rodriguez.
  • HIVE-15632: Hive/Druid integration: Incorrect result - Limit on timestamp disappears (Bug, Critical, Open). Created Jan 16, 2017, updated Mar 25, 2017. Assignee Jesus Camacho Rodriguez, reporter Jesus Camacho Rodriguez.
  • HIVE-15634: Hive/Druid integration: Timestamp column inconsistent w/o Fetch optimization (Bug, Critical, In Progress). Created Jan 16, 2017, updated Mar 25, 2017. Assignee slim bouguerra, reporter Jesus Camacho Rodriguez.
  • HIVE-15635: Hive/Druid integration: timeseries query shows all days, even if no data (Bug, Critical, Open). Created Jan 16, 2017, updated Mar 25, 2017. Assignee Jesus Camacho Rodriguez, reporter Jesus Camacho Rodriguez.
  • HIVE-15636: Hive/Druid integration: wrong semantics of topN query limit with granularity (Bug, Critical, Open). Created Jan 16, 2017, updated Mar 25, 2017. Assignee Jesus Camacho Rodriguez, reporter Jesus Camacho Rodriguez.
  • HIVE-15637: Hive/Druid integration: wrong semantics of groupBy query limit with granularity (Bug, Critical, Open). Created Jan 16, 2017, updated Mar 25, 2017. Assignee Jesus Camacho Rodriguez, reporter Jesus Camacho Rodriguez.
  • HIVE-15639: Hive/Druid integration: wrong semantics for ordering within groupBy queries (Bug, Critical, Open). Created Jan 16, 2017, updated Mar 25, 2017. Assignee Jesus Camacho Rodriguez, reporter Jesus Camacho Rodriguez.
  • HIVE-15640: Hive/Druid integration: null handling for metrics (Bug, Critical, Open). Created Jan 16, 2017, updated Mar 25, 2017. Assignee Jesus Camacho Rodriguez, reporter Jesus Camacho Rodriguez.
  • HIVE-15641: Hive/Druid integration: filter on timestamp not pushed to DruidQuery (Improvement, Major, Open). Created Jan 16, 2017, updated Mar 25, 2017. Assignee Jesus Camacho Rodriguez, reporter Jesus Camacho Rodriguez.
  • HIVE-15785: Add S3 support for druid storage handler (Sub-task, Major, Open). Created Feb 01, 2017, updated Feb 01, 2017. Unassigned, reporter slim bouguerra.
  • HIVE-16025: Where IN clause throws exception (Bug, Critical, Open). Created Feb 23, 2017, updated Mar 06, 2017. Unassigned, reporter slim bouguerra.
  • HIVE-16026: Generated query will timeout and/or kill the druid cluster (Bug, Major, Open). Created Feb 23, 2017, updated Feb 23, 2017. Unassigned, reporter slim bouguerra.
