Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
As main memory grows, query performance is increasingly determined by the raw CPU cost of query processing itself, because query processing techniques based on interpreted execution perform poorly on modern CPUs due to poor locality and frequent branch mispredictions. The industry is therefore researching how to improve engine performance by increasing operator execution efficiency. In addition, while optimizing Flink's performance on the TPC-DS queries, we found that a significant amount of CPU time was spent on virtual function calls, framework collector calls, and redundant computation, all of which can be optimized to improve overall engine performance. To address these issues, we found that operator execution efficiency can be improved in the following ways:
- Reduce virtual function calls
- Keep data in registers as much as possible to avoid materializing it in memory
- Generate optimal code for queries to reduce instruction cache misses
- Use modern compilers to automatically unroll loops, reducing loop iteration and CPU instruction jump counts
- Lazy computation
After researching these requirements, we found that all of these goals can be achieved with operator fusion code generation, the technique proposed by Thomas Neumann in the paper. After comparing the performance benefits of vectorization and codegen, and considering the high development cost of vectorization, we decided to introduce Operator Fusion Codegen in Flink, referred to here as OFCG. We have finished a PoC and measured a 12% overall gain while supporting only the Calc, HashJoin, and HashAgg operators; in some queries we even gained more than 30%.
OFCG does not involve any changes to public interfaces. However, as it is a significant new feature that is currently experimental, it requires iterative development over multiple versions. Therefore, we introduce the option `table.exec.operator-fusion-codegen.enabled` to enable this feature. It is disabled by default; as the feature matures, we plan to enable it by default after two or three versions.
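For illustration, the option could be enabled on a batch `TableEnvironment` roughly as follows. This is only a sketch using the standard Table API; treat the exact wiring of the option as an assumption.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class EnableOfcg {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // Experimental feature: disabled by default, so users opt in explicitly.
        tEnv.getConfig().set("table.exec.operator-fusion-codegen.enabled", "true");
    }
}
```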
The following image illustrates how SQL text is converted to a Transformation DAG.
RelNode is a relational operator defined in Apache Calcite. A relational expression is represented by a tree of RelNode, which can be optimized by the planner.
ExecNode is an execution node defined in the planner. Currently, each physical RelNode corresponds to an ExecNode, and an ExecNode will be translated to a Transformation.
The part inside the blue dashed box in the image is relevant to our design: the work of OFCG mainly focuses on the transformation from the ExecNode DAG to the Transformation DAG.
Workflow of OFCG
Drawing on the background information above and the capabilities of Flink SQL, we have designed the overall workflow of OFCG by combining the produce-consume interface proposed in  with the structure of Flink's runtime operators. The flowchart is shown below:
1. Traverse the ExecNode DAG and create a FusionExecNode for physical operators that can be fused together.
2. Traverse each member operator in every FusionExecNode to determine whether they support code generation.
3. If any member operator does not support codegen, generate a Transformation DAG based on the topological relationship of the member ExecNodes and jump to step 8.
4. If all member ExecNodes support codegen, traverse them based on their topological relationships. For each ExecNode, generate a corresponding FusionCodegenSpec object that describes the information required to perform codegen on that operator. The resulting FusionCodegenSpec objects form a DAG.
5. Traverse the FusionCodegenSpec DAG based on topological relationships. First, call the produce-consume interface of the process method to generate the code fragments with which all member operators process data, then concatenate them together.
6. Next, call the produce-consume interface of endInput to generate the code fragments for each member operator's endInput method, then concatenate them as well.
7. Assemble the generated process and endInput code fragments into a fused operator.
8. Generate a FusionTransformation, setting the parallelism and managed memory for the fused operator.
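The branch between steps 3 and 4 can be sketched as follows. The types and method names here are hypothetical, not the actual planner classes; the point is only the all-or-nothing decision.

```java
import java.util.List;

// Simplified sketch of the workflow's codegen check: fall back to a plain
// Transformation DAG unless every member ExecNode supports codegen.
public class FusionPlanner {
    // Hypothetical stand-in for the planner's ExecNode with a codegen flag.
    interface ExecNode {
        boolean supportsFusionCodegen();
    }

    static String translate(List<ExecNode> members) {
        for (ExecNode node : members) {
            if (!node.supportsFusionCodegen()) {
                // Step 3: at least one member cannot be code-generated,
                // so keep the individual operators.
                return "TransformationDAG";
            }
        }
        // Step 4 onwards: build FusionCodegenSpecs and fuse into one operator.
        return "FusionTransformation";
    }
}
```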
When performing operator fusion on ExecNodes, there are two fusion mechanisms to consider. The first, proposed in , supports fusing multiple operators with multiple inputs into one, known as BatchExecMultipleInput. This is an existing design whose capability we can reuse. The second mechanism fuses multiple operators that each have only one input. For this mechanism, we only perform OFCG at the ExecNode layer if all the operators that can be fused together support codegen; otherwise, we delegate the fusion to the runtime's OperatorChain mechanism, because there is no benefit to fusing non-codegen operators together at the ExecNode layer.
It is important to note that for the mechanism proposed in , if any operators within a BatchExecMultipleInput do not support codegen, we abandon OFCG and fall back to the default implementation. There are two main reasons for this decision. First, performing OFCG on only the subset of ExecNodes that support codegen would make the design and implementation of BatchExecMultipleInput more complex. Second, if only a few operators within the BatchExecMultipleInput support codegen, the benefit of performing OFCG is negligible. Weighing the complexity against the benefit, we have decided not to perform codegen at all in this case and instead put more effort into making more operators support OFCG.
Flink OFCG produce-consume
Since Flink is a unified batch-stream compute engine, the design of its runtime operator structure is more complex, because batch jobs eventually end. Currently, Flink's runtime operators have two core methods: processElement and endInput. The processElement method is responsible for the core computation logic of the operator and is required in both streaming and batch mode. In batch mode, however, the data is bounded, so the job eventually ends. Batch jobs therefore need an additional endInput method to perform cleanup work after all input data has been received; for example, the HashAgg operator flushes its data only after receiving all input, and the build side of HashJoin completes building its hash table. Considering the needs of both scenarios, the produce-consume interface proposed in  cannot directly meet our case. We need two sets of produce-consume interfaces: one corresponding to processElement, which generates the code for how each operator processes data, and one corresponding to endInput, which generates the code for the processing logic needed after all data has been consumed. The two produce-consume interfaces work together to generate the complete code for OFCG.
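A minimal sketch of what such a pair of produce-consume interface sets might look like is shown below. All names here are illustrative, modeled on the description above; they are not the actual planner interfaces.

```java
// Hypothetical shape of a per-operator codegen spec with the two
// produce-consume interface sets described above.
public interface FusionCodegenSpec {
    // --- processElement path: per-record processing code ---
    // Called top-down from the Output node; the leaf generates the code
    // that drives data production.
    String processProduce(Context ctx);
    // Called back by the upstream operator once it has a row to hand over.
    String processConsume(Context ctx, String inputRowTerm);

    // --- endInput path: flush/cleanup code for bounded input ---
    String endInputProduce(Context ctx);
    String endInputConsume(Context ctx);

    // Placeholder for shared codegen state (reused terms, member fields, ...).
    interface Context {}
}
```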
Next, let's take an example to see how the two sets of produce-consume interfaces work with each other to complete OFCG.
The concept of a pipeline breaker is proposed and defined in . According to its definition, this pattern can be partitioned into four pipelines; the yellow Output operator is a virtual operator introduced for OFCG, whose role is to bridge the fused operator with the downstream operator.
Normally, in stream mode the job is long-running, so each operator only needs to implement the processElement method; correspondingly, in the OFCG case only the process produce-consume interface needs to be implemented. In batch mode the job ends, and an operator may need to do some cleanup work in the endInput method, such as flushing data downstream, so in the OFCG case both the process and endInput produce-consume interfaces need to be implemented.
In the first step, we recursively call the processProduce method from the Output node down to the leaf node of the operator tree, which starts generating data; each node then calls the consume method of its downstream operator to generate the code fragment with which that operator processes data. When the recursion reaches the HashAgg operator, which is a blocking operator, it must wait until all data has been processed before it starts to send data downstream, so it does not immediately call the consume method of its downstream operator, and the recursive call to the process method ends there.

In the second step, we recursively call the endInputProduce method from the Output node down to the leaf node of the operator tree, and then start calling back the endInputConsume method. If the current operator needs to send data downstream in its endInput method, it first calls the processConsume method of the downstream node to consume the data; once the data has been sent, the endInputConsume method of the downstream operator is called to trigger its cleanup actions. In this pattern, since HashAgg is a blocking operator, when its endInputConsume method is called recursively, it first calls the processConsume method of the downstream Join operator to consume the data it generates, and then calls the endInputConsume method of the Join operator to trigger its other actions. The recursion continues until it reaches the Output node, which is responsible for sending the data to the downstream node.
We complete the OFCG of multiple operators through the cooperation of the process and endInput produce-consume interfaces, and the overall flow is relatively simple and clear.
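To make the result concrete, here is a hand-written analogue of what the fused code for a simple Calc -> HashAgg -> Output pipeline could look like. This is a simplified, hypothetical sketch, not actual generated code: the per-record logic of all operators is inlined into one processElement body with no virtual calls or intermediate row materialization, and the blocking HashAgg only emits during endInput.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hand-written analogue of a fused Calc -> HashAgg -> Output operator.
public class FusedCalcHashAgg {
    private final Map<String, Long> aggBuffer = new HashMap<>(); // HashAgg state
    final List<String> output = new ArrayList<>();               // Output sink

    // Result of the process produce-consume pass: Calc filter plus
    // HashAgg accumulation, inlined into one method body.
    void processElement(String key, long value) {
        if (value <= 0) {                       // Calc: generated filter code
            return;
        }
        aggBuffer.merge(key, value, Long::sum); // HashAgg: accumulate only
        // Nothing is sent downstream here; HashAgg is a blocking operator.
    }

    // Result of the endInput produce-consume pass: flush the agg buffer,
    // then let the downstream Output consume each result row.
    void endInput() {
        for (Map.Entry<String, Long> e : aggBuffer.entrySet()) {
            output.add(e.getKey() + "=" + e.getValue()); // Output consume
        }
    }
}
```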
Considering that the Java code generated by OFCG may be too long for Janino to compile successfully, we try to compile the generated code at the end of OFCG. If compilation fails, we fall back to the original implementation, so a compilation problem cannot prevent the whole query from executing.
Regarding the length of generated methods, we reuse the code-split framework already present in Flink to split overly long methods.
Regarding the parallelism of an OFCG fused operator, we take the maximum parallelism of all its input operators, which is consistent with . Operators that can only run with a parallelism of one are not fused into OFCG. Source and Sink operators are not fused into OFCG nodes either, since they do not support codegen, so they work fine even if their parallelism is set separately.
In the stream scenario, users may need to set operator parallelism individually. Currently, SQL operators do not support setting parallelism individually, so overall there is no problem. If we support per-operator parallelism in the future, we will not be able to support it for fused operators, because we cannot observe operator parallelism directly at the ExecNode layer; this is a current limitation. Users who need to set operator parallelism individually must turn off the OFCG feature. Of course, we will look for a way to support setting the parallelism of individual operators if the engine supports it in the future.
In batch mode, NestedLoopJoin starts to read the probe-side input only after the build-side input is finished. In addition, the HashAgg operator needs managed memory, so OFCG for batch must deal with both "input selection" for the inputs and "managed memory".
We keep the same approach as in the input selection section of .
If some operators need managed memory, the managed-memory ratio for the multiple operators included in a fused operator is calculated in the same way as in the managed memory section of .
In streaming mode, we need to consider state, checkpoints, and barriers after supporting OFCG.
State & Checkpoint
After supporting OFCG, all fused operators execute inside one fused operator and each original operator becomes a function within it, so nothing special is needed for state initialization. One thing we do need to consider is that a fused operator may fuse several operators of the same kind that declare the same state names. To avoid unintended state sharing, we must guarantee that all state names are unique during codegen.
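One simple way to guarantee uniqueness is to suffix each declared state name with a counter during codegen. The helper below is illustrative only, not the actual implementation:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: two fused operators of the same kind that both
// declare "agg-state" must not end up sharing one state handle, so each
// declaration gets a unique suffix during codegen.
public class StateNameUniquifier {
    private final Map<String, Integer> counters = new HashMap<>();

    String uniqueName(String declaredName) {
        int id = counters.merge(declaredName, 1, Integer::sum);
        // The first occurrence also gets a suffix, so names stay stable
        // regardless of how many operators of the same kind are fused.
        return declaredName + "_" + id;
    }
}
```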
State compatibility will be broken in the same way as described in the state compatibility section of .
We keep the same approach as in the barrier section of .
From a technical point of view, this design is feasible in both stream and batch scenarios, so we considered both modes. In the stream scenario, for stateful operators, our production experience shows that the bottleneck is usually state access, so the optimization effect of OFCG for streaming would not be particularly significant; we therefore do not prioritize supporting it. In the batch scenario, by contrast, the CPU is the bottleneck, so this optimization pays off. Taking the above into account, this FLIP is currently scoped to batch mode; we do not support streaming operators in the short term.
Internal Interface Changes
This interface maintains the information an operator needs to support OFCG. If an ExecNode wants to support OFCG, this interface must be implemented.
- In release-1.18, we will prioritize supporting OFCG for the HashAgg operators in batch mode, because they are used with high frequency.
- Our final goal is to support all operators in batch mode, so we will put our effort into making the remaining operators support OFCG over the subsequent two or three releases.
The first step is to support OFCG based on BatchExecMultipleInput, and the second step is to support one-input operators for OFCG.
After two or three versions of iteration, we will enable the `table.exec.operator-fusion-codegen.enabled` option by default.
Compatibility, Deprecation, and Migration Plan
- Efficiently Compiling Efficient Query Plans for Modern Hardware
- Everything You Always Wanted to Know About Compiled and Vectorized Queries But Were Afraid to Ask
- Photon: A Fast Query Engine for Lakehouse Systems
- Support Multiple Input for Blink Planner
- PoC: https://github.com/lsyldliu/flink/tree/OFCG