User-Defined Aggregation Functions (UDAFs) are an excellent way to integrate advanced data-processing into Hive. Hive allows two varieties of UDAFs: simple and generic. Simple UDAFs, as the name implies, are rather simple to write, but incur performance penalties because of the use of Java Reflection, and do not allow features such as variable-length argument lists. Generic UDAFs allow all these features, but are perhaps not quite as intuitive to write as Simple UDAFs.
This tutorial walks through the development of the histogram() UDAF, which computes a histogram with a fixed, user-specified number of bins, using a constant amount of memory and time linear in the input size. It demonstrates a number of features of Generic UDAFs, such as a complex return type (an array of structures) and type checking on the input. The assumption is that the reader wants to write a UDAF for eventual submission to the Hive open-source project, so steps such as modifying the function registry in Hive and writing .q tests are also included. If you just want to write a UDAF, debug and deploy locally, see this page.
NOTE: In this tutorial, we walk through the creation of a histogram() function. Starting with the 0.6.0 release of Hive, this appears as the built-in function histogram_numeric().
Make sure you have the latest Hive trunk by running svn up in your Hive directory. More detailed instructions on downloading and setting up Hive can be found at Getting Started. Your local copy of Hive should work by running build/dist/bin/hive from the Hive root directory, and you should have some tables of data loaded into your local instance for testing whatever UDAF you have in mind. For this example, assume that a table called normal exists with a single double column called val, containing a large number of random numbers drawn from the standard normal distribution.
The files we will be editing or creating are as follows, relative to the Hive root:

ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFHistogramNumeric.java
    the main source file, to be created by you.
ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
    the function registry source file, to be edited by you to register our new function.
ql/src/test/queries/clientpositive/udaf_histogram_numeric.q
    a file of sample queries for testing.
ql/src/test/results/clientpositive/udaf_histogram_numeric.q.out
    the expected output from your sample queries, to be created by you.
ql/src/test/results/clientpositive/show_functions.q.out
    the expected output from the SHOW FUNCTIONS Hive query. Since we're adding a new function, this file must be updated.
This section gives a high-level outline of how to implement your own generic UDAF. For a concrete example, look at any of the existing UDAF sources present in the ql/src/java/org/apache/hadoop/hive/ql/udf/generic/ directory.
At a high level, there are two parts to implementing a Generic UDAF. The first is to write a resolver class, and the second is to create an evaluator class. The resolver handles type checking and operator overloading (if you want it), and helps Hive find the correct evaluator class for a given set of argument types. The evaluator class then actually implements the UDAF logic. Generally, the top-level UDAF class extends the abstract base class org.apache.hadoop.hive.ql.udf.GenericUDAFResolver2, and the evaluator class(es) are written as static inner classes.
The resolver handles type checking and operator overloading for UDAF queries. The type checking ensures that the user isn't passing a double expression where an integer is expected, for example, and the operator overloading allows you to have different UDAF logic for different types of arguments.
The resolver class must extend org.apache.hadoop.hive.ql.udf.GenericUDAFResolver2 (see Resolver Interface Evolution below for backwards-compatibility information). We recommend that you extend the AbstractGenericUDAFResolver base class in order to insulate your UDAF from future interface changes in Hive.
Look at one of the existing UDAFs for the imports you will need.
public class GenericUDAFHistogramNumeric extends AbstractGenericUDAFResolver {
  static final Log LOG = LogFactory.getLog(GenericUDAFHistogramNumeric.class.getName());

  @Override
  public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws SemanticException {
    // Type-checking goes here!
    return new GenericUDAFHistogramNumericEvaluator();
  }

  public static class GenericUDAFHistogramNumericEvaluator extends GenericUDAFEvaluator {
    // UDAF logic goes here!
  }
}
The code above shows the basic skeleton of a UDAF. The first line sets up a Log object that you can use to write warnings and errors to be fed into the Hive log. The GenericUDAFResolver class has a single overridden method: getEvaluator, which receives information about how the UDAF is being invoked. Of most interest is info.getParameters(), which provides an array of type information objects corresponding to the SQL types of the invocation parameters. For the histogram UDAF, we want two parameters: the numeric column over which to compute the histogram, and the number of histogram bins requested. The very first thing to do is to check that we have exactly two parameters (lines 3-6 below). Then, we check that the first parameter has a primitive type, and not an array or map, for example (lines 9-13). However, not only do we want it to be a primitive type column, but we also want it to be numeric, which means that we need to throw an exception if a STRING type is given (lines 14-28). BOOLEAN is excluded because the "histogram" estimation problem can be solved with a simple COUNT() query. Lines 30-41 illustrate similar type checking for the second parameter to the histogram() UDAF – the number of histogram bins. In this case, we insist that the number of histogram bins is an integer.
public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws SemanticException {
  TypeInfo[] parameters = info.getParameters();
  if (parameters.length != 2) {
    throw new UDFArgumentTypeException(parameters.length - 1,
        "Please specify exactly two arguments.");
  }

  // validate the first parameter, which is the expression to compute over
  if (parameters[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
    throw new UDFArgumentTypeException(0,
        "Only primitive type arguments are accepted but "
        + parameters[0].getTypeName() + " was passed as parameter 1.");
  }
  switch (((PrimitiveTypeInfo) parameters[0]).getPrimitiveCategory()) {
  case BYTE:
  case SHORT:
  case INT:
  case LONG:
  case FLOAT:
  case DOUBLE:
    break;
  case STRING:
  case BOOLEAN:
  default:
    throw new UDFArgumentTypeException(0,
        "Only numeric type arguments are accepted but "
        + parameters[0].getTypeName() + " was passed as parameter 1.");
  }

  // validate the second parameter, which is the number of histogram bins
  if (parameters[1].getCategory() != ObjectInspector.Category.PRIMITIVE) {
    throw new UDFArgumentTypeException(1,
        "Only primitive type arguments are accepted but "
        + parameters[1].getTypeName() + " was passed as parameter 2.");
  }
  if (((PrimitiveTypeInfo) parameters[1]).getPrimitiveCategory()
      != PrimitiveObjectInspector.PrimitiveCategory.INT) {
    throw new UDFArgumentTypeException(1,
        "Only an integer argument is accepted as parameter 2, but "
        + parameters[1].getTypeName() + " was passed instead.");
  }

  return new GenericUDAFHistogramNumericEvaluator();
}
A form of operator overloading could be implemented here. Say you had two completely different histogram construction algorithms – one designed for working with integers only, and the other for double data types. You would then create two separate Evaluator inner classes and, depending on the type of the input expression, would return the correct one as the return value of the Resolver class.
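Such dispatch can be sketched outside Hive in plain Java. The enum and class names below are hypothetical stand-ins (the real category enum is PrimitiveObjectInspector.PrimitiveCategory, and the evaluator classes do not exist in Hive); the point is only that the resolver picks an evaluator based on the argument's primitive category:

```java
import java.util.EnumSet;

public class EvaluatorDispatch {
    // Hypothetical stand-in for Hive's PrimitiveCategory enum
    enum Cat { BYTE, SHORT, INT, LONG, FLOAT, DOUBLE }

    static final EnumSet<Cat> INTEGRAL = EnumSet.of(Cat.BYTE, Cat.SHORT, Cat.INT, Cat.LONG);

    // A resolver overloading on argument type would return a different
    // evaluator instance per category (names here are hypothetical).
    static String pickEvaluator(Cat c) {
        return INTEGRAL.contains(c)
            ? "GenericUDAFIntegerHistogramEvaluator"
            : "GenericUDAFDoubleHistogramEvaluator";
    }

    public static void main(String[] args) {
        System.out.println(pickEvaluator(Cat.INT));
        System.out.println(pickEvaluator(Cat.DOUBLE));
    }
}
```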
CAVEAT: The histogram function is supposed to be used in a manner resembling the following: SELECT histogram_numeric(age, 30) FROM employees;, which means estimate the distribution of employee ages using 30 histogram bins. However, within the resolver class, there is no way of telling if the second argument is a constant or an integer column with a whole bunch of different values. Thus, a pathologically twisted user could write something like SELECT histogram_numeric(age, age) FROM employees;, assuming that age is an integer column. Of course, this makes no sense whatsoever, but it would validate correctly in the Resolver type-checking code above.
We'll deal with this problem in the Evaluator.
All evaluators must extend the abstract base class org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator. This class provides a few abstract methods that must be implemented by the extending class. These methods establish the processing semantics followed by the UDAF. The following is the skeleton of an Evaluator class.
public static class GenericUDAFHistogramNumericEvaluator extends GenericUDAFEvaluator {
  // For PARTIAL1 and COMPLETE: ObjectInspectors for original data
  private PrimitiveObjectInspector inputOI;
  private PrimitiveObjectInspector nbinsOI;

  // For PARTIAL2 and FINAL: ObjectInspectors for partial aggregations (list of doubles)
  private StandardListObjectInspector loi;

  @Override
  public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
    super.init(m, parameters);
    // return type goes here
  }

  @Override
  public Object terminatePartial(AggregationBuffer agg) throws HiveException {
    // return value goes here
  }

  @Override
  public Object terminate(AggregationBuffer agg) throws HiveException {
    // final return value goes here
  }

  @Override
  public void merge(AggregationBuffer agg, Object partial) throws HiveException {
  }

  @Override
  public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
  }

  // Aggregation buffer definition and manipulation methods
  static class StdAgg implements AggregationBuffer {
  };

  @Override
  public AggregationBuffer getNewAggregationBuffer() throws HiveException {
  }

  @Override
  public void reset(AggregationBuffer agg) throws HiveException {
  }
}
What do all these functions do? The following is a brief summary of each function, in (roughly) chronological order of being called. It's very important to remember that the computation of your aggregation must be arbitrarily divisible over the data. Think of it like writing a divide-and-conquer algorithm where the partitioning of the data is completely out of your control and handled by Hive. More formally, given any subset of the input rows, you should be able to compute a partial result, and also be able to merge any pair of partial results into another partial result. This naturally makes it difficult to port over many existing algorithms, but should guarantee researchers jobs for quite some time.
init
    Called by Hive to initialize an instance of your UDAF evaluator class.
getNewAggregationBuffer
    Return an object that will be used to store temporary aggregation results.
iterate
    Process a new row of data into the aggregation buffer.
terminatePartial
    Return the contents of the current aggregation in a persistable way. Here "persistable" means the return value can only be built up in terms of Java primitives, arrays, primitive wrappers (e.g. Double), Hadoop Writables, Lists, and Maps. Do NOT use your own classes (even if they implement java.io.Serializable), otherwise you may get strange errors or (probably worse) wrong results.
merge
    Merge a partial aggregation returned by terminatePartial into the current aggregation.
terminate
    Return the final result of the aggregation to Hive.
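The division of labor among these methods can be illustrated outside of Hive with a toy average aggregation. This is a sketch in plain Java with hypothetical names – it deliberately omits Hive's ObjectInspector and AggregationBuffer machinery – but it shows how any split of the input rows, followed by terminatePartial()/merge(), yields the same final result:

```java
import java.util.Arrays;
import java.util.List;

// A toy model of the evaluator lifecycle for AVG: iterate() consumes rows,
// terminatePartial() emits a persistable partial ({sum, count} as a list of
// doubles), merge() combines partials, and terminate() gives the answer.
public class ToyAvgEvaluator {
    static class Buffer { double sum; long count; }

    Buffer getNewAggregationBuffer() { return new Buffer(); }

    void iterate(Buffer agg, double value) {
        agg.sum += value;
        agg.count++;
    }

    // persistable partial: only primitives/wrappers/lists, no custom classes
    List<Double> terminatePartial(Buffer agg) {
        return Arrays.asList(agg.sum, (double) agg.count);
    }

    void merge(Buffer agg, List<Double> partial) {
        agg.sum += partial.get(0);
        agg.count += partial.get(1).longValue();
    }

    Double terminate(Buffer agg) {
        return agg.count == 0 ? null : agg.sum / agg.count;
    }

    public static void main(String[] args) {
        ToyAvgEvaluator eval = new ToyAvgEvaluator();
        // Hive might split the rows {1,2,3,4} across two map tasks:
        Buffer b1 = eval.getNewAggregationBuffer();
        eval.iterate(b1, 1.0); eval.iterate(b1, 2.0);
        Buffer b2 = eval.getNewAggregationBuffer();
        eval.iterate(b2, 3.0); eval.iterate(b2, 4.0);
        // a reducer then merges the two partials and finishes
        Buffer fin = eval.getNewAggregationBuffer();
        eval.merge(fin, eval.terminatePartial(b1));
        eval.merge(fin, eval.terminatePartial(b2));
        System.out.println(eval.terminate(fin)); // 2.5
    }
}
```

However Hive chooses to partition the rows, merging the partials gives the same 2.5 that a single pass would.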
For writing the histogram() function, the following is the strategy that was adopted.
The aggregation buffer for a histogram is a list of (x,y) pairs that represent the histogram's bin centers and heights. In addition, the aggregation buffer also stores two integers with the maximum number of bins (a user-specified parameter) and the current number of bins used. The aggregation buffer is initialized to a 'not ready' state with the number of bins set to 0. This is because Hive makes no distinction between a constant parameter supplied to a UDAF and a column from a table; thus, we have no way of knowing how many bins the user wants in their histogram until the first call to iterate().
The first thing we do in iterate() is to check whether the histogram object in our aggregation buffer is initialized. If it is not, we parse out the second argument to iterate(), which is the number of histogram bins requested by the user. We do this exactly once and initialize the histogram object. Note that error checking is performed here – if the user supplied a negative number or zero for the number of histogram bins, a HiveException is thrown at this point and computation terminates.
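This lazy-initialization logic can be sketched in plain Java. The class and field names below are hypothetical (the real implementation lives in GenericUDAFHistogramNumeric.java and throws HiveException rather than a runtime exception):

```java
import java.util.ArrayList;
import java.util.List;

public class LazyHistogramBuffer {
    // 0 means 'not ready': we don't know the bin count until the first row
    int maxBins = 0;
    final List<double[]> bins = new ArrayList<>(); // (x, y) pairs

    // Mirrors iterate(): the first call fixes the bin count (with error
    // checking); every call then folds one value into the estimate.
    void iterate(double value, int requestedBins) {
        if (maxBins == 0) {
            if (requestedBins < 1) {
                // the real code throws HiveException here
                throw new IllegalArgumentException("bin count must be >= 1");
            }
            maxBins = requestedBins;
        }
        // a real implementation would also keep at most maxBins bins here
        bins.add(new double[]{value, 1.0});
    }
}
```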
Next, we parse out the actual input data item (a number) and add it to our histogram estimation in the aggregation buffer. See the GenericUDAFHistogramNumeric.java file for details on the heuristic used to construct a histogram.
The current histogram approximation is serialized as a list of DoubleWritable objects. The first two doubles in the list indicate the maximum number of histogram bins specified by the user and the number of bins currently used. The remaining entries are (x,y) pairs from the current histogram approximation.
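A minimal sketch of this serialization layout, in plain Java with a hypothetical helper name (the real code emits DoubleWritable objects rather than plain Doubles):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class HistogramPartial {
    // Flatten (maxBins, bins) into the flat list-of-doubles layout:
    // [maxBins, nUsedBins, x1, y1, x2, y2, ...]
    static List<Double> serialize(int maxBins, List<double[]> bins) {
        List<Double> out = new ArrayList<>();
        out.add((double) maxBins);     // first double: user-specified bin cap
        out.add((double) bins.size()); // second double: bins currently used
        for (double[] p : bins) {
            out.add(p[0]); // bin center (x)
            out.add(p[1]); // bin height (y)
        }
        return out;
    }

    public static void main(String[] args) {
        List<double[]> bins = Arrays.asList(new double[]{1.0, 2.0}, new double[]{3.0, 4.0});
        System.out.println(serialize(30, bins)); // [30.0, 2.0, 1.0, 2.0, 3.0, 4.0]
    }
}
```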
At this point, we have a (possibly uninitialized) histogram estimation, and have been requested to merge it with another estimation performed on a separate subset of the rows. If N is the number of histogram bins specified by the user, the current heuristic first builds a histogram with all 2N bins from both estimations, and then iteratively merges the closest pair of bins until only N bins remain.
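The bin-merging heuristic can be sketched in plain Java. This is a simplified stand-in, not the actual GenericUDAFHistogramNumeric code: bins are {center, height} pairs kept sorted by center, and the closest adjacent pair is repeatedly collapsed into its height-weighted mean until N bins remain:

```java
import java.util.ArrayList;
import java.util.List;

public class HistogramMerge {
    // Merge two bin lists (each entry is {center, height}) and collapse
    // the closest adjacent pair until only nBins remain.
    static List<double[]> merge(List<double[]> a, List<double[]> b, int nBins) {
        List<double[]> all = new ArrayList<>(a);
        all.addAll(b);
        all.sort((p, q) -> Double.compare(p[0], q[0]));
        while (all.size() > nBins) {
            // find the closest pair of adjacent bin centers
            int best = 0;
            double bestGap = Double.MAX_VALUE;
            for (int i = 0; i + 1 < all.size(); i++) {
                double gap = all.get(i + 1)[0] - all.get(i)[0];
                if (gap < bestGap) { bestGap = gap; best = i; }
            }
            // replace the pair with one bin at their height-weighted mean
            double[] p = all.get(best), q = all.get(best + 1);
            double h = p[1] + q[1];
            double x = (p[0] * p[1] + q[0] * q[1]) / h;
            all.set(best, new double[]{x, h});
            all.remove(best + 1);
        }
        return all;
    }
}
```

Because merging only ever inspects bin centers and heights, the operation is associative enough for Hive to combine partials in any order.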
The final return type from the histogram() function is an array of (x,y) pairs representing histogram bin centers and heights. These can be explode()ed into a separate table, or parsed using a script and passed to Gnuplot (for example) to visualize the histogram.
Once the code for the UDAF has been written and the source file placed in ql/src/java/org/apache/hadoop/hive/ql/udf/generic, it's time to modify the function registry and incorporate the new function into Hive's list of functions. This simply involves editing ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java to import your UDAF class and register its name.
Please note that you will have to run the following command to update the output of the show functions Hive call:

ant test -Dtestcase=TestCliDriver -Dqfile=show_functions.q -Doverwrite=true
To compile Hive along with your new UDAF, run ant package from the Hive root; you can then try out the function by running build/dist/bin/hive.
System-level tests consist of writing some sample queries that operate on sample data, generating the expected output from the queries, and making sure that things don't break in the future in terms of expected output. Note that the expected output is passed through diff with the actual output from Hive, so nondeterministic algorithms will have to compute some sort of statistic and then only keep the most significant digits (for example).
These are the simple steps needed for creating test cases for your new UDAF/UDF:
1. Create a file ql/src/test/queries/clientpositive/udaf_XXXXX.q, where XXXXX is your UDAF's name.
2. Put some queries in the .q file – hopefully enough to cover the full range of functionality and special cases.
3. For sample data, put your own in hive/data/files and load it using LOAD DATA LOCAL INPATH..., or reuse one of the files already there (grep for LOAD in the queries directory to see table names).
4. touch ql/src/test/results/clientpositive/udaf_XXXXX.q.out
5. Run the following command to generate the output into the .q.out result file:
ant test -Dtestcase=TestCliDriver -Dqfile=udaf_XXXXX.q -Doverwrite=true
6. Run the following command to make sure your test runs fine:
ant test -Dtestcase=TestCliDriver -Dqfile=udaf_XXXXX.q
Checklist for open source submission

- Create a JIRA issue for your new UDAF in the Query Processor component. Solicit discussion, incorporate feedback.
- Run ant package from the Hive root to compile Hive and your new UDAF.
- Create your .q tests and their corresponding .q.out output.
- Run ant checkstyle and examine build/checkstyle/checkstyle-errors.html, ensuring that your source files conform to the Sun Java coding convention (with the 100 character line length exception).
- Run ant test, and ensure that tests pass.
- Run svn up, and ensure there are no conflicts with the main repository.
- Run svn add for whatever new files you have created.
- Write .q tests for all new functionality.
- Ensure that show_functions.q.out has been updated. Run ant test -Dtestcase=TestCliDriver -Dqfile=show_functions.q -Doverwrite=true to do this.
- Run svn diff > HIVE-NNNN.1.patch from the Hive root directory, where NNNN is the issue number the JIRA has assigned to you.

Tips for writing a UDAF

In the terminatePartial() call, if your UDAF only uses a few variables to represent the buffer (such as average), consider serializing them into a list of doubles, for example, instead of complicated named structures.

Look for opportunities to reuse existing functionality – for example, histogram_numeric() and percentile_approx() both use the same core histogram estimation functionality.
Keep in mind that system-level tests run diff on the expected and actual output, and fail if there is any difference at all. An example of where this can fail horribly is a UDAF like ngrams(), where the output is a list of sorted (word,count) pairs. In some cases, different sort implementations might place words with the same count at different positions in the output. Even though the output is correct, the test will fail. In these cases, it's better to output (for example) only the counts, or some appropriate statistic on the counts, like the sum.

Resolver Interface Evolution

The old interface org.apache.hadoop.hive.ql.udf.GenericUDAFResolver was deprecated as of the 0.6.0 release. The key difference between the GenericUDAFResolver and GenericUDAFResolver2 interfaces is the fact that the latter allows the evaluator implementation to access extra information regarding the function invocation, such as the presence of the DISTINCT qualifier or invocation with the wildcard syntax such as FUNCTION(*). UDAFs that implement the deprecated GenericUDAFResolver interface will not be able to tell the difference between an invocation such as FUNCTION() and FUNCTION(*), since the information regarding specification of the wildcard is not available. Similarly, these implementations will also not be able to tell the difference between FUNCTION(EXPR) and FUNCTION(DISTINCT EXPR), since the information regarding the presence of the DISTINCT qualifier is also not available.
Note that while resolvers which implement the GenericUDAFResolver2 interface are provided the extra information regarding the presence of the DISTINCT qualifier or invocation with the wildcard syntax, they can choose to ignore it completely if it is of no significance to them. The underlying data filtering to compute DISTINCT values is actually done by Hive's core query processor and not by the evaluator or resolver; the information is provided to the resolver only for validation purposes. The AbstractGenericUDAFResolver base class offers an easy way for previously written UDAF implementations to migrate to the new resolver interface without having to rewrite the implementation, since the change from implementing the GenericUDAFResolver interface to extending the AbstractGenericUDAFResolver class is fairly minimal. (There may be issues with implementations that are part of an inheritance hierarchy, since it may not be easy to change the base class.)