Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


As main memory grows, query performance is increasingly determined by the raw CPU cost of query processing itself. Query processing techniques based on interpreted execution perform poorly on modern CPUs due to lack of locality and frequent instruction mis-prediction, so the industry has been researching how to improve engine performance by increasing operator execution efficiency. In addition, while optimizing Flink's performance on TPC-DS queries, we found that a significant amount of CPU time was spent on virtual function calls, framework collector calls, and redundant computation, all of which can be optimized to improve overall engine performance. To address these issues, we found that operator execution efficiency can be optimized in the following ways:

  • Reduce virtual function calls 
  • Keep data in registers as much as possible to avoid materializing it in memory
  • Generate optimal code for queries to reduce instruction cache misses
  • Use modern compilers to automatically unroll loops, reducing loop iterations and CPU instruction jumps
  • Compute lazily
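To illustrate the first two points, the following hand-written Java sketch (all names hypothetical) contrasts an interpreted-style pipeline, where every row passes through one virtual call per operator, with the kind of fused loop that code generation aims to produce, where intermediate values stay in local variables:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch: an "interpreted" operator chain vs. a fused loop.
public class FusionSketch {

    // Interpreted style: each operator forwards rows through a Collector-like
    // callback, costing one virtual call per row per operator.
    static List<Integer> interpreted(int[] rows) {
        List<Integer> out = new ArrayList<>();
        Consumer<Integer> project = r -> out.add(r * 2);                    // Calc: projection
        Consumer<Integer> filter = r -> { if (r > 10) project.accept(r); }; // Calc: filter
        for (int r : rows) {
            filter.accept(r);
        }
        return out;
    }

    // Fused style: one tight loop, no callbacks, values stay in locals.
    static List<Integer> fused(int[] rows) {
        List<Integer> out = new ArrayList<>();
        for (int r : rows) {
            if (r > 10) {
                out.add(r * 2);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        int[] rows = {5, 12, 30};
        System.out.println(interpreted(rows)); // [24, 60]
        System.out.println(fused(rows));       // [24, 60]
    }
}
```

Both variants compute the same result; the fused one simply removes the per-row virtual dispatch and intermediate materialization.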

After researching these requirements, we found that the operator fusion code generation technique proposed by Thomas Neumann in [1] can achieve these goals. After comparing the performance benefits of vectorization and codegen [2][3], and considering the high development cost of vectorization, we decided to introduce Operator Fusion Codegen (referred to here as OFCG) in Flink. We have finished a PoC and measured a 12% overall gain while supporting only the Calc, HashJoin, and HashAgg operators. On some queries we even see gains of more than 30%.


Public Interfaces

OFCG does not involve any changes to the public interface. However, as it is a significant new feature that is currently experimental, it requires iterative development over multiple versions to improve continuously. Therefore, we introduce the option `table.exec.operator-fusion-codegen.enabled` to enable this feature. It is disabled by default; as the feature matures, we plan to enable it by default after two or three versions.

Proposed Changes

Outline Design

The following image illustrates how SQL text is converted into a Transformation DAG.

RelNode is a relational operator defined in Apache Calcite. A relational expression is represented by a tree of RelNodes, which can be optimized by the planner.

ExecNode is an execution node defined in the planner. Currently, each physical RelNode corresponds to an ExecNode, and an ExecNode will be translated to a Transformation.

The part enclosed in the blue dashed box in the image is relevant to our design: the work of OFCG mainly focuses on the transformation from ExecNode to Transformation.

Workflow of OFCG

Drawing on the background information above and the capabilities of Flink SQL, we have designed the overall workflow of OFCG by combining the produce-consume interface proposed in [1] with the structure of the Flink runtime operators. The flowchart is shown below:

  1. Traverse the ExecNode DAG and create a FusionExecNode for physical operators that can be fused together.

  2. Traverse each member operator in every FusionExecNode to determine whether it supports code generation.

  3. If any member operator does not support codegen, generate a Transformation DAG based on the topological relationships of the member ExecNodes and jump to step 8.

  4. If all member ExecNodes support codegen, traverse them based on their topological relationships. For each ExecNode, generate a corresponding FusionCodegenSpec object that describes the information required to perform codegen for that operator. The resulting FusionCodegenSpec objects form a DAG.

  5. Traverse the FusionCodegenSpec DAG based on topological relationships. First, call the produce-consume interface of the process method to generate the code fragments with which all member operators process data, then concatenate them together.

  6. Next, call the produce-consume interface of endInput to generate the code fragments for each member operator's endInput method, then concatenate these as well.

  7. Assemble the generated process- and endInput-related code fragments into a fused operator.

  8. Generate a FusionTransformation, setting the parallelism and managed memory for the fused operator.

  9. Generate the JobGraph.
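The fusion decision in steps 2–3 can be sketched as follows; the MemberNode type and its supportsCodegen flag are hypothetical stand-ins for ExecNode metadata, a minimal sketch rather than the planner's actual implementation:

```java
import java.util.List;

// Hypothetical sketch of the OFCG fusion decision: only if every member of a
// FusionExecNode supports codegen do we take the fused-codegen path; otherwise
// each member is translated to its own Transformation.
public class FusionDecision {

    static final class MemberNode {
        final String name;
        final boolean supportsCodegen;
        MemberNode(String name, boolean supportsCodegen) {
            this.name = name;
            this.supportsCodegen = supportsCodegen;
        }
    }

    static String translate(List<MemberNode> members) {
        boolean allSupport = members.stream().allMatch(m -> m.supportsCodegen);
        if (allSupport) {
            // steps 4-8: build the FusionCodegenSpec DAG and one fused operator
            return "FusionTransformation";
        }
        // step 3: per-member Transformations; fusion is left to the runtime OperatorChain
        return "TransformationDAG";
    }

    public static void main(String[] args) {
        System.out.println(translate(List.of(
                new MemberNode("Calc", true), new MemberNode("HashJoin", true))));
        System.out.println(translate(List.of(
                new MemberNode("Calc", true), new MemberNode("Sort", false))));
    }
}
```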

When performing operator fusion at the ExecNode layer, there are two fusion mechanisms to consider. The first is proposed in [4], which supports fusing multiple operators with multiple inputs into one, known as BatchExecMultipleInput. This is an existing design whose capability we can reuse. The second is fusing multiple single-input operators that can be fused together. For this mechanism, we only perform OFCG at the ExecNode layer if all the operators that can be fused together support codegen. Otherwise, we delegate the fusion to the runtime's OperatorChain mechanism, because there is no benefit to fusing non-codegen operators together at the ExecNode layer.

It is important to note that for the mechanism proposed in [4], if there are operators within BatchExecMultipleInput that do not support codegen, we abandon OFCG and fall back to the default implementation, for two main reasons. First, performing OFCG on only the subset of ExecNodes that support codegen would make the design and implementation of BatchExecMultipleInput more complex. Second, if there are only a few codegen-supported operators within the BatchExecMultipleInput, the benefits of performing OFCG are negligible. Weighing complexity against benefit, we have decided not to perform codegen at all in this case and instead put more effort into making more operators support OFCG.

Flink OFCG produce-consume 

Since Flink is a unified batch-stream compute engine, and batch jobs eventually end, the design of its runtime operator structure is more complex. Currently, Flink's runtime operators have two core methods: processElement and endInput. The processElement method contains the core computation logic of the operator and is required in both streaming and batch mode. In batch mode, however, data is bounded and the job will eventually end, so an additional endInput method is needed to perform cleanup work after all input data has been received; for example, the HashAgg operator flushes data only after receiving all of it, and the build side of HashJoin completes building the hash table. Considering both scenarios, the single produce-consume interface proposed in [1] cannot directly meet our needs; we need two sets of produce-consume interfaces. The first corresponds to processElement and is used to generate the code in which all operators process data. The second corresponds to endInput and is used to generate the code for the processing logic needed after all data has been consumed. The two produce-consume interfaces work together to generate the complete code for OFCG.
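As a rough illustration of the operator shape these two interface pairs generate code for, here is a hand-written (not generated) Java sketch of a fused Calc + HashAgg operator; all names are hypothetical:

```java
import java.util.Map;
import java.util.TreeMap;

// Hand-written stand-in for a fused operator: processElement carries the
// per-row logic of all fused operators, endInput carries the end-of-input
// logic (here a HashAgg-style flush).
public class FusedOperatorShape {

    // HashAgg-style accumulation buffer; a TreeMap keeps the flush order deterministic here
    private final Map<String, Long> aggBuffer = new TreeMap<>();
    private final StringBuilder output = new StringBuilder();

    // body assembled by the processElement produce-consume pass
    public void processElement(String key, long value) {
        long projected = value + 1;                  // fused Calc logic: project value + 1
        aggBuffer.merge(key, projected, Long::sum);  // fused HashAgg logic: accumulate per key
    }

    // body assembled by the endInput produce-consume pass: flush after all input is consumed
    public void endInput() {
        aggBuffer.forEach((k, v) -> output.append(k).append('=').append(v).append(';'));
    }

    public String result() {
        return output.toString();
    }

    public static void main(String[] args) {
        FusedOperatorShape op = new FusedOperatorShape();
        op.processElement("a", 1);
        op.processElement("a", 2);
        op.processElement("b", 3);
        op.endInput();
        System.out.println(op.result()); // a=5;b=4;
    }
}
```

In stream mode only the processElement body would be generated; in batch mode both bodies are generated, matching the two interface pairs described above.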

Next, let's take an example to see how the two sets of produce-consume interfaces work with each other to complete OFCG.

The concept of a pipeline breaker is proposed and defined in [1]. According to that definition, this pattern can be partitioned into four pipelines. The yellow Output operator is a virtual operator introduced for OFCG, whose role is to bridge the fused operator with the downstream operator.

Normally, in stream mode the job is long-running, so each operator only needs to implement the processElement method; correspondingly, in the OFCG case only the process produce-consume interface needs to be implemented. In batch mode, the job will end and an operator may need to do some cleanup work in its endInput method, such as flushing data downstream, so in the OFCG case both the process and endInput produce-consume interfaces need to be implemented.

In the first step, we recursively call the processProduce method from the Output node down to the leaf node of the operator tree, which starts producing data; each node then calls the downstream consume method to generate the code fragment with which that operator processes data. When the recursion reaches the HashAgg operator, the chain stops: since HashAgg is a blocking operator, it must wait until all data has been processed before it starts to send data downstream, so it does not immediately call the consume method of the downstream operator, and the recursive process call ends there.

In the second step, we recursively call the endInputProduce method from the Output node down to the leaf node of the operator tree, and then start calling back the endInputConsume method. If the current operator needs to send data downstream in its endInput method, it first calls the processConsume method of the downstream node to consume that data; once the data has been sent, it calls the endInputConsume method of the downstream operator to trigger its cleanup. In this pattern, since HashAgg is a blocking operator, when its endInputConsume method is reached recursively it first calls the processConsume method of the downstream Join operator to consume the rows it produces, and then calls the endInputConsume method of the Join operator to trigger its remaining actions. The recursion continues until the Output node, which is responsible for sending the data to the downstream node.
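The call order described above for the blocking HashAgg case can be sketched as a trace; the operator names and methods mirror the description, but the code itself is an illustrative stand-in, not generated code:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical trace of the endInput pass for a HashAgg -> Join -> Output chain:
// a blocking operator's endInputConsume first drives the downstream
// processConsume (to emit its buffered rows) and only then the downstream
// endInputConsume.
public class EndInputTrace {

    static final List<String> trace = new ArrayList<>();

    static void endInputConsumeHashAgg() {
        trace.add("HashAgg.endInput: flush buffered rows");
        processConsumeJoin();   // downstream consumes the flushed rows first
        endInputConsumeJoin();  // then downstream clean-up is triggered
    }

    static void processConsumeJoin() {
        trace.add("Join.processConsume");
    }

    static void endInputConsumeJoin() {
        trace.add("Join.endInputConsume");
        trace.add("Output.emit to downstream node");
    }

    public static void main(String[] args) {
        endInputConsumeHashAgg();
        trace.forEach(System.out::println);
    }
}
```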

Through the cooperation of the process and endInput produce-consume interfaces, we complete the OFCG of multiple operators, and the overall flow remains concise and clear.

Other Considerations

Janino Compilation

Considering that the Java code generated by OFCG may be too long for Janino to compile successfully, we try to compile the generated code at the end of OFCG. If compilation fails, we fall back to the original implementation so that the whole query is not prevented from executing by a compilation problem.

Regarding method code length, we reuse the code-split framework already present in Flink to split overly long methods.
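A minimal sketch of this compile-then-fallback flow, with a stand-in tryCompile step in place of the real Janino invocation:

```java
// Sketch of the fallback described above. The tryCompile method is a stand-in
// (here it just rejects overly long sources); the real implementation would
// invoke Janino on the generated source and catch compilation failures.
public class CompileFallback {

    static boolean tryCompile(String generatedSource) {
        // stand-in: treat overly long sources as a compilation failure
        return generatedSource.length() <= 64 * 1024;
    }

    static String translate(String generatedSource) {
        if (tryCompile(generatedSource)) {
            return "fused-operator";
        }
        // compilation failed: fall back to the original per-operator translation
        return "fallback-original-operators";
    }

    public static void main(String[] args) {
        System.out.println(translate("short source"));       // fused-operator
        System.out.println(translate("x".repeat(100_000)));  // fallback-original-operators
    }
}
```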


Regarding the parallelism of an OFCG fused operator, we take the maximum over all of its input operators, which is consistent with [4]. Operators that can only be executed with a parallelism of one are not fused into OFCG. Likewise, Source and Sink operators are not fused into OFCG nodes, since they do not support codegen, so they work fine even if their parallelism is set separately.
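The parallelism rule above is simply a maximum over the inputs; a trivial sketch:

```java
// Sketch of the parallelism rule: the fused operator takes the maximum
// parallelism over all of its input operators, consistent with [4].
public class FusionParallelism {

    static int fusedParallelism(int... inputParallelisms) {
        int max = 1; // default parallelism if there are no inputs
        for (int p : inputParallelisms) {
            max = Math.max(max, p);
        }
        return max;
    }

    public static void main(String[] args) {
        System.out.println(fusedParallelism(128, 256, 64)); // 256
    }
}
```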

In the stream scenario, users may need to set operator parallelism individually. Currently, SQL operators cannot have their parallelism set individually, so there is no problem overall. If we support per-operator parallelism in the future, we will not be able to support it for fused operators, because we cannot observe operator parallelism directly at the ExecNode layer; this is a current limitation. Users who need to set operator parallelism individually will need to turn off the OFCG feature. Of course, we will look for a way to support setting the parallelism of individual operators if and when it is supported.


In batch mode, HashJoin and NestedLoopJoin start reading the probe-side input only after the build-side input has finished. In addition, the HashJoin and HashAgg operators also need to use managed memory, so OFCG for batch must deal with both "input selection" for the inputs and "managed memory".

Input Selection

We keep the same approach as the input selection section of [4].

Managed Memory

If there are operators that need managed memory, the managed-memory ratio of the operators included in the fused operator is calculated in the same way as in the managed memory section of [4].
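For illustration, assuming each member operator's fraction is its declared managed memory divided by the total across all members (the precise formula is defined in [4]; the names here are illustrative), the calculation might look like:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative per-operator managed-memory fraction computation for a fused
// operator: fraction = operator's declared managed memory / total over members.
public class ManagedMemoryFractions {

    static Map<String, Double> fractions(Map<String, Long> managedMemoryPerOp) {
        long total = managedMemoryPerOp.values().stream().mapToLong(Long::longValue).sum();
        Map<String, Double> result = new LinkedHashMap<>();
        managedMemoryPerOp.forEach((op, mem) ->
            result.put(op, total == 0 ? 0.0 : (double) mem / total));
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> ops = new LinkedHashMap<>();
        ops.put("HashJoin", 64L);
        ops.put("HashAgg", 192L);
        ops.put("Calc", 0L);
        System.out.println(fractions(ops)); // {HashJoin=0.25, HashAgg=0.75, Calc=0.0}
    }
}
```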


In streaming mode, after supporting OFCG we need to consider state, checkpoints, and barriers.

State & Checkpoint

After supporting OFCG, all member operators are executed inside one fused operator, and each original operator becomes a function within it, so there is nothing special to consider for state initialization. What we do need to consider is that a fused operator may fuse several operators of the same kind with the same state names. To address this, we must guarantee that all state names are unique during codegen; otherwise state would be shared unintentionally.
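One possible way to keep generated state names unique, sketched with a hypothetical prefixing scheme (the actual scheme used during codegen may differ):

```java
// Sketch of state-name uniquification: when two fused operators of the same
// kind would register state under the same name, codegen prefixes each name
// with a per-operator identifier. The prefixing scheme shown is illustrative.
public class StateNameUniquifier {

    static String uniqueStateName(String operatorPrefix, int operatorId, String stateName) {
        return operatorPrefix + "_" + operatorId + "_" + stateName;
    }

    public static void main(String[] args) {
        // two joins fused into one operator no longer collide on "leftState"
        System.out.println(uniqueStateName("join", 1, "leftState")); // join_1_leftState
        System.out.println(uniqueStateName("join", 2, "leftState")); // join_2_leftState
    }
}
```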

State compatibility

State compatibility will be broken, as described in the state compatibility section of [4].


Barrier

We keep the same approach as the barrier section of [4].


From a technical implementation point of view, this design is feasible in both stream and batch scenarios, so both modes were considered. In the stream scenario, for stateful operators, our production experience is that the bottleneck is basically state access, so the optimization effect of OFCG for streaming will not be particularly obvious; we therefore will not prioritize supporting it. In the batch scenario, by contrast, the CPU is the bottleneck, so this optimization is profitable. Taking the above into account, this FLIP is currently scoped to batch mode; we do not plan to support streaming operators in the short term.

Internal Interface Changes


This interface describes the information an operator needs to support OFCG.

/** An interface for those physical operators that support operator fusion codegen. */
trait OpFusionCodegenSpec {

  /**
   * If an operator has multiple outputs in the OpFusionCodegenSpec DAG, we only need to call its
   * produce method once, so we need these two flag variables.
   */
  private var hasProcessProduceTraversed = false
  private var hasEndInputProduceTraversed = false

  protected var managedMemoryFraction: Double = 0

  val inputs: ListBuffer[OpFusionCodegenSpec] =
    new ListBuffer[OpFusionCodegenSpec]
  val outputs: ListBuffer[(Int, OpFusionCodegenSpec)] =
    new ListBuffer[(Int, OpFusionCodegenSpec)]

  /** Prefix used in the current operator's variable names. */
  def variablePrefix: String

  def getOutputType: RowType

  def getOperatorCtx: CodeGeneratorContext

  def getExprCodeGenerator: ExprCodeGenerator

  def getManagedMemory: Long = 0L

  final def setManagedMemFraction(managedMemoryFraction: Double) =
    this.managedMemoryFraction = managedMemoryFraction

  /** Generate Java source code to process the rows from the operator's corresponding input. */
  final def processProduce(fusionCtx: CodeGeneratorContext): Unit = {
    if (!hasProcessProduceTraversed) {
      doProcessProduce(fusionCtx)
      hasProcessProduceTraversed = true
    }
  }

  /**
   * Generate the Java source code to process rows. Only the leaf operator in the operator DAG
   * needs to generate the code which produces the rows; other middle operators just call their
   * input's `processProduce` normally, unless the operator has some specific logic. The leaf
   * operator produces rows first, and then calls its `processConsume` method to consume them.
   * The code generated by the leaf operator is saved in fusionCtx, so this method has no return
   * value.
   */
  protected def doProcessProduce(fusionCtx: CodeGeneratorContext): Unit

  /**
   * Consume the generated columns or row from the current operator, calling its outputs'
   * `doProcessConsume()`. Note that `outputVars` and `row` can't both be null.
   */
  final def processConsume(outputVars: Seq[GeneratedExpression], row: String = null): String = {
    val inputVars = if (outputVars != null) {
      assert(outputVars.length == getOutputType.getFieldCount)
      outputVars
    } else {
      assert(row != null, "outputVars and row can't both be null.")
      // derive the field access expressions from `row` (details elided in this sketch)
      ???
    }

    // if this operator has multiple output operators, we need to materialize the expressions in
    // advance to avoid their being evaluated multiple times downstream
    val exprCode = if (outputs.length > 1) {
      inputVars.map(expr => expr.getCode).mkString("\n")
    } else {
      ""
    }

    val consumeCode = outputs
      .map {
        op => {
          val inputIdOfOutputNode = op._1
          val output = op._2
          // we need to bind our ctx before calling its consume method to reuse the input
          // expressions
          if (inputIdOfOutputNode == 1) {
            output.getExprCodeGenerator.bindInputWithExpr(getOutputType, inputVars, row)
          } else {
            output.getExprCodeGenerator.bindSecondInputWithExpr(getOutputType, inputVars, row)
          }
          // we always pass both the column vars and the row var to the output op; the output
          // decides which one to use
          output.doProcessConsume(inputIdOfOutputNode, inputVars, row)
        }
      }
      .mkString("\n")

    s"""
       |$exprCode
       |// consume code
       |$consumeCode
       |""".stripMargin
  }

  /**
   * The process method is responsible for the operator's data processing logic, so each operator
   * needs to implement this method to generate the code to process the rows. This should only be
   * called from `processConsume`.
   *
   * Note: An operator can either consume the rows as RowData (`row`) or as a list of variables
   * (`inputVars`).
   *
   * @param inputId
   *   This is numbered starting from 1, and `1` indicates the first input.
   * @param inputVars
   *   field variables of the current input.
   * @param row
   *   row variable of the current input.
   */
  def doProcessConsume(inputId: Int, inputVars: Seq[GeneratedExpression], row: String): String

  /** Generate Java source code to do the clean-up work for the operator's corresponding input. */
  final def endInputProduce(fusionCtx: CodeGeneratorContext): Unit = {
    if (!hasEndInputProduceTraversed) {
      doEndInputProduce(fusionCtx)
      hasEndInputProduceTraversed = true
    }
  }

  /**
   * Generate the Java source code to do the operator's clean-up work. Only the leaf operator in
   * the operator DAG needs to generate the code; other middle operators just call their input's
   * `endInputProduce` normally, unless the operator has some specific logic. The code generated by
   * the leaf operator is saved in fusionCtx, so this method has no return value.
   */
  protected def doEndInputProduce(fusionCtx: CodeGeneratorContext): Unit

  /**
   * Generate code to trigger the clean-up work of the operator, calling its outputs'
   * `doEndInputConsume()`. The leaf operator starts the call to `endInputConsume`.
   */
  final def endInputConsume(): String = {
    s"""
       |${outputs.map(op => op._2.doEndInputConsume(op._1)).mkString("\n")}
       |""".stripMargin
  }

  /**
   * The endInput method is used to do the clean-up work for the operator's corresponding input;
   * for example, the HashAgg operator needs to flush its data, and the HashJoin build side needs
   * to finish building its hash table, so each operator needs to implement its clean-up logic in
   * this method.
   * For blocking operators such as HashAgg, the `processConsume` method needs to be called first
   * to consume the data, followed by the `endInputConsume` method to do the clean-up work of the
   * downstream operators. For pipelined operators such as Project, only the `endInputConsume`
   * method needs to be called.
   *
   * @param inputId
   *   This is numbered starting from 1, and `1` indicates the first input.
   */
  def doEndInputConsume(inputId: Int): String
}


If an ExecNode wants to support OFCG, this interface needs to be implemented.

/** A {@link ExecNode} which supports operator fusion codegen. */
public interface FusionCodegenExecNode {

    /** Whether this ExecNode supports OFCG or not. */
    boolean supportFusionCodegen();

    /**
     * Translates this node into a {@link OpFusionCodegenSpec}.
     *
     * <p>NOTE: This method should return the same spec result if called multiple times.
     *
     * @param planner The {@link Planner} of the translated graph.
     */
    OpFusionCodegenSpec translateToFusionCodegenSpec(Planner planner);
}

Implementation Plan

  1. In release 1.18, we will prioritize the development of the Calc, HashJoin, and HashAgg operators in batch mode to support OFCG, because they are used with high frequency.

  2. Our final goal is to support all operators in batch mode, so we will put effort into having the remaining operators support OFCG over the subsequent two or three releases.

  3. The first step is to support OFCG based on BatchExecMultipleInput; the second step is to support one-input operators for OFCG.

  4. After two or three versions of iteration, we will enable the table.exec.operator-fusion-codegen.enabled option by default.

Compatibility, Deprecation, and Migration Plan


Test Plan



  1. Efficiently Compiling Efficient Query Plans for Modern Hardware
  2. Everything You Always Wanted to Know About Compiled and Vectorized Queries But Were Afraid to Ask
  3. Photon: A Fast Query Engine for Lakehouse Systems
  4. Support Multiple Input for Blink Planner
  5. PoC:
