...
Internal Interface Changes
OpFusionCodegenSpec
This interface maintains the required information an operator needs to support OFCG.
OpFusionContext
Code Block | |
---|---|
Code Block | |
language | java | title | OpFusionCodegenSpec
/** An interface for those physical operators that support * A OpFusionContext contains information about the context in which {@link OpFusionCodegenSpec} * needed to do operator fusion codegen. */ traitpublic interface OpFusionCodegenSpecOpFusionContext { /** Return the *output Iftype anof operatorcurrent has multiple outputs in the OpFusionCodegenSpec DAG, we only need to call it{@link OpFusionCodegenSpecGenerator}. */ RowType getOutputType(); /** * produce method* once,Return sothe wemanaged needmemory thesefraction twoof flag variable. */this {@link OpFusionCodegenSpecGenerator} needed during private var hasProcessProduceTraversed =* false all fusion privateoperators. var hasEndInputProduceTraversed = false*/ protected var managedMemoryFraction: Double = 0double getManagedMemoryFraction(); val inputs: ListBuffer[OpFusionCodegenSpec] = new ListBuffer[OpFusionCodegenSpec] val outputs: ListBuffer[(Int, OpFusionCodegenSpec)] = /** Return the input {@link OpFusionContext} of this {@link OpFusionCodegenSpecGenerator}. */ newList<OpFusionContext> ListBuffer[(Int, OpFusionCodegenSpec)]getInputFusionContexts(); /** Prefix used in* theGenerate currentJava operator'ssource variablecode names. */ def variablePrefix: String def getOutputType: RowType def getOperatorCtx: CodeGeneratorContext def getExprCodeGenerator: ExprCodeGenerator def getManagedMemory: Long = 0L final def setManagedMemFraction(managedMemoryFraction: Double) = this.managedMemoryFraction = managedMemoryFraction /*to process the rows from operator corresponding input, delegate to * {@link OpFusionCodegenSpecGenerator#processProduce(CodeGeneratorContext)} method. */ void processProduce(CodeGeneratorContext codegenCtx); /** * Generate Java source code to processdo theclean rowswork fromfor operator corresponding input. */ , delegate to final* def{@link processProduceOpFusionCodegenSpecGenerator#endInputProduce(fusionCtx: CodeGeneratorContext):} Unitmethod. 
= { if (!hasProcessProduceTraversed) {*/ void endInputProduce(CodeGeneratorContext codegenCtx); default String doProcessProduceprocessConsume(fusionCtxList<GeneratedExpression> outputVars) { hasProcessProduceTraversed = true return processConsume(outputVars, null); } } /** * GenerateConsume the Java source code to process rows, only the leaf operator in operator DAG need to * generate the code which produce the row, other middle operators just call its input generated columns or row from current {@link OpFusionCodegenSpec}, delegate to * {@link OpFusionCodegenSpecGenerator#processConsume(List, String)} ()} method. * * `processProduce`<p>Note normally,that otherwise,`outputVars` theand operator`row` hascan't someboth specificbe logicnull. The leaf operator */ produce row first, andString then call its `processConsume` method to consume row.processConsume(List<GeneratedExpression> outputVars, String row); /** * The code* generatedGenerate byJava leafsource operatorcode willto bedo savedclean inwork fusionCtx,for so{@link thisOpFusionCodegenSpec} methodcorresponding doesn't has * return type. */ protected def doProcessProduce(fusionCtx: CodeGeneratorContext): Unit /** * Consume the generated columns or row from current operator, call its output's * `doProcessConsume()`. * * Note that `outputVars` and `row` can't both be null. 
*/ final def processConsume(outputVars: Seq[GeneratedExpression], row: String = null): String = { val inputVars = if (outputVars != null) { assert(outputVars.length == getOutputType.getFieldCount) outputVars } else { assert(row != null, "outputVars and row can't both be null.") getExprCodeGenerator.generateInputAccessExprs() } // if this operator has multiple output operators, we need to materialize them in advance to // avoid be evaluated multiple times in downstream val exprCode = if (outputs.length > 1) { outputVars.map(expr => expr.getCode).mkString("\n") } else { "" } val consumeCode = outputs .map( op => { val inputIdOfOutputNode = op._1 val output = op._2 // we need to bind out ctx before call its consume to reuse the input expression if (inputIdOfOutputNode == 1) { output.getExprCodeGenerator.bindInputWithExpr(getOutputType, inputVars, row) } else { output.getExprCodeGenerator.bindSecondInputWithExpr(getOutputType, inputVars, row) } // we always pass column vars and row var to output op simultaneously, the output decide to use which one output.doProcessConsume(inputIdOfOutputNode, inputVars, row) }) .mkString("\n") s""" |$exprCode | // consume code |$consumeCode """.stripMargin } /** * The process method is responsible for the operator data processing logic, so each operator * needs to implement this method to generate the code to process the row. This should only be * called from `processConsume`. * * @param inputId * This is numbered starting from 1, and `1` indicates the first input. * @param inputVars * field variables of current input. * @param row * row variable of current input. * * Note: A operator can either consume the rows as RowData (row), or a list of variables * (inputVars). */ def doProcessConsume(inputId: Int, inputVars: Seq[GeneratedExpression], row: String): String /** Generate Java source code to do clean work for operator corresponding input. 
*/ final def endInputProduce(fusionCtx: CodeGeneratorContext): Unit = { if (!hasEndInputProduceTraversed) { doEndInputProduce(fusionCtx) hasEndInputProduceTraversed = true } } /** * Generate the Java source code to do operator clean work, only the leaf operator in operator DAG * need to generate the code, other middle operators just call its input `endInputProduce` * normally, otherwise, the operator has some specific logic. * * The code generated by leaf operator will be saved in fusionCtx, so this method doesn't has * return type. */ protected def doEndInputProduce(fusionCtx: CodeGeneratorContext): Unit /** * Generate code to trigger the clean work of operator, call its output's `doEndInputConsume()`. * The leaf operator start to call `endInputConsume`. */ final def endInputConsume(): String = { s""" |${outputs.map(op => op._2.doEndInputConsume(op._1)).mkString("\n")} """.stripMargin } /** * The endInput method is used to do clean work for operator corresponding input, such as the * HashAgg operator needs to flush data, and the HashJoin build side need to build hash table, so * each operator needs to implement the corresponding clean logic in this method. * * For blocking operators such as HashAgg, the `processConsume` method needs to be called first to * consume the data, followed by the `endInputConsume` method to do the cleanup work of the * downstream operators. For pipeline operators such as Project, you only need to call the * `endInputConsume` method. * * @param inputId * This is numbered starting from 1, and `1` indicates the first input. */ def doEndInputConsume(inputId: Int): Stringinput, delegate to {@link OpFusionCodegenSpecGenerator#endInputConsume()} method. */ String endInputConsume(); } |
OpFusionCodegenSpec
This interface maintains the required information an operator needs to support OFCG.
Code Block | ||||
---|---|---|---|---|
| ||||
/** An interface for those physical operators that support operator fusion codegen. */
@Internal
public interface OpFusionCodegenSpec {
/**
* Initializes the operator spec. Sets access to the context. This method must be called before
* doProduce and doConsume related methods.
*/
void setup(OpFusionContext opFusionContext);
/** Prefix used in the current operator's variable names. */
String variablePrefix();
/**
* The subset of column index those should be evaluated before this operator.
*
* <p>We will use this to insert some code to access those columns that are actually used by
* current operator before calling doProcessConsume().
*/
Set<Integer> usedInputColumns(int inputId);
/**
* Specific inputId of current operator needed {@link RowData} type, this is used to notify the
* upstream operator wrap the proper {@link RowData} we needed before call doProcessConsume
* method. For example, HashJoin build side need {@link BinaryRowData}.
*/
Class<? extends RowData> getInputRowDataClass(int inputId);
/**
* Every operator need one {@link CodeGeneratorContext} to store the context needed during
* operator fusion codegen.
*/
CodeGeneratorContext getCodeGeneratorContext();
/** Get the {@link ExprCodeGenerator} used by this operator during operator fusion codegen, . */
ExprCodeGenerator getExprCodeGenerator();
/**
* Generate the Java source code to process rows, only the leaf operator in operator DAG need to
* generate the code which produce the row, other middle operators just call its input {@link
* OpFusionCodegenSpecGenerator#processProduce(CodeGeneratorContext)} normally, otherwise, the
* operator has some specific logic. The leaf operator produce row first, and then call {@link
* OpFusionContext#processConsume(List)} method to consume row.
*
* <p>The code generated by leaf operator will be saved in fusionCtx, so this method doesn't has
* return type.
*/
void doProcessProduce(CodeGeneratorContext codegenCtx);
/**
* The process method is responsible for the operator data processing logic, so each operator
* needs to implement this method to generate the code to process the row. This should only be
* called from {@link OpFusionCodegenSpecGenerator#processConsume(List, String)}.
*
* <p>Note: A operator can either consume the rows as RowData (row), or a list of variables
* (inputVars).
*
* @param inputId This is numbered starting from 1, and `1` indicates the first input.
* @param inputVars field variables of current input.
* @param row row variable of current input.
*/
String doProcessConsume(
int inputId, List<GeneratedExpression> inputVars, GeneratedExpression row);
/**
* Generate the Java source code to do operator clean work, only the leaf operator in operator
* DAG need to generate the code, other middle operators just call its input `endInputProduce`
* normally, otherwise, the operator has some specific logic.
*
* <p>The code generated by leaf operator will be saved in fusionCtx, so this method doesn't has
* return type.
*/
void doEndInputProduce(CodeGeneratorContext codegenCtx);
/**
* The endInput method is used to do clean work for operator corresponding input, such as the
* HashAgg operator needs to flush data, and the HashJoin build side need to build hash table,
* so each operator needs to implement the corresponding clean logic in this method.
*
* <p>For blocking operators such as HashAgg, the {@link OpFusionContext#processConsume(List,
* String)} method needs to be called first to consume the data, followed by the
* `endInputConsume` method to do the cleanup work of the downstream operators. For pipeline
* operators such as Project, you only need to call the `endInputConsume` method.
*
* @param inputId This is numbered starting from 1, and `1` indicates the first input.
*/
String doEndInputConsume(int inputId);
} |
OpFusionCodegenSpecGenerator
Code Block |
---|
/**
* {@link OpFusionCodegenSpecGenerator} is used to operator fusion codegen that generate the fusion
* code, it has multiple inputs and outputs, then form a DAG. Every OpFusionCodegenSpecGenerator
* holds an {@link OpFusionCodegenSpec} that used to generate the operator process row code. In
* addition, it also provides some meta information that codegen needed.
*/
@Internal
public abstract class OpFusionCodegenSpecGenerator {
private final RowType outputType;
protected final OpFusionCodegenSpec opFusionCodegenSpec;
private final OpFusionContext opFusionContext;
private double managedMemoryFraction = 0;
public OpFusionCodegenSpecGenerator(
RowType outputType, OpFusionCodegenSpec opFusionCodegenSpec) {
this.outputType = outputType;
this.opFusionCodegenSpec = opFusionCodegenSpec;
this.opFusionContext = new OpFusionContextImpl(this);
}
/**
* Initializes the operator spec generator needed information. This method must be called before
* produce and consume related method.
*/
public void setup(Context context) {
this.managedMemoryFraction = context.getManagedMemoryFraction();
this.opFusionCodegenSpec.setup(opFusionContext);
}
public OpFusionCodegenSpec getOpFusionCodegenSpec() {
return opFusionCodegenSpec;
}
public OpFusionContext getOpFusionContext() {
return opFusionContext;
}
public abstract long getManagedMemory();
public abstract List<OpFusionCodegenSpecGenerator> getInputs();
/**
* Add the specific {@link OpFusionCodegenSpecGenerator} as the output of current operator spec
* generator, one {@link OpFusionCodegenSpecGenerator} may have multiple outputs that form a
* DAG.
*
* @param inputIdOfOutput This is numbered starting from 1, and `1` indicates the first input of
* output {@link OpFusionCodegenSpecGenerator}.
* @param output The {@link OpFusionCodegenSpecGenerator} as output of current spec generator.
*/
public abstract void addOutput(int inputIdOfOutput, OpFusionCodegenSpecGenerator output);
/** Generate Java source code to process the rows from operator corresponding input. */
public abstract void processProduce(CodeGeneratorContext fusionCtx);
/**
* Consume the generated columns or row from current operator, call its output's {@link
* OpFusionCodegenSpec#doProcessConsume(int, List, GeneratedExpression)} method.
*
* <p>Note that `outputVars` and `row` can't both be null.
*/
public abstract String processConsume(List<GeneratedExpression> outputVars, String row);
/** Generate Java source code to do clean work for operator corresponding input. */
public abstract void endInputProduce(CodeGeneratorContext fusionCtx);
/**
* Generate code to trigger the clean work of operator, call its output's {@link
* OpFusionCodegenSpec#doEndInputConsume(int)}. The leaf operator start to call endInputConsume
* method.
*/
public abstract String endInputConsume();
} |
FusionCodegenExecNode
An ExecNode that wants to support OFCG must implement this interface.
Code Block | ||||
---|---|---|---|---|
| ||||
/** A {@link ExecNode} which support operator fusion codegen. */ public interface FusionCodegenExecNode { /** * Whether this ExecNode supports OFCG or not. */ boolean supportFusionCodegen(); /** * Translates this node into a {@link OpFusionCodegenSpecOpFusionCodegenSpecGenerator}. * * <p>NOTE: This method should return same spec generator result if called multiple times. * * @param planner The {@link Planner} of the translated graph. */ OpFusionCodegenSpecOpFusionCodegenSpecGenerator translateToFusionCodegenSpec(Planner planner); } |
...