Tez is a new application framework built on Hadoop YARN that can execute complex directed acyclic graphs of general data processing tasks. In many ways it can be thought of as a more flexible and powerful successor to the map-reduce framework.
It generalizes map and reduce tasks by exposing interfaces for generic data processing tasks, which consist of a triplet of interfaces: input, output and processor. These tasks are the vertices in the execution graph. Edges (i.e., data connections between tasks) are first-class citizens in Tez and, together with the input/output interfaces, greatly increase the flexibility of how data is transferred between tasks.
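To make the triplet concrete, here is a minimal sketch of a processor, assuming the SimpleProcessor/LogicalInput/LogicalOutput API of later Tez releases (this page targets incubator Tez 0.2, whose API differed; PassThroughProcessor itself is purely illustrative):

```java
import java.util.Map;

import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.library.processor.SimpleProcessor;

// Illustrative processor: the "processor" part of the triplet, wired to
// named inputs and outputs by the framework.
public class PassThroughProcessor extends SimpleProcessor {
  public PassThroughProcessor(ProcessorContext context) {
    super(context);
  }

  @Override
  public void run() throws Exception {
    // Inputs and outputs are keyed by the name of the connected vertex;
    // the edge type decides how data moves between them.
    Map<String, LogicalInput> inputs = getInputs();
    Map<String, LogicalOutput> outputs = getOutputs();
    // ... read key/value pairs from inputs, transform, write to outputs ...
  }
}
```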
Tez also greatly extends the possible ways in which individual tasks can be linked together; in fact, any arbitrary DAG can be executed directly in Tez.
In Tez parlance, a map-reduce job is basically a simple DAG consisting of a single map vertex and a single reduce vertex connected by a “bipartite” edge (i.e., the edge connects every map task to every reduce task). Map inputs and reduce outputs are HDFS inputs and outputs respectively. The map output class locally sorts and partitions the data by a certain key, while the reduce input class merge-sorts its data on the same key.
Tez also provides what is basically a map-reduce compatibility layer that lets one run MR jobs on top of the new execution layer by implementing the Map/Reduce concepts on the new execution framework.
More information about Tez can be found here:
Hive uses map-reduce as its execution engine. Any query will produce a graph of MR jobs, potentially interspersed with some local/client-side work. This leads to many inefficiencies in the planning and execution of queries. Here are some examples that can be improved by using the more flexible Tez primitives:
Whenever a query has multiple reduce sinks (that cannot be combined, i.e., there is no correlation between their partition keys), Hive will break the plan apart and submit one MR job per sink. All of the MR jobs in this chain need to be scheduled one by one, and each one has to re-read the output of the previous job from HDFS and shuffle it. In Tez several reduce sinks can be linked directly and data can be pipelined without the need for temporary HDFS files. This pattern is referred to as MRR (map - reduce - reduce*).
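As a sketch of what such a pipelined chain looks like at the DAG level, assuming the DAG-building API of later Tez releases (MapProc, AggProc and SortProc are hypothetical processor classes, not Hive code):

```java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
import org.apache.tez.runtime.library.partitioner.HashPartitioner;

public class MrrExample {
  static DAG buildMrrDag() {
    // A sorted, partitioned ("bipartite") edge - the Tez analogue of an MR shuffle.
    OrderedPartitionedKVEdgeConfig shuffle = OrderedPartitionedKVEdgeConfig
        .newBuilder(Text.class.getName(), IntWritable.class.getName(),
            HashPartitioner.class.getName())
        .build();

    // Hypothetical processors standing in for Hive's map and reduce work.
    Vertex map = Vertex.create("map", ProcessorDescriptor.create(MapProc.class.getName()));
    Vertex reduce1 = Vertex.create("reduce1", ProcessorDescriptor.create(AggProc.class.getName()), 10);
    Vertex reduce2 = Vertex.create("reduce2", ProcessorDescriptor.create(SortProc.class.getName()), 1);

    // map -> reduce1 -> reduce2 in a single job: data is shuffled between
    // stages directly, with no temporary HDFS files in between.
    return DAG.create("mrr")
        .addVertex(map).addVertex(reduce1).addVertex(reduce2)
        .addEdge(Edge.create(map, reduce1, shuffle.createDefaultEdgeProperty()))
        .addEdge(Edge.create(reduce1, reduce2, shuffle.createDefaultEdgeProperty()));
  }
}
```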
More than just MRR, Tez allows for sending the entire query plan at once, thus enabling the framework to allocate resources more intelligently as well as pipelining data through the various stages. This is a huge improvement for more complicated queries as it eliminates IO/sync barriers and scheduling overhead between individual stages. An example would be a query that aggregates two tables in subqueries in the from clause and joins the resulting relations.
Currently any shuffle is performed the same way regardless of the data size. Sorted partitions are written to disk, pulled by the reducers, merge-sorted and then fed into the reducers. Tez allows for small datasets to be handled entirely in memory, while no such optimization is available in map-reduce. Many warehousing queries sort or aggregate small datasets after the heavy lifting is done. These would benefit from an in-memory shuffle.
Distributed join algorithms are difficult to express in map-reduce. A regular shuffle join, for instance, has to process different inputs in the same map task, tag rows written to disk in order to distinguish the source tables, use a secondary sort to get the rows from each table in a predictable order, and so on. Tez is a much more natural platform to implement these algorithms.
For example, it is possible to have one Tez task take multiple bipartite edges as input, thus exposing the input relations directly to the join implementation. The case where multiple tasks feed into the same shuffle-join task will be referred to as a multi-parent shuffle join (MPJ); a sketch follows.
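A corresponding sketch, reusing the `shuffle` edge configuration from the MRR example above (ScanProc and JoinProc are again hypothetical processors):

```java
// Two bipartite edges feed the same join vertex, so each join task can read
// both relations directly (e.g. via getInputs().get("scan_a") / get("scan_b")).
Vertex scanA = Vertex.create("scan_a", ProcessorDescriptor.create(ScanProc.class.getName()));
Vertex scanB = Vertex.create("scan_b", ProcessorDescriptor.create(ScanProc.class.getName()));
Vertex join = Vertex.create("join", ProcessorDescriptor.create(JoinProc.class.getName()), 20);

DAG mpj = DAG.create("multi_parent_join")
    .addVertex(scanA).addVertex(scanB).addVertex(join)
    .addEdge(Edge.create(scanA, join, shuffle.createDefaultEdgeProperty()))
    .addEdge(Edge.create(scanB, join, shuffle.createDefaultEdgeProperty()));
```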
All sorting in map-reduce happens using the same binary sort, regardless of the data type. Hive might, for instance, choose to use a more efficient integer-only sort where possible. Tez makes that available.
Since Hive uses map-reduce to compute aggregations, processing will always boil down to a sort-merge, even though we’re not actually interested in the sort order. Tez will allow for more efficient hash-based algorithms to do the same.
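To illustrate the difference, here is a toy comparison in plain Java (not Hive code): a hash-based aggregation needs one pass and no ordering, whereas the MR path must sort-merge first.

```java
import java.util.HashMap;
import java.util.Map;

public final class HashAggDemo {
  // Toy hash-based group-by: one pass, no sort, memory proportional to the
  // number of distinct keys. A sort-merge aggregation would order all rows
  // by key first, even though the query never asked for that order.
  static Map<String, Long> countByKey(Iterable<String> keys) {
    Map<String, Long> counts = new HashMap<>();
    for (String key : keys) {
      counts.merge(key, 1L, Long::sum);
    }
    return counts;
  }
}
```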
Tez allows complete control over the processing, including being able to stop processing when limits are met (without simply skipping records or relying on file formats/input formats). It’s also possible to define specific edge semantics, which could be used to provide a generic top-k edge to simplify “limit” processing.
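As a toy illustration of the effect a top-k edge could have (plain Java, not a Tez API): each producer retains only k rows, so a “limit k” never ships more than k rows per task.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

public final class TopKDemo {
  // Keep only the k largest values seen so far; everything else is dropped
  // early, which is what a generic top-k edge would do for "limit" queries.
  static List<Integer> topK(Iterable<Integer> rows, int k) {
    PriorityQueue<Integer> heap = new PriorityQueue<>(); // min-heap of the current top k
    for (int row : rows) {
      heap.offer(row);
      if (heap.size() > k) {
        heap.poll(); // evict the smallest of the k+1 candidates
      }
    }
    List<Integer> result = new ArrayList<>(heap);
    result.sort(Collections.reverseOrder());
    return result;
  }
}
```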
The rest of this document describes the first phase of Hive/Tez integration. The goals are:
Limiting the integration to the fairly simple MRR/MPJ patterns will require minimal changes to the planner and execution framework while speeding up a wide variety of queries. At the same time it will allow us to build a solid foundation for future improvements.
The following things are out of scope for the first phase:
One new configuration variable will be introduced:
Note: It is possible to execute an MR plan against Tez. In order to do so, one simply has to change the following variable (assuming Tez is installed on the cluster):
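The variable itself did not survive the export. As a hedged sketch only: Tez’s MR compatibility layer is typically selected through the standard Hadoop framework-name setting, so the switch would look roughly like the following (the yarn-tez value is an assumption based on Tez’s INSTALL notes, not on this page):

```java
import org.apache.hadoop.conf.Configuration;

public class TezCompatSwitch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Assumption: route MR jobs through Tez's MR compatibility layer by
    // swapping the framework name; verify against your Tez install docs.
    conf.set("mapreduce.framework.name", "yarn-tez");
  }
}
```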
Here’s a TPC-DS query and plans with and without Tez optimizations enabled:
The query (rewritten for Hive):
select i_item_desc
      ,i_category
      ,i_class
      ,i_current_price
      ,i_item_id
      ,itemrevenue
      ,itemrevenue*100/sum(itemrevenue) over (partition by i_class) as revenueratio
from (select i_item_desc
            ,i_category
            ,i_class
            ,i_current_price
            ,i_item_id
            ,sum(ws_ext_sales_price) as itemrevenue
      from web_sales
      join item on (web_sales.ws_item_sk = item.i_item_sk)
      join date_dim on (web_sales.ws_sold_date_sk = date_dim.d_date_sk)
      where i_category in ('1', '2', '3')
        and year(d_date) = 2001
        and month(d_date) = 10
      group by i_item_id
              ,i_item_desc
              ,i_category
              ,i_class
              ,i_current_price) tmp
order by i_category
        ,i_class
        ,i_item_id
        ,i_item_desc
        ,revenueratio;
The plan with Tez:
Stage 0:
Local Work: Generate hash table for date dim
Stage 1:
Map: SMB join item + web_sales, mapjoin date_dim + web_sales, map-side group by/aggregate
Reduce 1: Reduce side group by/aggregate, shuffle for windowing
Reduce 2: Compute windowing function, shuffle for order by
Reduce 3: Order by, write to HDFS
The plan without Tez:
Stage 0:
Local Work: Generate hash table for date dim
Stage 1:
Map: SMB join item + web_sales, mapjoin date_dim + web_sales, map-side group by/aggregate
Reduce: Reduce side group by/aggregate, write to HDFS
Stage 2:
Map: Read tmp file, shuffle for windowing
Reduce: Compute windowing function, write to HDFS
Stage 3:
Map: Read tmp file, shuffle for order by
Reduce: Order by, write to HDFS
Changes that impact current Hive code paths:
I believe that all of these are valuable by themselves and make the code cleaner and easier to maintain. The second item, especially, will touch quite a few places in the code. None of them change functionality.
New code paths (only active when running Tez):
The following outlines the changes across the various Hive components:
We initially investigated adding Tez as a simple shim option to the code base. This didn’t work out, mostly because the Tez API is very different from the MR API. It does not make much sense to move the entire “execute” infrastructure to the shim layer; that would require large code changes with little benefit. Instead there will be separate “Task” implementations for MR and Tez, and Hive will decide at runtime which implementation to use.
We’re planning to have two packages:
Both will contain implementations of the Task interface, which is used to encapsulate units of work to be scheduled and executed by the Driver class.
Both of these packages will have classes for job monitoring and job diagnostics, although they are package-private and do not follow a common interface.
Currently ExecDriver and MapRedTask (both are of type “Task”) will submit map-reduce work via JobClient (either via the local job runner or against the cluster). All MR-specific job submission concepts are hidden behind these classes.
We will add a TezTask as the entry point for Tez execution. TezTask will hide the building of the job DAG and orchestrate monitoring and diagnostics of DAG execution.
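A rough skeleton of what that entry point could look like, assuming Hive’s Task&lt;T&gt; contract of an execute method returning a status code (buildDag and submitAndMonitor are illustrative helpers, not actual Hive methods):

```java
// Sketch only - the real TezTask carries much more logic (resource
// localization, counters, detailed error reporting).
public class TezTask extends Task<TezWork> {
  @Override
  public int execute(DriverContext driverContext) {
    try {
      DAG dag = buildDag(work);   // illustrative: one vertex per MapWork/ReduceWork
      submitAndMonitor(dag);      // illustrative: submit, then poll via TezJobMonitor
      return 0;                   // Hive convention: 0 signals success to the Driver
    } catch (Exception e) {
      setException(e);
      return 1;                   // non-zero signals failure
    }
  }
}
```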
Hive’s driver will still deal with a graph of Tasks to handle execution. No changes are required to handle this. The only difference is that now the planner might transparently add TezTasks to the mix at runtime.
We will add a TezJobMonitor class that handles printing of status as well as reporting the final result. This class provides similar functions to HadoopJobExecHelper, which is used for MR processing. TezJobMonitor will also retrieve and print the top-level exception thrown at execution time.
Basic “job succeeded/failed” information as well as progress will be reported as discussed in “Job monitoring”. Hive’s current way of trying to fetch additional information about failed jobs will not be available in phase I.
Currently Tez offers limited debugging support once a job is complete. The only way to get access to detailed logs, counters, etc. is to look at the log of the AM, find the appropriate URL for specific task logs and access them through copy and paste. This will change over time and historical logging information will become accessible, but for the first phase debugging support will be limited.
The API for retrieving counters will be different in Tez and we will thus add a shim API for that. Incrementing counters at execution time will work unchanged.
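A hypothetical shape for such a shim (the names are illustrative, not the actual Hive shim API); only counter retrieval is abstracted, since incrementing stays unchanged:

```java
// Hypothetical shim interface: hides whether counters come from an MR
// JobClient or from a Tez DAG status object.
public interface CounterShim {
  long getCounterValue(String groupName, String counterName);
}

// MR-side sketch: delegate to the classic Hadoop Counters object.
public class MrCounterShim implements CounterShim {
  private final org.apache.hadoop.mapred.Counters counters;

  public MrCounterShim(org.apache.hadoop.mapred.Counters counters) {
    this.counters = counters;
  }

  @Override
  public long getCounterValue(String groupName, String counterName) {
    return counters.findCounter(groupName, counterName).getValue();
  }
}
```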
The basic execution flow will remain unchanged in phase I. ExecMapper/ExecReducer will be used through Tez’s MR compat layer. The operator plan plus settings will be communicated via the “scratch dir” as before. ExecMapper/ExecReducer will load and configure themselves accordingly.
MapRedWork is where we currently record all information about the MR job during compile/optimize time.
This class will be refactored to capture map information and reduce information separately (in MapWork and ReduceWork). This requires quite a few - albeit straightforward - changes to both planning and execution.
The refactor has benefits for pure MR execution as well. It removes the need for mappers/reducers to load and de-serialize information they don’t use, and it makes it easier to read and maintain the code, because it clearly delineates what information is used at what stage.
MapWork and ReduceWork will be shared by both MapRedWork and TezWork. MapRedWork is basically a composition of 1M + 0-1R, while TezWork is a tree of Map/ReduceWork with MapWork classes as leaves only.
As discussed above, TezTask will use TezWork, while MapRedTask and ExecDriver will use MapRedWork.
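A simplified sketch of that relationship (field names and the edge representation are illustrative; the real classes carry much more state):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// MapRedWork composes exactly one MapWork and zero or one ReduceWork.
class MapRedWorkSketch {
  MapWork mapWork;        // always present
  ReduceWork reduceWork;  // null for map-only jobs
}

// TezWork holds a graph of work units; MapWork nodes may only appear as
// leaves, i.e. the sources of the DAG.
class TezWorkSketch {
  List<BaseWork> allWork = new ArrayList<>();
  Map<BaseWork, List<BaseWork>> edges = new HashMap<>(); // parent -> children
}
```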
Neither the semantic analyzer nor any logical optimizations will change. Physical optimizations and MR plan generation are currently also done in the SemanticAnalyzer and will be moved out to separate classes.
The MapReduceCompiler does two things at the same time right now. After breaking down the operator plan into map-reduce tasks, it optimizes the number of tasks and also performs physical optimizations (picks join implementations, total order sort, etc.).
In order to limit the impact of Tez, we will provide a separate implementation: TezCompiler. The Tez compiler will attempt to perform most physical optimizations at the plan level, leaving the breakdown of the plan into Tez jobs for a second round of optimizations.
Later we may decide to use that physical optimizer (at the plan level) for both MR and Tez, while leaving specific optimizations in the two layers.
In the short term Tez will not support a “LocalTezDagRunner”. That means that Hive will always have to submit MR jobs when executing locally. In order to avoid replanning the query after execution has started in Tez mode, some optimizations for converting stages to local jobs will not be available.
Some MR jobs have a predetermined number of reducers. This happens for order by (numReducers = 1) and scenarios where bucketing is used (numReducers = numBuckets). The user can also set the number of reducers manually. The same number will be used for each reduce task. Initially there will be no way for the user to set different numbers of reducers for each of the separate reduce stages. There is already a ticket (HIVE-3946) to address this shortcoming, which can be used for both Tez and MR.
In most cases Hive will determine the number of reducers by looking at the input size of a particular MR job. Hive will then guess the correct number of reducers. The same guess will be used for subsequent reduce phases in a Tez plan. Ultimately, this number will have to be determined using statistics, which is out of scope, but applies equally to MR and Tez.
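The guess itself is simple; here is a sketch of the heuristic (hive.exec.reducers.bytes.per.reducer and hive.exec.reducers.max are real Hive settings, the helper itself is illustrative):

```java
public final class ReducerEstimate {
  // Divide total input size by the per-reducer target
  // (hive.exec.reducers.bytes.per.reducer), then cap the result at
  // hive.exec.reducers.max. A Tez MRR plan reuses this guess for every
  // reduce stage, since later stage sizes are unknown at planning time.
  static int estimateReducers(long totalInputBytes, long bytesPerReducer, int maxReducers) {
    long guess = (totalInputBytes + bytesPerReducer - 1) / bytesPerReducer; // ceiling division
    return (int) Math.min(maxReducers, Math.max(1, guess));
  }
}
```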
Explain statements are driven (in part) off of fields in the MapRedWork. Part of extending/refactoring MapRedWork will be to add sufficient information to print the correct operator trees in explain for Tez.
The “set” mechanism for Hive variables will not change. The variables will be passed through to the execution engine as before. However, Hive will not shim or map any mapreduce variables. If a variable is not supported in Hive it will be silently ignored.
There will be a new “ql” dependency on Tez. The jars will be handled the same way Hadoop jars are handled, i.e., they will be used during compile, but not included in the final distribution. Rather, we will depend on them being installed separately. The jars will only have to be present to run Tez jobs; they are not needed for regular MR execution.
The Mini Tez Cluster will initially be the only way to run Tez during unit tests; a LocalRunner is not yet available. If mr.rev is set to tez, all MiniMr tests will run against Tez.
For information about how to set up Tez on a Hadoop 2 cluster, see https://github.com/apache/incubator-tez/blob/branch-0.2.0/INSTALL.txt.
For information about how to configure Hive 0.13.0+ for Tez, see the release notes for HIVE-6098, Merge Tez branch into trunk. Also see Configuration Properties: Tez for descriptions of all the Tez parameters.
For a list of Hive and Tez releases that are compatible with each other, see Hive-Tez Compatibility.