Background

In relational algebra, a Join is used to co-group two datasets and combine their data based on specific conditions. In stream computing systems, the data of both streams is cached (usually in State) while the Join operation runs, so that when a record arrives on either stream, it can be matched against the cached data from the other stream. As a result, Join is widely used in multi-stream aggregation scenarios.
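
The caching-and-matching behaviour described above can be sketched with plain Java collections standing in for State. This is only a toy model for a single parallel instance. The class name `SymmetricJoinSketch` and its methods are hypothetical and are not part of the API proposed in this FLIP.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a two-stream inner join; plain maps stand in for keyed state. */
class SymmetricJoinSketch {
    // Records seen so far on each side, grouped by join key.
    private final Map<Long, List<String>> leftState = new HashMap<>();
    private final Map<Long, List<String>> rightState = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    /** A left record arrives: cache it, then match it against cached right records. */
    void onLeft(long key, String record) {
        leftState.computeIfAbsent(key, k -> new ArrayList<>()).add(record);
        for (String right : rightState.getOrDefault(key, List.of())) {
            output.add(record + "+" + right);
        }
    }

    /** A right record arrives: cache it, then match it against cached left records. */
    void onRight(long key, String record) {
        rightState.computeIfAbsent(key, k -> new ArrayList<>()).add(record);
        for (String left : leftState.getOrDefault(key, List.of())) {
            output.add(left + "+" + record);
        }
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        SymmetricJoinSketch join = new SymmetricJoinSketch();
        join.onLeft(1L, "order-1");     // no match yet, only cached
        join.onRight(1L, "express-A");  // matches the cached order-1
        join.onRight(2L, "express-B");  // no order with key 2, only cached
        join.onLeft(1L, "order-1b");    // matches the cached express-A
        System.out.println(join.output());
    }
}
```

Because both sides are cached, the arrival order of the two streams does not matter: a pair is emitted as soon as its second half arrives.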

Note:

  • This FLIP only focuses on the implementation of regular Join. Non-regular Joins (such as Interval Join and Lookup Join) and Window Join will be introduced in subsequent FLIPs.
  • This FLIP does not support broadcast join, because the data on the broadcast side cannot be stored in keyed state; it can only be stored in broadcast state. Currently, broadcast state only supports map types, so the value in the map would need to be a list type. This means the whole list may have to be serialized every time data is written, and we think such an implementation would perform poorly. In the future, we will look for ways to support broadcast join.
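
To make the serialization concern in the second note concrete, here is a toy model. It is not Flink's broadcast state: it uses Java serialization as a stand-in and assumes that every append rewrites the whole list value. The class and method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/** Toy model: appending to a list-typed map value re-serializes the whole list. */
class ListValueRewriteCost {

    /** Number of bytes produced by serializing the complete list once. */
    static int serializedSize(List<Long> list) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(new ArrayList<>(list));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    /** Total bytes serialized when each of the appends rewrites the whole list. */
    static long totalBytesWhenRewritingOnAppend(int appends) {
        List<Long> list = new ArrayList<>();
        long total = 0;
        for (long i = 0; i < appends; i++) {
            list.add(i);
            total += serializedSize(list); // the full list is written again
        }
        return total;
    }

    public static void main(String[] args) {
        int appends = 1_000;
        List<Long> finalList = new ArrayList<>();
        for (long i = 0; i < appends; i++) {
            finalList.add(i);
        }
        System.out.println("bytes if the final list is written once: " + serializedSize(finalList));
        System.out.println("bytes if every append rewrites the list: "
                + totalBytesWhenRewritingOnAppend(appends));
    }
}
```

In this model the total work grows roughly quadratically with the number of buffered records, which is the kind of overhead the note refers to.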

Example

Before getting into the details, let's look at an end-to-end example of using Join to associate data from two streams with the DataStream V2 API.

/**
 * This example illustrates an e-commerce scenario where order information is enriched by
 * integrating the order details with express information.
 *
 * <p>The application processes two types of input data: the first is the order information {@link
 * Order}, which includes attributes such as orderId, productId, and others. The second is the
 * express information {@link ExpressEvent}, which contains orderId, expressId, and additional
 * details.
 *
 * <p>The output generated by the application is a more comprehensive order summary {@link
 * EnrichedOrder}, which combines both order and express information.
 *
 * <p>To enrich the orders, we partition the {@link Order} and {@link ExpressEvent} data by orderId.
 * We then join the records from {@link Order} and {@link ExpressEvent} that share the same orderId,
 * then generate and output the {@link EnrichedOrder}.
 */
class EnrichOrderExample implements Serializable {

    /** The {@link Order} contains essential order information. */
    public static class Order {
        private long orderId;
        private long productId;
        private long userId;
        private long orderTime;
    }

    /** The {@link ExpressEvent} contains information about the express service. */
    public static class ExpressEvent {
        private long orderId;
        private long expressId;
        private long arrivalTimestamp;
    }

    /** The {@link EnrichedOrder} includes both order details and express information. */
    public static class EnrichedOrder {
        private long orderId;
        private long productId;
        private long userId;
        private long orderTime;
        private long expressId;
        private long expressArrivalTimestamp;
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getInstance();

        // create Order stream source
        NonKeyedPartitionStream<Order> orderStream = createSourceForOrder();

        // create ExpressEvent stream source
        NonKeyedPartitionStream<ExpressEvent> expressEventStream = 
                createSourceForExpressEvent();

        NonKeyedPartitionStream<EnrichedOrder> enrichedOrderStream =
                BuiltinFuncs.join(
                        orderStream,
                        Order::getOrderId,
                        expressEventStream,
                        ExpressEvent::getOrderId,
                        (order, expressEvent, output, ctx) -> {
                            // generate EnrichedOrder and emit it to output.
                            output.collect(
                                    new EnrichedOrder(
                                            order.getOrderId(),
                                            order.getProductId(),
                                            order.getUserId(),
                                            order.getOrderTime(),
                                            expressEvent.getExpressId(),
                                            expressEvent.getArrivalTimestamp()
                                    )
                            );
                        },
                        JoinType.INNER);

        // print result
        enrichedOrderStream.toSink(new WrappedSink<>(new PrintSink<>()));
        env.execute("EnrichOrderExample");
    }
}

In this example, we connect the Order and ExpressEvent streams via the BuiltinFuncs.join utility method. This method takes four kinds of parameters:

  1. The left and right streams to be joined.

  2. The KeySelectors that specify the join key of each stream.

  3. The processing logic for the joined data, i.e. the JoinFunction.

  4. The type of Join.

Since we want to connect Order and ExpressEvent with the same orderId in our example, we choose orderId as the join key.

After records with the same orderId are joined, the JoinFunction defines how each joined pair is processed. Here, the logic combines an Order and an ExpressEvent into an EnrichedOrder that carries both order and express information.

Finally, because we only want orders that have both order information and express information, we choose INNER as the join type.

Proposed Changes

As illustrated in the example, there are two key points to completing a Join operation:

  1. Redistributing the two data streams based on the join key.

  2. Executing the Join computation logic for the data that has the same join key.

For the first key point, users need to define the KeySelector for the corresponding join key. This step is straightforward and is not discussed further here. The second key point requires users to define the computation logic, which this FLIP represents through JoinFunction and JoinType.
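
As a rough illustration of the first key point, the sketch below shows why applying a KeySelector to both streams brings matching records together: the partition depends only on the extracted key. This is a simplification under stated assumptions. The `partitionFor` helper and the hash-modulo scheme are hypothetical, and Flink's actual key assignment is more involved.

```java
import java.util.function.Function;

/** Toy model of redistributing two streams by join key. */
class KeyPartitioningSketch {

    static final class Order {
        final long orderId;
        Order(long orderId) { this.orderId = orderId; }
        long getOrderId() { return orderId; }
    }

    static final class ExpressEvent {
        final long orderId;
        ExpressEvent(long orderId) { this.orderId = orderId; }
        long getOrderId() { return orderId; }
    }

    /** Picks a partition from the extracted key only, never from the record type. */
    static <T, K> int partitionFor(T record, Function<T, K> keySelector, int parallelism) {
        K key = keySelector.apply(record);
        return Math.floorMod(key.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 4;
        int orderPartition = partitionFor(new Order(42L), Order::getOrderId, parallelism);
        int expressPartition =
                partitionFor(new ExpressEvent(42L), ExpressEvent::getOrderId, parallelism);
        // Same join key, so both records are routed to the same partition.
        System.out.println(orderPartition == expressPartition);
    }
}
```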

We will first provide detailed definitions of JoinFunction and JoinType, and then discuss how users can use them to perform the Join computation.

JoinFunction

After redistributing by the join key, we can retrieve data that shares the same join key.

At this point, the user needs to specify how to process this data. We refer to this computational logic as the JoinFunction. This interface defines the operations to be performed on records that share the same join key. Users implement their own JoinFunction and write the corresponding computational logic in it.

/**
 * A functional interface that defines a join operation between two input records of types {@code IN1} and {@code IN2}.
 *
 * <p>This interface is used to process a pair of records from two different data streams and produce an output record
 * of type {@code OUT}. Implementations of this interface can be used to define custom join logic in stream processing
 * frameworks.</p>
 *
 * @param <IN1> the type of the first input record
 * @param <IN2> the type of the second input record
 * @param <OUT> the type of the output record
 */
@FunctionalInterface
@Experimental
public interface JoinFunction<IN1, IN2, OUT> extends Function {
    void processRecord(IN1 leftRecord, IN2 rightRecord, Collector<OUT> output, RuntimeContext ctx)
            throws Exception;
}
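
As a sketch of how a user might implement this interface, the example below declares minimal stand-ins for `Collector`, `RuntimeContext` and `JoinFunction` so that it compiles on its own. These stand-ins, the simplified `Order` and `ExpressEvent` classes, and `SummaryJoin` are assumptions made for illustration, not the real Flink types.

```java
import java.util.ArrayList;
import java.util.List;

class JoinFunctionSketch {
    // Minimal stand-ins (assumptions) for the Flink types used by the interface above.
    interface Collector<T> { void collect(T record); }
    interface RuntimeContext {}

    @FunctionalInterface
    interface JoinFunction<IN1, IN2, OUT> {
        void processRecord(IN1 leftRecord, IN2 rightRecord, Collector<OUT> output, RuntimeContext ctx)
                throws Exception;
    }

    static final class Order {
        final long orderId;
        final long productId;
        Order(long orderId, long productId) { this.orderId = orderId; this.productId = productId; }
    }

    static final class ExpressEvent {
        final long orderId;
        final long expressId;
        ExpressEvent(long orderId, long expressId) { this.orderId = orderId; this.expressId = expressId; }
    }

    /** Class-based implementation: emits one summary string per joined pair. */
    static final class SummaryJoin implements JoinFunction<Order, ExpressEvent, String> {
        @Override
        public void processRecord(
                Order order, ExpressEvent event, Collector<String> output, RuntimeContext ctx) {
            output.collect("order " + order.orderId + " -> express " + event.expressId);
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> results = new ArrayList<>();

        // The framework would call processRecord once for every matching pair.
        new SummaryJoin().processRecord(new Order(1L, 10L), new ExpressEvent(1L, 500L), results::add, null);

        // Lambda form: because output goes through a Collector, a pair may emit nothing at all.
        JoinFunction<Order, ExpressEvent, String> onlyLargeExpressIds =
                (order, event, output, ctx) -> {
                    if (event.expressId >= 1000L) {
                        output.collect("order " + order.orderId);
                    }
                };
        onlyLargeExpressIds.processRecord(new Order(2L, 20L), new ExpressEvent(2L, 7L), results::add, null);

        System.out.println(results);
    }
}
```

Writing to a Collector rather than returning a value lets a JoinFunction emit zero, one or several records for each joined pair.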

JoinType

In addition, users need to explicitly choose which join algorithm to use, so we define JoinType. An outer join requires knowing that there is no more data for a certain join key, which is not feasible for a non-window join over unbounded streams. Therefore, this FLIP only introduces the INNER join type and leaves other join types to future FLIPs as needed.

/** The type/algorithm of join operation. */
@Experimental
public enum JoinType {
    INNER,
}
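
The reasoning for deferring outer joins can be illustrated with a toy left outer join. In the sketch below (hypothetical names, plain maps instead of state), an unmatched left record can only be emitted once the input has ended, an event that never occurs on an unbounded stream.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy left outer join: unmatched left records need an end-of-input signal. */
class LeftOuterJoinSketch {
    private final Map<Long, List<String>> leftByKey = new LinkedHashMap<>();
    private final Map<Long, List<String>> rightByKey = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    void onLeft(long key, String left) {
        leftByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(left);
        // Matches can be emitted right away, exactly as in an inner join.
        for (String right : rightByKey.getOrDefault(key, List.of())) {
            output.add(left + "+" + right);
        }
        // An unmatched left record must NOT be emitted as left+null here:
        // a matching right record may still arrive later.
    }

    void onRight(long key, String right) {
        rightByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(right);
        for (String left : leftByKey.getOrDefault(key, List.of())) {
            output.add(left + "+" + right);
        }
    }

    /** Only a bounded input ever reaches this point. */
    void endOfInput() {
        leftByKey.forEach((key, lefts) -> {
            if (!rightByKey.containsKey(key)) {
                lefts.forEach(left -> output.add(left + "+null"));
            }
        });
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        LeftOuterJoinSketch join = new LeftOuterJoinSketch();
        join.onLeft(1L, "order-1");
        join.onLeft(2L, "order-2");
        join.onRight(1L, "express-A");
        System.out.println("before end of input: " + join.output());
        join.endOfInput();
        System.out.println("after end of input:  " + join.output());
    }
}
```

An inner join never needs the `endOfInput` step, which is why it can be supported on unbounded streams without windows.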

APIs for performing Join

We provide a tool that converts a JoinFunction and a JoinType into a ProcessFunction that DataStream can process. The tool and a usage example are shown below.

/** Built-in functions for all extensions of DataStream V2. */
@Experimental
public final class BuiltinFuncs {
    /**
     * Wrap the JoinFunction and JoinType within a ProcessFunction to perform the Join
     * operation. Note that the wrapped process function should only be used with KeyedStream.
     */
    public static <IN1, IN2, OUT> TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> join(
            JoinFunction<IN1, IN2, OUT> joinFunction, JoinType joinType) {
        ...
    }
}

// Usage: key both streams by the join key, then connect them with the wrapped function.
KeyedPartitionStream keyedStream1 = stream1.keyBy(new CustomJoinKeySelector1());
KeyedPartitionStream keyedStream2 = stream2.keyBy(new CustomJoinKeySelector2());
NonKeyedPartitionStream joinedStream = keyedStream1.connectAndProcess(
  keyedStream2,
  BuiltinFuncs.join(
    new CustomJoinFunction(),
    JoinType.INNER
  )
);

Note that the ProcessFunction generated by this tool can only be used to join two KeyedStreams, so the user must manually call keyBy on both input streams.
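
One way to see why keyed inputs are required is the toy wrapper below. It adapts a JoinFunction into a two-input callback whose buffers model state that has already been scoped to a single key, so the wrapper itself never inspects keys. All types here are simplified stand-ins and assumptions (for example, `RuntimeContext` and checked exceptions are omitted); this is not how the proposed `BuiltinFuncs.join` is implemented.

```java
import java.util.ArrayList;
import java.util.List;

class JoinWrapperSketch {
    // Simplified stand-ins (assumptions) for the types named in this FLIP.
    interface Collector<T> { void collect(T record); }
    enum JoinType { INNER }

    @FunctionalInterface
    interface JoinFunction<IN1, IN2, OUT> {
        void processRecord(IN1 left, IN2 right, Collector<OUT> output);
    }

    /** Hypothetical two-input callback shape. */
    interface TwoInputProcessor<IN1, IN2, OUT> {
        void processRecordFromFirstInput(IN1 record, Collector<OUT> output);
        void processRecordFromSecondInput(IN2 record, Collector<OUT> output);
    }

    /** Wraps the join logic; the two buffers model state for ONE join key. */
    static <IN1, IN2, OUT> TwoInputProcessor<IN1, IN2, OUT> join(
            JoinFunction<IN1, IN2, OUT> fn, JoinType type) {
        if (type != JoinType.INNER) {
            throw new UnsupportedOperationException("Unsupported join type: " + type);
        }
        List<IN1> leftBuffer = new ArrayList<>();
        List<IN2> rightBuffer = new ArrayList<>();
        return new TwoInputProcessor<IN1, IN2, OUT>() {
            @Override
            public void processRecordFromFirstInput(IN1 record, Collector<OUT> output) {
                leftBuffer.add(record);
                for (IN2 right : rightBuffer) {
                    fn.processRecord(record, right, output);
                }
            }

            @Override
            public void processRecordFromSecondInput(IN2 record, Collector<OUT> output) {
                rightBuffer.add(record);
                for (IN1 left : leftBuffer) {
                    fn.processRecord(left, record, output);
                }
            }
        };
    }

    public static void main(String[] args) {
        List<String> results = new ArrayList<>();
        TwoInputProcessor<String, String, String> perKeyJoin =
                join((left, right, output) -> output.collect(left + "+" + right), JoinType.INNER);
        perKeyJoin.processRecordFromFirstInput("order-1", results::add);
        perKeyJoin.processRecordFromSecondInput("express-A", results::add);
        System.out.println(results);
    }
}
```

In this model, feeding records with different join keys into the same instance would pair them with each other, so correct results depend on the inputs being partitioned by key first.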


To make it easier for users, we provide some APIs based on the above tool, which have the same functionality but are simpler to use.

With the following APIs, users have two choices:

1. Pass in two KeyedStreams and a JoinFunction, along with an optional JoinType (defaults to INNER). The API returns the joined DataStream.

2. Pass in two NonKeyedStreams with the KeySelector for each stream's join key and a JoinFunction, along with an optional JoinType (defaults to INNER). The API internally repartitions the two NonKeyedStreams by join key and returns the joined DataStream.

/** Built-in functions for all extensions of DataStream V2. */
@Experimental
public final class BuiltinFuncs {

    ...

    /** Inner join two {@link KeyedPartitionStream}. */
    public static <KEY, T1, T2, OUT> NonKeyedPartitionStream<OUT> join(
            KeyedPartitionStream<KEY, T1> leftStream,
            KeyedPartitionStream<KEY, T2> rightStream,
            JoinFunction<T1, T2, OUT> joinFunction) {
        ...
    }

    /** Join two {@link KeyedPartitionStream} with the type of {@link JoinType}. */
    public static <KEY, T1, T2, OUT> NonKeyedPartitionStream<OUT> join(
            KeyedPartitionStream<KEY, T1> leftStream,
            KeyedPartitionStream<KEY, T2> rightStream,
            JoinFunction<T1, T2, OUT> joinFunction,
            JoinType joinType) {
        ...
    }

    /**
     * Inner join two {@link NonKeyedPartitionStream}. The two streams will be
     * redistributed by {@link KeySelector} respectively.
     */
    public static <KEY, T1, T2, OUT> NonKeyedPartitionStream<OUT> join(
            NonKeyedPartitionStream<T1> leftStream,
            KeySelector<T1, KEY> leftKeySelector,
            NonKeyedPartitionStream<T2> rightStream,
            KeySelector<T2, KEY> rightKeySelector,
            JoinFunction<T1, T2, OUT> joinFunction) {
        ...
    }

    /**
     * Join two {@link NonKeyedPartitionStream} with the type of {@link JoinType}. The two
     * streams will be redistributed by {@link KeySelector} respectively.
     */
    public static <KEY, T1, T2, OUT> NonKeyedPartitionStream<OUT> join(
            NonKeyedPartitionStream<T1> leftStream,
            KeySelector<T1, KEY> leftKeySelector,
            NonKeyedPartitionStream<T2> rightStream,
            KeySelector<T2, KEY> rightKeySelector,
            JoinFunction<T1, T2, OUT> joinFunction,
            JoinType joinType) {
        ...
    }
}

The following are examples of using KeyedStream and NonKeyedStream as parameters:

// using NonKeyedStream
NonKeyedPartitionStream joinedStream = BuiltinFuncs.join(
  stream1, 
  new CustomJoinKeySelector1(), 
  stream2, 
  new CustomJoinKeySelector2(),
  new CustomJoinFunction()
);

// using KeyedStream
NonKeyedPartitionStream joinedStream = BuiltinFuncs.join(
  keyedStream1, 
  keyedStream2, 
  new CustomJoinFunction()
);

Compatibility, Deprecation, and Migration Plan

  1. This FLIP only provides a new extension for DataStream V2, so no compatibility issues will be introduced.
  2. The public interfaces proposed in this FLIP will be annotated with @Experimental first, and should be changed to @PublicEvolving/@Public along with the other DataStream V2 APIs.

Test Plan

Unit tests (UT) and integration tests (IT).