Internal Interface Changes


This interface maintains the required information an operator needs to support OFCG.


Code Block
/**
 * An OpFusionContext contains the context information that an {@link OpFusionCodegenSpec} needs
 * to do operator fusion codegen.
 */
public interface OpFusionContext {

    /** Return the output type of the current {@link OpFusionCodegenSpecGenerator}. */
    RowType getOutputType();

    /**
     * Return the managed memory fraction that this {@link OpFusionCodegenSpecGenerator} needs
     * among all fused operators.
     */
    double getManagedMemoryFraction();

    /** Return the input {@link OpFusionContext}s of this {@link OpFusionCodegenSpecGenerator}. */
    List<OpFusionContext> getInputFusionContexts();

    /**
     * Generate Java source code to process the rows from the operator's corresponding input,
     * delegating to the {@link OpFusionCodegenSpecGenerator#processProduce(CodeGeneratorContext)}
     * method.
     */
    void processProduce(CodeGeneratorContext codegenCtx);

    /**
     * Generate Java source code to do the clean work for the operator's corresponding input,
     * delegating to the {@link OpFusionCodegenSpecGenerator#endInputProduce(CodeGeneratorContext)}
     * method.
     */
    void endInputProduce(CodeGeneratorContext codegenCtx);

    default String processConsume(List<GeneratedExpression> outputVars) {
        return processConsume(outputVars, null);
    }

    /**
     * Consume the generated columns or row from the current {@link OpFusionCodegenSpec},
     * delegating to the {@link OpFusionCodegenSpecGenerator#processConsume(List, String)} method.
     *
     * <p>Note that `outputVars` and `row` can't both be null.
     */
    String processConsume(List<GeneratedExpression> outputVars, String row);

    /**
     * Generate Java source code to do the clean work for the {@link OpFusionCodegenSpec}'s
     * corresponding input, delegating to the {@link
     * OpFusionCodegenSpecGenerator#endInputConsume()} method.
     */
    String endInputConsume();
}
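
The convenience overload and the null rule for `processConsume` can be pictured with a small sketch. `ToyFusionContext` and `ToyContextImpl` are hypothetical stand-ins, plain strings replace `GeneratedExpression`, and the method body is only illustrative; it is not the planner's implementation.

```java
import java.util.List;

/** Hypothetical stand-in for OpFusionContext; variables are plain strings here. */
interface ToyFusionContext {

    /** Convenience overload: the caller only has column variables, no row variable. */
    default String processConsume(List<String> outputVars) {
        return processConsume(outputVars, null);
    }

    /** At least one of outputVars and row must be non-null. */
    String processConsume(List<String> outputVars, String row);
}

/** Illustrative implementation that only reports which representation it received. */
class ToyContextImpl implements ToyFusionContext {
    @Override
    public String processConsume(List<String> outputVars, String row) {
        if (outputVars == null && row == null) {
            throw new IllegalArgumentException("outputVars and row can't both be null.");
        }
        return outputVars != null
                ? "// consume columns " + String.join(", ", outputVars)
                : "// consume row " + row;
    }
}
```

An operator that already holds its output as individual field variables would use the one-argument form, while one that only holds a row variable would pass `null` for `outputVars`.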


This interface maintains the required information an operator needs to support OFCG.

Code Block
/** An interface for those physical operators that support operator fusion codegen. */
public interface OpFusionCodegenSpec {

    /**
     * Initializes the operator spec and gives it access to the context. This method must be called
     * before the doProduce and doConsume related methods.
     */
    void setup(OpFusionContext opFusionContext);

    /** Prefix used in the current operator's variable names. */
    String variablePrefix();

    /**
     * The subset of input column indices that should be evaluated before this operator.
     *
     * <p>This is used to insert code that accesses the columns actually used by the current
     * operator before calling doProcessConsume().
     */
    Set<Integer> usedInputColumns(int inputId);

    /**
     * The {@link RowData} class that the given input of the current operator requires. This tells
     * the upstream operator to wrap its output in the proper {@link RowData} before calling the
     * doProcessConsume method. For example, the HashJoin build side needs {@link BinaryRowData}.
     */
    Class<? extends RowData> getInputRowDataClass(int inputId);

    /**
     * Every operator needs one {@link CodeGeneratorContext} to store the context needed during
     * operator fusion codegen.
     */
    CodeGeneratorContext getCodeGeneratorContext();

    /** Get the {@link ExprCodeGenerator} used by this operator during operator fusion codegen. */
    ExprCodeGenerator getExprCodeGenerator();

    /**
     * Generate the Java source code to process rows. Only the leaf operator in the operator DAG
     * needs to generate the code that produces the row; other operators normally just call {@link
     * OpFusionCodegenSpecGenerator#processProduce(CodeGeneratorContext)} of their input, unless
     * the operator has some specific logic. The leaf operator produces the row first and then
     * calls the {@link OpFusionContext#processConsume(List)} method to consume it.
     *
     * <p>The code generated by the leaf operator is saved in the passed {@link
     * CodeGeneratorContext}, so this method has no return value.
     */
    void doProcessProduce(CodeGeneratorContext codegenCtx);

    /**
     * The process method is responsible for the operator's data processing logic, so each operator
     * needs to implement this method to generate the code that processes a row. This should only
     * be called from {@link OpFusionCodegenSpecGenerator#processConsume(List, String)}.
     *
     * <p>Note: An operator can consume the rows either as RowData (row) or as a list of variables
     * (inputVars).
     *
     * @param inputId The input id, numbered starting from 1; `1` indicates the first input.
     * @param inputVars Field variables of the current input.
     * @param row Row variable of the current input.
     */
    String doProcessConsume(
            int inputId, List<GeneratedExpression> inputVars, GeneratedExpression row);

    /**
     * Generate the Java source code to do the operator's clean work. Only the leaf operator in the
     * operator DAG needs to generate the code; other operators normally just call
     * `endInputProduce` of their input, unless the operator has some specific logic.
     *
     * <p>The code generated by the leaf operator is saved in the passed {@link
     * CodeGeneratorContext}, so this method has no return value.
     */
    void doEndInputProduce(CodeGeneratorContext codegenCtx);

    /**
     * The endInput method does the clean work for the operator's corresponding input. For example,
     * the HashAgg operator needs to flush its data, and the HashJoin build side needs to build the
     * hash table, so each operator implements its own clean logic in this method.
     *
     * <p>For blocking operators such as HashAgg, the {@link OpFusionContext#processConsume(List,
     * String)} method needs to be called first to consume the data, followed by the
     * `endInputConsume` method to do the cleanup work of the downstream operators. For pipeline
     * operators such as Project, only the `endInputConsume` method needs to be called.
     *
     * @param inputId The input id, numbered starting from 1; `1` indicates the first input.
     */
    String doEndInputConsume(int inputId);
}
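
To make the produce/consume protocol and the blocking versus pipelined end-of-input behaviour concrete, here is a minimal sketch. Everything in it is hypothetical: `ToyOp`, `ToySource`, `ToyHashAgg` and `ToySink` are not Flink classes, each operator has a single input and output, and the produce methods return the generated code as a string, whereas the interface above stores it in a `CodeGeneratorContext`.

```java
/** Hypothetical, heavily simplified operator; none of these names are Flink APIs. */
abstract class ToyOp {
    ToyOp input;  // upstream operator, null for the leaf
    ToyOp output; // downstream operator, null for the last operator

    /** Chains next after this operator and returns next. */
    ToyOp then(ToyOp next) {
        this.output = next;
        next.input = this;
        return next;
    }

    /** Non-leaf operators simply ask their input to produce. */
    String doProcessProduce() { return input.doProcessProduce(); }
    String doEndInputProduce() { return input.doEndInputProduce(); }

    /** Code that handles one row held in the variable named row. */
    abstract String doProcessConsume(String row);

    /** Pipelined default: nothing to flush, just notify the downstream operator. */
    String doEndInputConsume() { return endInputConsume(); }

    String processConsume(String row) {
        return output == null ? "" : output.doProcessConsume(row);
    }

    String endInputConsume() {
        return output == null ? "" : output.doEndInputConsume();
    }
}

/** Leaf operator: the only one that emits the driving loop. */
class ToySource extends ToyOp {
    @Override String doProcessProduce() {
        return "for (Row r : source) {\n" + processConsume("r") + "}\n";
    }
    @Override String doEndInputProduce() { return endInputConsume(); }
    @Override String doProcessConsume(String row) {
        throw new UnsupportedOperationException("a leaf operator has no input");
    }
}

/** Blocking operator: buffers rows, emits results only at end of input. */
class ToyHashAgg extends ToyOp {
    @Override String doProcessConsume(String row) { return "agg.add(" + row + ");\n"; }
    @Override String doEndInputConsume() {
        // First push the buffered results downstream, then trigger downstream cleanup.
        return "for (Row out : agg.results()) {\n" + processConsume("out") + "}\n"
                + endInputConsume();
    }
}

class ToySink extends ToyOp {
    @Override String doProcessConsume(String row) { return "collect(" + row + ");\n"; }
    @Override String doEndInputConsume() { return "close();\n"; }
}
```

Chaining `new ToySource().then(new ToyHashAgg()).then(new ToySink())` and calling `doProcessProduce()` on the sink yields a loop that only feeds the aggregate; the sink's `collect` call appears in the code returned by `doEndInputProduce()`, because the aggregate is blocking.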


Code Block
/**
 * {@link OpFusionCodegenSpecGenerator} is used by operator fusion codegen to generate the fused
 * code. It has multiple inputs and outputs, which together form a DAG. Every
 * OpFusionCodegenSpecGenerator holds an {@link OpFusionCodegenSpec} that is used to generate the
 * operator's row processing code. In addition, it provides some meta information that codegen
 * needs.
 */
public abstract class OpFusionCodegenSpecGenerator {

    private final RowType outputType;
    protected final OpFusionCodegenSpec opFusionCodegenSpec;
    private final OpFusionContext opFusionContext;
    private double managedMemoryFraction = 0;

    public OpFusionCodegenSpecGenerator(
            RowType outputType, OpFusionCodegenSpec opFusionCodegenSpec) {
        this.outputType = outputType;
        this.opFusionCodegenSpec = opFusionCodegenSpec;
        this.opFusionContext = new OpFusionContextImpl(this);
    }

    /**
     * Initializes the information the operator spec generator needs. This method must be called
     * before the produce and consume related methods.
     */
    public void setup(Context context) {
        this.managedMemoryFraction = context.getManagedMemoryFraction();
    }

    public OpFusionCodegenSpec getOpFusionCodegenSpec() {
        return opFusionCodegenSpec;
    }

    public OpFusionContext getOpFusionContext() {
        return opFusionContext;
    }

    public abstract long getManagedMemory();

    public abstract List<OpFusionCodegenSpecGenerator> getInputs();

    /**
     * Add the given {@link OpFusionCodegenSpecGenerator} as an output of the current operator spec
     * generator. One {@link OpFusionCodegenSpecGenerator} may have multiple outputs, which form a
     * DAG.
     *
     * @param inputIdOfOutput The input id of the output {@link OpFusionCodegenSpecGenerator},
     *     numbered starting from 1; `1` indicates its first input.
     * @param output The {@link OpFusionCodegenSpecGenerator} that is the output of the current
     *     spec generator.
     */
    public abstract void addOutput(int inputIdOfOutput, OpFusionCodegenSpecGenerator output);

    /** Generate Java source code to process the rows from operator corresponding input. */
    public abstract void processProduce(CodeGeneratorContext fusionCtx);

    /**
     * Consume the generated columns or row from the current operator by calling its outputs'
     * {@link OpFusionCodegenSpec#doProcessConsume(int, List, GeneratedExpression)} method.
     *
     * <p>Note that `outputVars` and `row` can't both be null.
     */
    public abstract String processConsume(List<GeneratedExpression> outputVars, String row);

    /** Generate Java source code to do clean work for operator corresponding input. */
    public abstract void endInputProduce(CodeGeneratorContext fusionCtx);

    /**
     * Generate code to trigger the clean work of the operator by calling its outputs' {@link
     * OpFusionCodegenSpec#doEndInputConsume(int)}. The leaf operator starts the chain of
     * endInputConsume calls.
     */
    public abstract String endInputConsume();
}
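
The role of `inputIdOfOutput` in `addOutput` can be sketched as follows. This is an assumption-laden toy, not the real generator: `ToyGenerator`, `ToyScan` and `ToyHashJoin` are hypothetical, rows are plain variable names, and the generated statements are placeholders. It only shows how a node can tell each output which of that output's inputs (numbered from 1) the row arrives on.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical node of a fusion DAG; not the real OpFusionCodegenSpecGenerator. */
abstract class ToyGenerator {

    /** One downstream edge: the output node and which of its inputs this node feeds. */
    private static final class Edge {
        final int inputIdOfOutput; // 1-based input id on the output node
        final ToyGenerator output;

        Edge(int inputIdOfOutput, ToyGenerator output) {
            this.inputIdOfOutput = inputIdOfOutput;
            this.output = output;
        }
    }

    private final List<Edge> outputs = new ArrayList<>();

    void addOutput(int inputIdOfOutput, ToyGenerator output) {
        outputs.add(new Edge(inputIdOfOutput, output));
    }

    /** Hands the row to every output, telling each one which input it arrives on. */
    String processConsume(String row) {
        StringBuilder code = new StringBuilder();
        for (Edge edge : outputs) {
            code.append(edge.output.doProcessConsume(edge.inputIdOfOutput, row));
        }
        return code.toString();
    }

    abstract String doProcessConsume(int inputId, String row);
}

/** Leaf node: it never consumes rows itself. */
class ToyScan extends ToyGenerator {
    @Override
    String doProcessConsume(int inputId, String row) {
        throw new UnsupportedOperationException("a scan has no input");
    }
}

/** Two-input node: input 1 is treated as the build side, input 2 as the probe side. */
class ToyHashJoin extends ToyGenerator {
    @Override
    String doProcessConsume(int inputId, String row) {
        return inputId == 1 ? "hashTable.put(" + row + ");\n" : "probe(" + row + ");\n";
    }
}
```

With `build.addOutput(1, join)` and `probe.addOutput(2, join)`, the same `join` node generates different code depending on which upstream node calls `processConsume`.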


If an ExecNode wants to support OFCG, it needs to implement this interface.

Code Block
/** An {@link ExecNode} which supports operator fusion codegen. */
public interface FusionCodegenExecNode {

    /** Whether this ExecNode supports OFCG or not. */
    boolean supportFusionCodegen();

    /**
     * Translates this node into an {@link OpFusionCodegenSpecGenerator}.
     *
     * <p>NOTE: This method should return the same spec generator result if called multiple times.
     *
     * @param planner The {@link Planner} of the translated graph.
     */
    OpFusionCodegenSpecGenerator translateToFusionCodegenSpec(Planner planner);
}
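
One simple way to satisfy the "same result if called multiple times" note is to cache the generator on first translation. The sketch below assumes exactly that; `ToySpecGenerator`, `ToyFusionExecNode` and `ToyCalcNode` are hypothetical, and `Object` stands in for the `Planner` type. A plausible reason for the requirement is that a node reachable through several parents in the DAG should contribute a single generator, though the text above does not spell this out.

```java
/** Hypothetical stand-in for OpFusionCodegenSpecGenerator. */
class ToySpecGenerator { }

/** Hypothetical stand-in for FusionCodegenExecNode; Object replaces the Planner type. */
interface ToyFusionExecNode {
    boolean supportFusionCodegen();

    ToySpecGenerator translateToFusionCodegenSpec(Object planner);
}

/** Caches the generator so that repeated translation returns the same instance. */
class ToyCalcNode implements ToyFusionExecNode {
    private ToySpecGenerator cached; // created on the first call, reused afterwards

    @Override
    public boolean supportFusionCodegen() {
        return true;
    }

    @Override
    public ToySpecGenerator translateToFusionCodegenSpec(Object planner) {
        if (cached == null) {
            cached = new ToySpecGenerator();
        }
        return cached;
    }
}
```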
