Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Internal Interface Changes

OpFusionCodegenSpec

This interface maintains the required information an operator needs to support OFCG.

OpFusionContext

OpFusionCodegenSpec
Code Block
Code Block
languagejava
title
/** An interface for those physical operators that support
 * A OpFusionContext contains information about the context in which {@link OpFusionCodegenSpec}
 * needed to do operator fusion codegen.
 */
traitpublic interface OpFusionCodegenSpecOpFusionContext {

    /**
 Return the *output Iftype anof operatorcurrent has multiple outputs in the OpFusionCodegenSpec DAG, we only need to call it{@link OpFusionCodegenSpecGenerator}. */
    RowType getOutputType();

    /**
   * produce method* once,Return sothe wemanaged needmemory thesefraction twoof flag variable.
   */this {@link OpFusionCodegenSpecGenerator} needed during
  private var hasProcessProduceTraversed =* false
all fusion privateoperators.
  var hasEndInputProduceTraversed = false*/

  protected var managedMemoryFraction: Double = 0double getManagedMemoryFraction();

  val inputs: ListBuffer[OpFusionCodegenSpec] =
    new ListBuffer[OpFusionCodegenSpec]
  val outputs: ListBuffer[(Int, OpFusionCodegenSpec)] = /** Return the input {@link OpFusionContext} of this {@link OpFusionCodegenSpecGenerator}. */
    newList<OpFusionContext> ListBuffer[(Int, OpFusionCodegenSpec)]getInputFusionContexts();

    /**
 Prefix   used in* theGenerate currentJava operator'ssource variablecode names. */
  def variablePrefix: String

  def getOutputType: RowType

  def getOperatorCtx: CodeGeneratorContext

  def getExprCodeGenerator: ExprCodeGenerator

  def getManagedMemory: Long = 0L

  final def setManagedMemFraction(managedMemoryFraction: Double) =
    this.managedMemoryFraction = managedMemoryFraction

  /*to process the rows from operator corresponding input, delegate to
     * {@link OpFusionCodegenSpecGenerator#processProduce(CodeGeneratorContext)} method.
     */
    void processProduce(CodeGeneratorContext codegenCtx);

    /**
     * Generate Java source code to processdo theclean rowswork fromfor operator corresponding input. */
, delegate to
     final* def{@link processProduceOpFusionCodegenSpecGenerator#endInputProduce(fusionCtx: CodeGeneratorContext):} Unitmethod.
 = {
    if (!hasProcessProduceTraversed) {*/
    void endInputProduce(CodeGeneratorContext codegenCtx);

    default String doProcessProduceprocessConsume(fusionCtxList<GeneratedExpression> outputVars) {
      hasProcessProduceTraversed = true  return processConsume(outputVars, null);
    }

  }

  /**
     * GenerateConsume the Java source code to process rows, only the leaf operator in operator DAG need to
   * generate the code which produce the row, other middle operators just call its input
generated columns or row from current {@link OpFusionCodegenSpec}, delegate to
     * {@link OpFusionCodegenSpecGenerator#processConsume(List, String)} ()} method.
     *
     * `processProduce`<p>Note normally,that otherwise,`outputVars` theand operator`row` hascan't someboth specificbe logicnull.
 The leaf operator
   */
 produce row first, andString then call its `processConsume` method to consume row.processConsume(List<GeneratedExpression> outputVars, String row);

    /**
   * The code* generatedGenerate byJava leafsource operatorcode willto bedo savedclean inwork fusionCtx,for so{@link thisOpFusionCodegenSpec} methodcorresponding
 doesn't has
   * return type.
   */
  protected def doProcessProduce(fusionCtx: CodeGeneratorContext): Unit

  /**
   * Consume the generated columns or row from current operator, call its output's
   * `doProcessConsume()`.
   *
   * Note that `outputVars` and `row` can't both be null.
   */
  final def processConsume(outputVars: Seq[GeneratedExpression], row: String = null): String = {
    val inputVars = if (outputVars != null) {
      assert(outputVars.length == getOutputType.getFieldCount)
      outputVars
    } else {
      assert(row != null, "outputVars and row can't both be null.")
      getExprCodeGenerator.generateInputAccessExprs()
    }

    // if this operator has multiple output operators, we need to materialize them in advance to
    // avoid be evaluated multiple times in downstream
    val exprCode = if (outputs.length > 1) {
      outputVars.map(expr => expr.getCode).mkString("\n")
    } else {
      ""
    }

    val consumeCode = outputs
      .map(
        op => {
          val inputIdOfOutputNode = op._1
          val output = op._2
          // we need to bind out ctx before call its consume to reuse the input expression
          if (inputIdOfOutputNode == 1) {
            output.getExprCodeGenerator.bindInputWithExpr(getOutputType, inputVars, row)
          } else {
            output.getExprCodeGenerator.bindSecondInputWithExpr(getOutputType, inputVars, row)
          }

          // we always pass column vars and row var to output op simultaneously, the output decide to use which one
          output.doProcessConsume(inputIdOfOutputNode, inputVars, row)
        })
      .mkString("\n")

    s"""
       |$exprCode
       | // consume code
       |$consumeCode
     """.stripMargin
  }

  /**
   * The process method is responsible for the operator data processing logic, so each operator
   * needs to implement this method to generate the code to process the row. This should only be
   * called from `processConsume`.
   *
   * @param inputId
   *   This is numbered starting from 1, and `1` indicates the first input.
   * @param inputVars
   *   field variables of current input.
   * @param row
   *   row variable of current input.
   *
   * Note: A operator can either consume the rows as RowData (row), or a list of variables
   * (inputVars).
   */
  def doProcessConsume(inputId: Int, inputVars: Seq[GeneratedExpression], row: String): String

  /** Generate Java source code to do clean work for operator corresponding input. */
  final def endInputProduce(fusionCtx: CodeGeneratorContext): Unit = {
    if (!hasEndInputProduceTraversed) {
      doEndInputProduce(fusionCtx)
      hasEndInputProduceTraversed = true
    }
  }

  /**
   * Generate the Java source code to do operator clean work, only the leaf operator in operator DAG
   * need to generate the code, other middle operators just call its input `endInputProduce`
   * normally, otherwise, the operator has some specific logic.
   *
   * The code generated by leaf operator will be saved in fusionCtx, so this method doesn't has
   * return type.
   */
  protected def doEndInputProduce(fusionCtx: CodeGeneratorContext): Unit

  /**
   * Generate code to trigger the clean work of operator, call its output's `doEndInputConsume()`.
   * The leaf operator start to call `endInputConsume`.
   */
  final def endInputConsume(): String = {
    s"""
       |${outputs.map(op => op._2.doEndInputConsume(op._1)).mkString("\n")}
     """.stripMargin
  }

  /**
   * The endInput method is used to do clean work for operator corresponding input, such as the
   * HashAgg operator needs to flush data, and the HashJoin build side need to build hash table, so
   * each operator needs to implement the corresponding clean logic in this method.
   *
   * For blocking operators such as HashAgg, the `processConsume` method needs to be called first to
   * consume the data, followed by the `endInputConsume` method to do the cleanup work of the
   * downstream operators. For pipeline operators such as Project, you only need to call the
   * `endInputConsume` method.
   *
   * @param inputId
   *   This is numbered starting from 1, and `1` indicates the first input.
   */
  def doEndInputConsume(inputId: Int): Stringinput, delegate to {@link OpFusionCodegenSpecGenerator#endInputConsume()} method.
     */
    String endInputConsume();
}


OpFusionCodegenSpec

This interface maintains the required information an operator needs to support OFCG.

Code Block
languagejava
titleOpFusionCodegenSpec
/** An interface for those physical operators that support operator fusion codegen. */
@Internal
public interface OpFusionCodegenSpec {

    /**
     * Initializes the operator spec and gives it access to its {@link OpFusionContext}. This method
     * must be called before the {@code doProcessProduce}/{@code doProcessConsume} and {@code
     * doEndInputProduce}/{@code doEndInputConsume} related methods.
     */
    void setup(OpFusionContext opFusionContext);

    /** Prefix used in the current operator's variable names. */
    String variablePrefix();

    /**
     * The subset of column indices that should be evaluated before this operator.
     *
     * <p>We will use this to insert some code to access those columns that are actually used by
     * current operator before calling {@link #doProcessConsume(int, List, GeneratedExpression)}.
     */
    Set<Integer> usedInputColumns(int inputId);

    /**
     * The {@link RowData} class needed by the specified input of current operator. This is used to
     * notify the upstream operator to wrap the proper {@link RowData} before calling {@link
     * #doProcessConsume(int, List, GeneratedExpression)}. For example, the HashJoin build side
     * needs {@link BinaryRowData}.
     */
    Class<? extends RowData> getInputRowDataClass(int inputId);

    /**
     * Every operator needs one {@link CodeGeneratorContext} to store the context needed during
     * operator fusion codegen.
     */
    CodeGeneratorContext getCodeGeneratorContext();

    /** Get the {@link ExprCodeGenerator} used by this operator during operator fusion codegen. */
    ExprCodeGenerator getExprCodeGenerator();

    /**
     * Generate the Java source code to process rows. Only the leaf operator in the operator DAG
     * needs to generate the code which produces the row; other middle operators normally just call
     * their input's {@link OpFusionCodegenSpecGenerator#processProduce(CodeGeneratorContext)},
     * unless the operator has some specific logic. The leaf operator produces the row first, and
     * then calls {@link OpFusionContext#processConsume(List)} to consume it.
     *
     * <p>The code generated by the leaf operator is saved in {@code codegenCtx}, so this method has
     * no return value.
     */
    void doProcessProduce(CodeGeneratorContext codegenCtx);

    /**
     * The process method is responsible for the operator data processing logic, so each operator
     * needs to implement this method to generate the code to process the row. This should only be
     * called from {@link OpFusionCodegenSpecGenerator#processConsume(List, String)}.
     *
     * <p>Note: An operator can either consume the rows as RowData ({@code row}), or a list of
     * variables ({@code inputVars}).
     *
     * @param inputId This is numbered starting from 1, and `1` indicates the first input.
     * @param inputVars field variables of current input.
     * @param row row variable of current input.
     * @return the generated Java source code that processes the row.
     */
    String doProcessConsume(
            int inputId, List<GeneratedExpression> inputVars, GeneratedExpression row);

    /**
     * Generate the Java source code to do operator clean work. Only the leaf operator in the
     * operator DAG needs to generate the code; other middle operators normally just call their
     * input's {@link OpFusionCodegenSpecGenerator#endInputProduce(CodeGeneratorContext)}, unless
     * the operator has some specific logic.
     *
     * <p>The code generated by the leaf operator is saved in {@code codegenCtx}, so this method has
     * no return value.
     */
    void doEndInputProduce(CodeGeneratorContext codegenCtx);

    /**
     * The endInput method is used to do clean work for operator corresponding input. For example,
     * the HashAgg operator needs to flush data, and the HashJoin build side needs to build the hash
     * table, so each operator needs to implement the corresponding clean logic in this method.
     *
     * <p>For blocking operators such as HashAgg, the {@link OpFusionContext#processConsume(List,
     * String)} method needs to be called first to consume the data, followed by the {@link
     * OpFusionContext#endInputConsume()} method to do the cleanup work of the downstream operators.
     * For pipeline operators such as Project, you only need to call {@link
     * OpFusionContext#endInputConsume()}.
     *
     * @param inputId This is numbered starting from 1, and `1` indicates the first input.
     * @return the generated Java source code that does the clean work.
     */
    String doEndInputConsume(int inputId);
}


OpFusionCodegenSpecGenerator

Code Block
/**
 * {@link OpFusionCodegenSpecGenerator} is used by operator fusion codegen to generate the fusion
 * code. Each generator can have multiple inputs and outputs, and together the generators form a
 * DAG. Every OpFusionCodegenSpecGenerator holds an {@link OpFusionCodegenSpec} that is used to
 * generate the operator's row-processing code. In addition, it provides some meta information that
 * codegen needs, such as the output type and the managed memory fraction.
 */
@Internal
public abstract class OpFusionCodegenSpecGenerator {

    private final RowType outputType;
    protected final OpFusionCodegenSpec opFusionCodegenSpec;
    // Context handed to the held spec in setup(); it is built around this generator.
    private final OpFusionContext opFusionContext;
    // Assigned in setup(Context); remains 0 until setup has been called.
    private double managedMemoryFraction = 0;

    public OpFusionCodegenSpecGenerator(
            RowType outputType, OpFusionCodegenSpec opFusionCodegenSpec) {
        this.outputType = outputType;
        this.opFusionCodegenSpec = opFusionCodegenSpec;
        // NOTE(review): `this` escapes the constructor here. OpFusionContextImpl (not shown) should
        // only store the reference and must not call back into this generator during construction.
        this.opFusionContext = new OpFusionContextImpl(this);
    }

    /**
     * Initializes the information this spec generator needs, and sets up the held {@link
     * OpFusionCodegenSpec} with this generator's {@link OpFusionContext}. This method must be called
     * before the produce and consume related methods.
     *
     * @param context supplies the managed memory fraction of this generator.
     */
    public void setup(Context context) {
        this.managedMemoryFraction = context.getManagedMemoryFraction();
        this.opFusionCodegenSpec.setup(opFusionContext);
    }

    /** Return the {@link OpFusionCodegenSpec} held by this generator. */
    public OpFusionCodegenSpec getOpFusionCodegenSpec() {
        return opFusionCodegenSpec;
    }

    /** Return the {@link OpFusionContext} this generator passes to its spec in {@link #setup}. */
    public OpFusionContext getOpFusionContext() {
        return opFusionContext;
    }

    /**
     * Return the managed memory needed by this operator. NOTE(review): the unit is not specified
     * here (presumably bytes) — confirm.
     */
    public abstract long getManagedMemory();

    /** Return the input {@link OpFusionCodegenSpecGenerator}s of this generator in the DAG. */
    public abstract List<OpFusionCodegenSpecGenerator> getInputs();

    /**
     * Add the specific {@link OpFusionCodegenSpecGenerator} as the output of current operator spec
     * generator. One {@link OpFusionCodegenSpecGenerator} may have multiple outputs, which together
     * form a DAG.
     *
     * @param inputIdOfOutput This is numbered starting from 1, and `1` indicates the first input of
     *     output {@link OpFusionCodegenSpecGenerator}.
     * @param output The {@link OpFusionCodegenSpecGenerator} as output of current spec generator.
     */
    public abstract void addOutput(int inputIdOfOutput, OpFusionCodegenSpecGenerator output);

    /** Generate Java source code to process the rows from operator corresponding input. */
    public abstract void processProduce(CodeGeneratorContext fusionCtx);

    /**
     * Consume the generated columns or row from current operator, and call its outputs' {@link
     * OpFusionCodegenSpec#doProcessConsume(int, List, GeneratedExpression)} method.
     *
     * <p>Note that {@code outputVars} and {@code row} can't both be {@code null}.
     */
    public abstract String processConsume(List<GeneratedExpression> outputVars, String row);

    /** Generate Java source code to do clean work for operator corresponding input. */
    public abstract void endInputProduce(CodeGeneratorContext fusionCtx);

    /**
     * Generate code to trigger the clean work of the operator, and call its outputs' {@link
     * OpFusionCodegenSpec#doEndInputConsume(int)}. The leaf operator starts by calling the
     * endInputConsume method.
     */
    public abstract String endInputConsume();
}


FusionCodegenExecNode

If an ExecNode wants to support OFCG, it needs to implement this interface.

Code Block
languagejava
titleFusionCodegenExecNode
/** A {@link ExecNode} which support operator fusion codegen. */
public interface FusionCodegenExecNode {

     /**
     * Whether this ExecNode supports OFCG or not.
     */
    boolean supportFusionCodegen();

    /**
     * Translates this node into a {@link OpFusionCodegenSpecOpFusionCodegenSpecGenerator}.
     *
     * <p>NOTE: This method should return same spec generator result if called multiple times.
     *
     * @param planner The {@link Planner} of the translated graph.
     */
      OpFusionCodegenSpecOpFusionCodegenSpecGenerator translateToFusionCodegenSpec(Planner planner);
}

...