Mohit Sabharwal and Xuefu Zhang, 06/30/2015
Objective
The initial patch of the Pig on Spark feature was delivered by Sigmoid Analytics in September 2014. Since then, a small team of developers from Intel, Sigmoid Analytics and Cloudera has been working towards feature completeness. This document gives a broad overview of the project. It describes the current design, identifies remaining feature gaps and, finally, defines project milestones.
Introduction
Pig on Spark project proposes to add Spark as an execution engine option for Pig, similar to current options of MapReduce and Tez.
Pig Latin commands can be easily translated to Spark transformations and actions. Each command carries out a single data transformation such as filtering, grouping or aggregation. This characteristic translates well to Spark, where the data flow model enables step-by-step transformations of Resilient Distributed Datasets (RDDs).
Spark will be simply “plugged in” as a new execution engine. Any optimizations or features added to Pig (like new UDFs, logical plan or physical plan optimization) will be automatically available to the Spark engine.
For more information on Pig and Spark projects, see References.
Motivation
The main motivation for enabling Pig to run on Spark is to:
Increase Pig adoption amongst users who would like to standardize on one (Spark) execution backend for operational convenience.
Improve performance:
For Pig query plans that result in multiple MapReduce jobs, those jobs can be combined into a single Spark job, so that each intermediate shuffle output (the “working dataset”) stays on local disk instead of being replicated across the network to HDFS only to be read back again.
Spark re-uses YARN containers, so it does not need to launch a new ApplicationMaster and new task JVMs for each job.
Spark allows explicit in-memory caching of RDD datasets, which supports the multi-query implementation in Pig.
Spark features like broadcast variables support implementation of specialized joins in Pig like fragment-replicate join.
Functionality
Pig on Spark users can expect all existing Pig functionality.
Users may switch to the Spark execution engine by:
Setting the SPARK_MASTER environment variable to point to the user’s Spark cluster, and
Specifying the -x spark argument on the Pig command line.
Note: At this stage of development, testing has only been done in Spark “local” mode (i.e. with SPARK_MASTER as “local”). Additional code changes and environment settings may be required to configure Pig with a Spark cluster.
Spark engine will support:
EXPLAIN command that displays the Spark execution engine operator plan.
Progress, statistics and completion status for commands as well as error and debug logs.
Design
The design approach is to implement Pig Latin semantics using Spark primitives.
Since a Pig Latin command approximates a Spark RDD transformation, expressing Pig semantics directly as Spark primitives is a natural option. Moreover, like Pig, Spark supports lazy execution which is triggered only when certain commands (actions in Spark) are invoked.
This design was part of the initial patch, and is in line with that of Pig on Tez.
Note that this approach is different from one adopted by Hive on Spark project, which implements Hive QL semantics as MapReduce primitives which, in turn, are translated to Spark primitives.
Design Components
Pig Input Data Files as Spark RDDs
The first step in a Pig Latin program is to specify what the input data is and how its contents are to be deserialized, i.e., converted from the input format into Pig’s data model, which views input as a sequence of Tuples (a.k.a. a Bag). This step is carried out by Pig’s LOAD command, which returns a handle to the bag. This bag is then processed by the next Pig command, and so on.
For the Spark engine, an input Pig bag is simply an RDD of Tuples, and each subsequent Pig command can be translated to one or more RDD transformations.
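As a rough sketch (in Java; the class and method names here are illustrative, and details such as serializing the POLoad’s FileSpec into the job configuration are omitted), the LOAD translation amounts to reading the input with PigInputFormat and keeping only the Tuple values:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
import org.apache.pig.data.Tuple;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class LoadConverterSketch {
    // PigInputFormat produces (Text, Tuple) records; the key is not needed downstream,
    // so the result of a LOAD is simply the RDD of Tuple values.
    public static JavaRDD<Tuple> loadAsRDD(JavaSparkContext sc, String path, Configuration conf) {
        JavaPairRDD<Text, Tuple> records =
                sc.newAPIHadoopFile(path, PigInputFormat.class, Text.class, Tuple.class, conf);
        return records.map(record -> record._2());
    }
}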
InputFormat and OutputFormat
PigInputFormat abstracts out the underlying input format for the execution engine such that it always returns Pig Tuples. It is a wrapper around Pig’s LoadFunc which, in turn, is a wrapper around the underlying Hadoop InputFormat.
All input and output formats supported with Pig should work with Spark engine. No changes are expected related to input or output formats.
Logical Plan
A Pig Latin program is translated in a one-to-one fashion to a query plan called LogicalPlan containing LogicalRelationalOperators. Pig builds a logical plan for every independent bag defined in the program.
No changes are expected to the logical plan.
Physical Plan
The LogicalPlan is converted to PhysicalPlan containing PhysicalOperators.
Note that some operators in the LogicalPlan and PhysicalPlan may contain optimization information that is available to the execution engine (Spark). One such scenario is when ORDER BY is followed by a LIMIT operator. This is discussed in the optimizations section later in the document.
Spark Plan
Spark Plan Compilation
The SparkCompiler identifies Spark jobs that need to be run for a given physical plan. It groups physical operators into one or more SparkOperators such that each SparkOperator represents a distinct Spark job. A SparkPlan is simply a DAG of SparkOperators.
The physical plan inside each SparkOperator forms the operator pipeline that gets executed by the execution engine.
The purpose of creating a SparkPlan is twofold:
It identifies all Spark jobs that need to be run.
It allows for Spark specific optimizations to be performed to the plan before execution.
The design of SparkPlan needs improvement. In the current implementation, we convert the SparkPlan into a pipeline of RDD transformations and immediately execute the RDD pipeline (by performing a Spark action). There is no intermediate step that allows the RDD pipeline to be optimized, if deemed necessary, before execution. This task will entail reworking the current sparkPlanToRDD() code, for example by introducing an RDDPlan of RDDOperators.
Spark Plan Execution
Executing a SparkPlan entails converting underlying PhysicalOperators to RDDs and then triggering the execution via a Spark action.
Execution begins by converting the POLoad operator into an RDD<Tuple> using Spark’s API that accepts a Hadoop input format (PigInputFormat). Next, we move down the SparkPlan’s operator pipeline and perform a series of RDD transformations, resulting in a new RDD at each step. The step-by-step conversion of physical operators to RDDs is shown in the table below.
For a given physical operator, an RDD transformation generally involves taking input tuples one by one from the predecessor RDD, “attaching” each to the underlying physical plan and calling getNextTuple() on the leaf operator of that plan to do the actual processing. (Pig uses the “pull” model for executing the physical operator pipeline.)
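As a rough illustration of this pull-model conversion (a simplified sketch using the Spark 1.x Java API; the actual converters differ per operator, the class name is illustrative, and details such as buffering and per-partition plan initialization are glossed over), a physical operator could be applied to a predecessor RDD along these lines:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.data.Tuple;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;

public class OperatorConverterSketch {
    // For each input tuple from the predecessor RDD, attach it to the operator pipeline
    // and pull output tuples by calling getNextTuple() on the leaf operator until it
    // stops returning STATUS_OK.
    public static JavaRDD<Tuple> convert(JavaRDD<Tuple> input, final PhysicalOperator leaf) {
        return input.mapPartitions(new FlatMapFunction<Iterator<Tuple>, Tuple>() {
            @Override
            public Iterable<Tuple> call(Iterator<Tuple> tuples) throws Exception {
                List<Tuple> out = new ArrayList<Tuple>();
                while (tuples.hasNext()) {
                    leaf.attachInput(tuples.next());
                    Result r = leaf.getNextTuple();
                    while (r.returnStatus == POStatus.STATUS_OK) {
                        out.add((Tuple) r.result);
                        r = leaf.getNextTuple();
                    }
                }
                return out;
            }
        });
    }
}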
Physical Operator to Spark RDD Conversion
Every Pig Latin command translates to one or more physical operators. Converting every physical operator to a Spark RDD is an important milestone in feature completion.
Physical Operator | Converter Description | Spark APIs | Status |
POLoad | Creates an RDD for given HDFS file read with PigInputFormat. FileSpec info is passed to input format via config. Returns RDD of Pig Tuples, i.e. RDD<Tuple> | sc.newAPIHadoopFile(), rdd.map() | ✔ |
PODistinct | Shuffles using reduceByKey(). Note that Spark has an rdd.distinct() API as well; needs investigation whether using distinct() is more optimal. | rdd.reduceByKey() | ✔ |
POForEach | Attaches Pig tuple as input to underlying physical operator and calls getNextTuple(). When GROUP BY is followed by FOREACH with algebraic UDFs or nested DISTINCT, there is opportunity to use a Combiner function. This optimization remains to be done for Spark engine. | rdd.mapPartitions(...) | ✔ |
POFilter | Attaches Pig tuple as input to underlying physical operator and calls getNextTuple() . | rdd.filter(...) | ✔ |
POCross | N/A since POCross is only used when CROSS is used inside a nested FOREACH. Note that for (non-nested) CROSS, Pig parallelizes the operation by generating a randomized synthetic key (in the GFCross UDF) for every record, replicating the records, performing a shuffle based on the synthetic key and then joining records in each reducer. The Spark engine simply re-uses this physical plan without any changes. | N/A | |
POLimit | Attaches Pig tuple as input to underlying physical operator and calls getNextTuple(). | rdd.coalesce(), rdd.mapPartitions() | ✔ |
POSort | Sorts using JavaPairRDD.sortByKey() with POSort.SortComparator. | rdd.map(), rdd.sortByKey(), rdd.mapPartitions() | ✔ |
POSplit | Used either explicitly or implicitly in case of multiple stores (“multi-query execution” feature) | | ⨯ |
POStore | Persists Pig Tuples (i.e. RDD<Tuple>) using PigOutputFormat to HDFS. | PairRDDFunctions.saveAsNewAPIHadoopFile() | ✔ |
POUnion | Returns union of all predecessor RDDs as a new UnionRDD. | new UnionRDD() | ✔ |
POStream | Attaches Pig tuple as input to underlying physical operator and calls getNextTuple() | rdd.mapPartitions() | ✔ |
POSkewedJoin | Optimizes case where there is significant skew in the number of records per key. Currently implemented as a regular RDD join. | JavaPairRDD.join() | ✔ |
POFRJoin | No shuffle join when one input fits in memory. Currently implemented as a regular RDD join. | JavaPairRDD.join() JavaPairRDD.leftOuterJoin() | ✔ |
POMergeJoin | A no-shuffle join if both inputs are already sorted. Currently implemented as a regular RDD join. | JavaPairRDD.join() | ✔ |
POLocalRearrange | Attaches Pig tuple as input to underlying physical operator and calls getNextTuple(). Generates tuples of the form (index, key,(Tuple without key)). | rdd.map() | ✔ |
POGlobalRearrange | Creates a new CoGroupRDD of predecessor RDDs. Generates tuples of the form (bag index, key, {tuple without key}). The output is always processed next by the POPackage operator. Note that Pig represents ordinary shuffle operations like GROUP BY as three physical operators: LocalRearrange (to identify the key and source), GlobalRearrange (to do the actual shuffle) and Package (to generate the output in each reducer). Since we use a Spark API (CoGroupRDD) to do the shuffle, we only need to identify the key, not the sources, so the Packaging step can be combined with the GlobalRearrange step for Spark. This optimization remains to be done for Spark engine. | new CoGroupRDD(), rdd.map() | ✔ |
POPackage | Packages globally rearranged tuples into format required by co-group. Attaches Pig tuple as input to underlying physical operator and calls getNextTuple(). | rdd.map() | ✔ |
PONative | Native MR. Follow up with Native Spark. | ⨯ | |
POCollectedGroup | Attaches Pig tuple as input to underlying physical operator and calls getNextTuple(). | rdd.mapPartitions() | ✔ |
POMergeGroup | | | ⨯ |
POCounter | This operator supports the RANK command and appears right before the PORank operator in the plan. The output is an RDD of tuples of the form (partition index, counter, tuple) where the counter is incremented for every record (there is special handling for “DENSE” rank). | rdd.mapPartitionsWithIndex() | ✔ |
PORank | The operator appears right after the POCounter operator. Runs two Spark jobs. First to compute number of records per partition index. And a second Spark job to compute the rank of each tuple by adding offset to counter values in tuples based on output of the first job. | rdd.mapToPair().groupByKey().sortByKey().collectAsMap() rdd.map() | ✔ |
✔ = Implemented, ⨯ = Not implemented. Several operators marked as implemented currently fall back to a regular (non-optimal) implementation, as noted in their descriptions.
Special Considerations
Multi-Query Execution
Multi-query execution in Pig is motivated by the fact that users often process the same data set in multiple ways, but do not want to pay the cost of reading it multiple times. To address this case, Pig inserts a SPLIT operator for every logical operator that has multiple outputs, which essentially means “materialize state at this point”. For Spark engine, a SPLIT can be translated to an optimization step where the RDD data set is pulled into Spark’s cluster-wide in-memory cache, such that child operators read from the cache. (In the MapReduce engine, child operators read from disk.)
Without this optimization, Spark engine Pig jobs will run significantly slower in the multi-query case because RDDs will need to be recomputed. This optimization needs to be implemented.
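A minimal sketch of this optimization (the class and method names are illustrative; it assumes the SPLIT input has already been converted to an RDD of Tuples):

import org.apache.pig.data.Tuple;
import org.apache.spark.api.java.JavaRDD;

public class SplitConverterSketch {
    // Materialize the split input once so that every output branch of the SPLIT reuses
    // the cached RDD instead of recomputing the whole upstream pipeline.
    public static JavaRDD<Tuple> materializeSplitInput(JavaRDD<Tuple> splitInput) {
        // cache() is persist(MEMORY_ONLY); MEMORY_AND_DISK may be a safer choice when the
        // working dataset does not fit in the cluster-wide cache.
        return splitInput.cache();
    }
}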
Remaining Optimizations
Specialized Joins and Groups
Pig supports specialized joins like fragment replicate join, merge join and skew join, as well as specialized grouping like collected groups and merge groups. These are explicitly specified by the user with the USING clause in the Pig command. (Pig does not automatically choose a specific join or group based on input data set.)
These are currently implemented as regular joins and groups. Specialized versions need to be implemented.
Secondary Sort
In Pig with the MapReduce engine, there are several map-side performance optimizations.
A good example is secondary key sort:
B = GROUP A BY FOO;
C = FOREACH B {
    D = ORDER A BY BAR;
    GENERATE D;
}
MapReduce provides specialized APIs to support secondary key sort within groups. Spark currently does not have support for secondary sort (SPARK-3655).
Currently, secondary sort in Spark engine is implemented using two shuffles. This needs to be fixed.
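One possible one-shuffle approach (a sketch only, not what the engine does today; it assumes Spark 1.2+ for repartitionAndSortWithinPartitions, and the composite key class with String fields is purely illustrative) is to shuffle on a composite (group key, sort key) while partitioning on the group key alone:

import java.io.Serializable;

import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaPairRDD;

public class SecondarySortSketch {
    // Composite key: group by the GROUP key (FOO in the example above) and order records
    // within each group by the sort key (BAR).
    public static class GroupSortKey implements Serializable, Comparable<GroupSortKey> {
        final String groupKey;
        final String sortKey;
        public GroupSortKey(String groupKey, String sortKey) {
            this.groupKey = groupKey;
            this.sortKey = sortKey;
        }
        @Override
        public int compareTo(GroupSortKey other) {
            int c = groupKey.compareTo(other.groupKey);
            return c != 0 ? c : sortKey.compareTo(other.sortKey);
        }
    }

    // Partition on the group key only, so all records of a group land in the same partition.
    public static class GroupKeyPartitioner extends Partitioner {
        private final int numPartitions;
        public GroupKeyPartitioner(int numPartitions) { this.numPartitions = numPartitions; }
        @Override
        public int numPartitions() { return numPartitions; }
        @Override
        public int getPartition(Object key) {
            GroupSortKey k = (GroupSortKey) key;
            return (k.groupKey.hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }

    // Single shuffle: records arrive partitioned by group key and sorted by (group key, sort key),
    // so each group can be streamed in sorted order without a second shuffle.
    public static <V> JavaPairRDD<GroupSortKey, V> secondarySort(
            JavaPairRDD<GroupSortKey, V> keyed, int numPartitions) {
        return keyed.repartitionAndSortWithinPartitions(new GroupKeyPartitioner(numPartitions));
    }
}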
Combiner Optimizations
Using a combiner lowers shuffle volume and skew on the reduce side.
The Pig combiner is an optimization that applies to certain FOREACH cases:
In a nested FOREACH where the only nested operation is DISTINCT (i.e., dedupes in the map phase to reduce shuffle volume).
In a non-nested FOREACH, where all projections are either:
Expressions on the GROUP column, Or
UDFs implementing the Algebraic interface.
The combiner translates either to an MR combiner or to a special Pig operator that does in-memory combining in the map stage (the “partial aggregation” feature).
Combiner support is currently not implemented for Spark engine.
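As a rough sketch of the kind of map-side partial aggregation a combiner provides, consider an algebraic COUNT per group (a sketch only, assuming the data has already been keyed by the group key; this is not the actual Pig combiner plumbing, which works through POForEach and the Algebraic interface, and the class name is illustrative):

import org.apache.pig.data.Tuple;
import org.apache.spark.api.java.JavaPairRDD;

public class CombinerSketch {
    // Per-partition counts are computed before the shuffle and merged afterwards, instead
    // of shipping every tuple to the reducer as groupByKey would.
    public static JavaPairRDD<String, Long> countPerGroup(JavaPairRDD<String, Tuple> keyed) {
        return keyed.aggregateByKey(
                0L,
                (count, tuple) -> count + 1,   // seqOp: fold one tuple into the partition-local count
                (a, b) -> a + b);              // combOp: merge partial counts across partitions
    }
}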
Limit after Sort
In the MapReduce engine, a sort entails three MapReduce jobs: the first computes quantiles from samples of the input data, the second performs the shuffle, partitioned on the quantile ranges, and the third is a single-reduce-task shuffle that generates the final output.
In the scenario where ORDER BY is followed by LIMIT n, logical and physical plans do not have the POLimit operator. Instead, the sort operator (POSort) contains the limit information (see LimitOptimizer and LimitAdjuster). MapReduce uses the limit information to optimize the cost of sorting in the second MR job where the combiner and the reducer stages output just the top n items.
Currently, Spark sort API does not take limit information. Hence no limit related optimization is implemented for the Spark engine. See PIG-4438.
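One possible way to exploit the limit in the Spark engine (a sketch only, not the current behavior; it assumes n is small enough for the result to be collected to the driver and that the sort comparator is serializable, and the class name is illustrative) is to take the top n tuples directly instead of fully sorting and then truncating:

import java.util.Comparator;
import java.util.List;

import org.apache.pig.data.Tuple;
import org.apache.spark.api.java.JavaRDD;

public class OrderByLimitSketch {
    // takeOrdered() keeps only the n smallest tuples under the comparator in each partition
    // and merges them at the driver, avoiding a full sort of the whole dataset.
    public static List<Tuple> orderByLimit(JavaRDD<Tuple> rdd, int n, Comparator<Tuple> sortComparator) {
        return rdd.takeOrdered(n, sortComparator);
    }
}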
Use Optimal Spark API for shuffling
Currently, shuffles are implemented using Spark’s groupBy, sortByKey and CoGroupRDD APIs. However, Spark has since added other variants like aggregateByKey, which also supports combiner functions.
Parallelism during shuffle
Currently, no parallelism estimate is made when calling Spark’s shuffle APIs; Spark is left to pick the default.
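A sketch of what a simple estimate could look like (the helper and its bytes-per-partition heuristic are hypothetical, loosely mirroring MR-style bytes-per-reducer sizing):

import org.apache.pig.data.Tuple;
import org.apache.spark.api.java.JavaPairRDD;

public class ShuffleParallelismSketch {
    // Size the shuffle from the input bytes instead of accepting Spark's default parallelism.
    public static int estimateParallelism(long inputSizeInBytes, long bytesPerPartition) {
        return (int) Math.min(Integer.MAX_VALUE, Math.max(1, inputSizeInBytes / bytesPerPartition));
    }

    public static JavaPairRDD<String, Iterable<Tuple>> groupWithEstimatedParallelism(
            JavaPairRDD<String, Tuple> keyed, long inputSizeInBytes) {
        int numPartitions = estimateParallelism(inputSizeInBytes, 256L * 1024 * 1024);
        // Most Spark shuffle APIs accept an explicit numPartitions argument.
        return keyed.groupByKey(numPartitions);
    }
}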
Native operator for Spark
For several reasons (performance, difficult translation to Pig, legacy code, etc.), users may want to run Spark code written in Scala, Python or Java directly from a Pig script.
This entails breaking the Pig pipeline, writing data to disk (added POStore), invoking the native Spark script, and then reading data back from disk (added POLoad). Some issues:
Co-ordination between Pig’s spark jobs and native spark jobs.
Adding stats and progress for native Spark jobs.
Handling any security implications when running Spark code natively.
This is a low priority item for the first version.
Packaging as part of GlobalRearrange
As described earlier, the Packaging operator does not necessarily need its own RDD transformation in Spark and may be made part of the GlobalRearrange RDD transformation.
This optimization saves a few extra transformations, though diverging from the MR and Tez behavior might make the code more confusing to follow.
Progress Reporting and Statistics
Basic support for Spark job progress reporting, statistics and logs has been implemented. Needs more work for comprehensive support.
Test Infrastructure
Unit Tests
Status of latest unit test run is here.
Unit tests with Spark engine use the standard miniDFS cluster. However, currently unit tests run in Spark “local” mode. Spark offers a way to run jobs in “local cluster” mode, where a cluster is made up of a given number of processes on the local machine. Unit test execution needs to be switched to local-cluster mode once local mode tests pass.
More Spark specific unit tests need to be added.
No testing has been done so far with an actual Spark cluster.
Not much thought has been given so far to benchmarking and performance testing.
Summary of Remaining Tasks
Design
Current design for SparkPlan needs improvement as mentioned earlier.
Functionality
All physical operators are supported at this point, except PONative.
Unit test failures point to some important gaps in the existing implementation. These are highlighted as items that need to be implemented as part of Milestone 1 below.
Optimizations
Specialized versions of joins and cogroups.
Running the combiner for Algebraic UDFs and FOREACH optimization.
Computing optimal parallelism for shuffles.
Spark now has several similar shuffle APIs. Need to choose the optimal ones.
More efficient implementation of secondary sort.
Spark integration
Progress and error reporting support is implemented but needs improvement.
Tests
Test with Spark local-cluster mode.
Add remaining unit tests for Spark engine specific code.
Test on Spark cluster.
Benchmarking and performance tests.
Comparison with Pig on Tez
Tez, as a backend execution engine, is very similar to Spark in that it offers the same optimizations: it speeds up scenarios that require multiple shuffles by storing intermediate output on local disk or in memory, re-uses YARN containers, and supports distributed in-memory caching. The main implementation difference is that Tez offers a much lower-level API for expressing computation. From the user’s perspective, Tez also does not offer a built-in shell.
Pig on Tez design is very similar to current Pig on Spark design, in that it constructs a new plan directly from the PhysicalPlan.
In Pig on Tez, every shuffle boundary translates into two Tez vertices, and the connecting edge expresses the fact that a shuffle takes place.
In Pig on Spark, the API is not as low level, so every shuffle is expressed as a high-level call to Spark such as reduceByKey or CoGroupRDD.
Significant refactoring of code was done in Pig 0.13 to support backends other than MapReduce starting with Pig on Tez. Pig on Spark builds on that effort.
Milestones
This document lists milestones for ongoing work on Pig on Spark.
Milestone 1
Goal: Functional completeness of major items
ETA: ~ 10-12 developer weeks
Missing Operators
Operator | Comment | Owner | ETA | Status |
POCross (Top level & Nested Cross) | PIG-4549 (top level) and PIG-4552 (nested) | Mohit | 3 days | ✔ |
POFRJoin | PIG-4278 | Mohit | 1 day | ✔ |
POMergeJoin | PIG-4422 | Mohit | 1 day | ✔ |
PONative | Low priority item for first version. Need a (no-op) implementation. | | 1 day | |
POMergeGroup | | | 1 day | |
POSkewJoin | PIG-4421 | Kelly | | ✔ |
Fix or disable tests for specialized JOIN and GROUP/COGROUP operators.
Missing Features
Feature | Comment | Test | Owner | ETA | Status |
Support for custom Partitioner | PIG-4565 Used in DISTINCT, GROUP, JOIN, CROSS. Need to wrap user’s custom MR Partitioner in Spark Partitioner object. | TestCustomPartitioner | Mohit | 1 week | ✔ |
Combiner support for Algebraic UDFs | | TestCombiner | TBD | | |
Spark should call cleanup in MR OutputCommitter API | Low priority clean-up item. Not a must-do for first version. | TestStore | | 3 days | Blocked by SPARK-7953 and multi-query |
Support for HBase storage | PIG-4585, PIG-4611 | TestHBaseStorage | Mohit/Kelly | 4 days | |
Support for secondary sort | PIG-4504 | TestAccumulator#testAccumWithSort | Kelly | 2 days | ✔ |
Use optimal shuffle API | Currently, we use groupByKey, which assumes all values for a key will fit in memory. Use aggregateByKey or reduceByKey instead. | | | 2 days | |
Secondary Sort using one shuffle | PIG-4504 implements secondary sort using 2 shuffles. We should do it in one (PIG-4553). This is a performance item, but a well used feature, so we should do it in first milestone. | | | 4 days | |
Multi-query support | TestMultiQuery, TestMultiQueryLocal broken | | Kelly | 1 week | |
Critical Tests
Corner cases failing in already implemented features
Test | Comment | Owner | ETA | Status |
TestJoin | Joining empty tuples fails | Kelly | 1 day | ✔ |
TestPigContext, TestGrunt | PIG-4295. Deserializing UDF classpath config fails in Spark because it’s thread local. | Kelly | 3 days | ✔ |
TestProjectRange | PIG-4297 Range expressions fail with groupby | Kelly | 2 days | ✔ |
TestAssert | FilterConverter issue? | | 1 day | |
TestLocationInPhysicalPlan | | | 1 day | |
Other Tests
ETA: 1 week
Several tests are failing due to either:
Ordering differences in shuffle results (MR returns sorted results, Spark does not), or
Gaps in SparkPigStats.
We should fix these tests as we find time, since they are “low hanging fruit” and might help us uncover other issues. These include TestScriptLanguage, TestPigRunner, TestJoinLocal, TestEvalPipelineLocal, TestDefaultDateTimeZone, etc.
Investigate (and Fix if Needed)
Feature | Comment | Owner | ETA | Status |
Streaming | Tests are passing, but we need confirmation that this feature works. | | 1 day | |
Spark Unit Tests
Only a few Spark-engine-specific unit tests have been written so far (for features that have Spark-specific implementations). The following is a partial list of what we need to add; this list needs to be updated as more Spark-specific code is added. We should also add tests for the POConverter implementations.
Test | Comment | Owner | ETA | Status |
TestSparkLauncher | ||||
TestSparkPigStats | ||||
TestSecondarySortSpark | | Kelly | | ✔ |
Enhance Test Infrastructure
ETA: ~ 2 weeks (additional test failures expected)
Use “local-cluster” mode to run unit tests and fix resulting failures.
Milestone 2
Goal: Spark Integration & remaining functionality items
ETA: ~ 5 developer weeks
Spark Integration
ETA: 2 weeks
Includes error reporting and improved progress and stats reporting.
Fix Remaining Tests
ETA: 3 weeks
TestScriptLanguageJavaScript
TestPigRunner
TestPruneColumn: Fixed in PIG-4582
TestForEachNestedPlanLocal: Fixed in PIG-4552
TestRank1
TestPoissonSampleLoader
TestPigServerLocal
TestNullConstant: Fixed in PIG-4597
TestCase: Fixed in PIG-4589
TestOrcStoragePushdown
Milestone 3
Goal: Performance optimization and code cleanup
ETA: TBD
Performance Tests
TBD
Performance Optimizations
Feature | Comment | Owner | ETA | Status |
Split / MultiQuery using RDD.cache() | ||||
In Group + Foreach aggregations, use aggregateByKey or reduceByKey for much better performance | For example, COUNT or DISTINCT aggregation inside a nested FOREACH is handled by Pig code. We should use Spark to do it more efficiently. | | | |
Compute optimal Shuffle Parallelism | Currently we let Spark pick the default | |||
Combiner support for “Group+ForEach” | ||||
Multiple GROUP BYs on the same data set can avoid multiple shuffles. | See MultiQueryPackager | |||
Switch to Kryo for Spark data serialization | Are all Pig serializable classes compatible with Kryo? | | | |
FR Join | ||||
Merge Join (including sparse merge join) | ||||
Skew Join | ||||
Merge CoGroup | ||||
Collected CoGroup |
Note that there is ongoing work in Spark SQL to support specialized joins; see SPARK-2211. As an example, support for merge join is in Spark SQL as of Spark 1.4 (SPARK-2213 and SPARK-7165). This implies that the Spark community will not be adding support for these joins to the Spark Core library.
Re-design Spark Plan
Currently, the SparkLauncher converts the SparkPlan to an RDD pipeline and immediately executes it. There is no intermediate step that allows the RDD pipeline to be optimized, if deemed necessary, before execution. This will need reworking of sparkPlanToRDD(), perhaps by introducing an RDDPlan of RDDOperators.
Other Features
Native Spark operator support.
Allow Spark Partitioner to be specified using PARTITION BY.
Getting Started
Github: https://github.com/apache/pig/tree/spark
Please refer to PIG-4059 for instructions on how to setup your development environment, PIG-4266 for instructions on how to run unit tests and PIG-4604 for instructions on package import order.
References
[1] Pig
[2] Pig Latin
[5] Spark
[6] Spark blog post