Status

Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

Discussion thread: https://lists.apache.org/thread/5yo9mkxw6bk1m3klgy5bl2p8y7s3nq0r
Vote thread
JIRA

FLINK-37754

Release


Motivation
In the world of big data, Flink session clusters are mostly used for ETL, OLTP, ML and other scenarios, but rarely for OLAP.

In my company, due to business needs, we rely on Flink session clusters for querying and analysis in OLAP scenarios. After conducting some experiments, however, we observed two phenomena:
1. The query and analysis latency is very high.
2. As the query QPS increases, the query latency increases further.

So, to perform well in OLAP scenarios, it is necessary to optimize both the latency of individual queries and the maximum query QPS that a Flink session cluster can sustain. This FLIP only optimizes the latency of individual queries; optimizing the query QPS of Flink session clusters will be discussed in a follow-up FLIP.

To optimize query latency, we first added logs and metrics to the key IO code paths, and found that the main causes of high latency are:

  1. The interval at which the client polls for the job's result is too long for OLAP scenarios: it is hard-coded to 100 ms and cannot be configured.
  2. Batch jobs perform some unnecessary IO.
  3. User JARs are repeatedly uploaded and downloaded.

For business reasons, we have made several optimizations to reduce OLAP query latency, and we received very good feedback from the business side. We would like to contribute them back to the Flink community, and we will continue to contribute further optimizations as the business iterates.

 

Proposed Changes


Increase the observability

We first add logs and metrics to increase the observability of the Flink session cluster, so that we can see which stage takes the most time and then short-circuit unnecessary IO. Specifically:

  1.  On the client side, we add logs.
  2.  On the JM and TM side, we add metrics.
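To illustrate the kind of per-stage timing these logs and metrics are meant to expose, here is a minimal sketch. The `StageTimer` class and the stage names are hypothetical and not part of Flink; a real implementation would report through Flink's logger and metric groups rather than an in-memory map.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical helper: records the wall-clock time spent in each named stage of a query. */
final class StageTimer {
    // Insertion-ordered so that stages are reported in the order they first ran.
    private final Map<String, Long> elapsedNanos = new LinkedHashMap<>();

    /** Runs the stage, adds its duration to the stage's total, and returns the stage's result. */
    <T> T time(String stage, Supplier<T> body) {
        long start = System.nanoTime();
        try {
            return body.get();
        } finally {
            elapsedNanos.merge(stage, System.nanoTime() - start, Long::sum);
        }
    }

    /** Returns a copy of the accumulated durations in nanoseconds. */
    Map<String, Long> snapshot() {
        return new LinkedHashMap<>(elapsedNanos);
    }

    /** Formats the stages as "stage=Xms", e.g. for a single client-side log line. */
    String report() {
        StringBuilder sb = new StringBuilder();
        elapsedNanos.forEach((stage, nanos) -> {
            if (sb.length() > 0) {
                sb.append(", ");
            }
            sb.append(stage).append('=').append(nanos / 1_000_000).append("ms");
        });
        return sb.toString();
    }
}
```

Wrapping stages such as job graph compilation, submission and result fetching this way makes it visible which stage dominates the end-to-end latency.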


Optimize the polling frequency for fetching job results

On the client side, we can optimize the interval at which the job result fetcher polls for results.

The running time of a Flink batch job is usually short in OLAP scenarios, but the retryMillis in CollectResultFetcher is hard-coded to 100 ms and is not configurable. We can therefore add a config option that allows users to tune retryMillis based on query complexity. For example, for a very simple query, retryMillis could be set to 10 ms or less.
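The effect of a configurable retry interval can be sketched with a generic polling loop. This is not CollectResultFetcher's actual code: `ResultPoller`, `fetchOnce` and `maxAttempts` are hypothetical names, and the sketch only shows where a configured interval would replace the hard-coded 100 ms sleep.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.function.Supplier;

/** Hypothetical poller: asks for a result repeatedly, sleeping a configurable interval between attempts. */
final class ResultPoller {
    private final Duration retryInterval;

    ResultPoller(Duration retryInterval) {
        if (retryInterval.isNegative()) {
            throw new IllegalArgumentException("retry interval must not be negative");
        }
        this.retryInterval = retryInterval;
    }

    /** Polls until fetchOnce returns a value or maxAttempts is exhausted. */
    <T> Optional<T> poll(Supplier<Optional<T>> fetchOnce, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            Optional<T> result = fetchOnce.get();
            if (result.isPresent()) {
                return result;
            }
            if (attempt < maxAttempts) {
                try {
                    // With a hard-coded 100 ms, a query that finishes in 5 ms may still wait ~100 ms here;
                    // a configured interval such as 10 ms shortens that idle wait.
                    Thread.sleep(retryInterval.toMillis());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt and give up
                    return Optional.empty();
                }
            }
        }
        return Optional.empty();
    }
}
```

The trade-off is request volume: a shorter interval reduces idle waiting for short queries but sends more fetch requests to the cluster, which is why the value is proposed as a tunable option rather than a lower hard-coded constant.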


Short-circuit unnecessary IO in OLAP scenarios

1. If enabled by configuration, we can short-circuit the checkpoint-related logic in DefaultExecutionGraphBuilder#buildGraph, SchedulerUtils#createCompletedCheckpointStoreIfCheckpointingIsEnabled and SchedulerUtils#createCheckpointIDCounterIfCheckpointingIsEnabled.
2. We can also short-circuit the state backend logic in the StreamTask constructor (StreamTask#StreamTask).
3. In OLAP scenarios a job does not need to be recovered on failover; it can simply fail. So we can force the use of StandaloneExecutionPlanStore to avoid writing the JobGraph to ZooKeeper, thereby speeding up queries.
4. Similarly, we can force the use of EmbeddedJobResultStore to avoid uploading job results to OSS.
5. We can optimize job submission by removing the disk IO for the JobGraph and the FS#open performed when the JM deserializes the JobGraph. Currently, the client writes the JobGraph to local disk and then attaches that file to the HTTP request, and the JM writes the received JobGraph to local disk again. Instead, the client serializes the JobGraph directly into a byte array, the byte array is passed through to the JM in the HTTP request, and the JM deserializes it directly into a JobGraph. Overall, this saves two disk IOs and one FS#open.
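Item 5 amounts to an in-memory Java serialization round trip. The sketch below uses a hypothetical `FakeJobGraph` record as a stand-in for Flink's JobGraph (which is Java-serializable) and a hypothetical `JobGraphBytes` helper; how the bytes are carried in the REST request is not shown and would be defined by the actual implementation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.List;

/** Hypothetical stand-in for Flink's JobGraph. */
record FakeJobGraph(String jobName, List<String> vertexNames) implements Serializable {}

/** Hypothetical helper: (de)serializes a job graph in memory, with no temp file and no FS#open. */
final class JobGraphBytes {
    /** Client side: serialize straight into a byte array instead of writing a local file. */
    static byte[] toBytes(Serializable graph) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(graph);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Closing the ObjectOutputStream flushed it, so the buffer holds the complete stream.
        return buffer.toByteArray();
    }

    /** JM side: deserialize directly from the request bytes. */
    static <T> T fromBytes(byte[] bytes, Class<T> type) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return type.cast(in.readObject());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Class of the serialized job graph not found", e);
        }
    }
}
```

Compared with the file-based path, the only copies are the in-memory buffer on each side, which is where the saved disk IOs and FS#open come from.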


Optimize the upload and download of user JARs

The upload and download of user JARs, as well as the initialization of user classloaders, significantly increase the end-to-end latency in OLAP scenarios, which greatly affects the user experience. There are two ways to solve this problem, and both significantly reduce the end-to-end time needed to get OLAP results.

The first method is to share JARs and user classloaders across jobs in the Flink session cluster.

The second method is to load some common JARs into the JM and TM classpaths through an external process (such as a Kubernetes pod init container) before the Flink session cluster starts.

The two methods complement each other, and I suggest we implement both.

  1. Sharing JARs and user classloaders across jobs:

    (1) The client sends an HTTP request to the JM to get the shared JARs in the session cluster.
    (2) The SharedJarsHandler in the REST endpoint calls the DispatcherGateway to list the shared JARs. The SharedJarManager manages all shared JARs in the cluster uniformly and serves REST requests for adding, deleting, modifying, and querying them.
    (3) The JM returns the shared JARs to the client via HTTP.
    (4) The client finds the sharable JARs in the JobGraph that have not been shared yet.
    (5) The client submits the job.
    (6) The sharable JARs are added to the BlobServer. These are the JARs that this job needs to share but that are not yet in the cluster's shared JAR list.
    (7) The non-shared JARs are uploaded to the BlobServer.
    (8) When a task initializes, it tries to get its user classloader from the cache of user classloaders that can be shared across jobs (a user classloader can be shared if it only uses shared JARs). On a cache miss, a new user classloader is created.

  2. Loading common JARs through an external process: when the Flink session cluster starts, the JARs are downloaded to the JobManager and TaskManagers through a Kubernetes pod init container. When a job is submitted, the JobManager does not upload the user JARs to the BlobServer, and the TaskManagers no longer need to download them from the BlobServer.
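Step (8) of the first method can be sketched as a cache keyed by the set of shared JARs a classloader is built from. `SharedClassLoaderCache` is a hypothetical name, and the sketch assumes a job only consults the cache when all of its user JARs are shared; classloader release (closing a loader once no job uses it) is deliberately left out.

```java
import java.io.UncheckedIOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Path;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical cache of user classloaders that only depend on shared JARs. */
final class SharedClassLoaderCache {
    // Key: sorted, normalized paths of the shared JARs. Strings are used instead of URLs
    // because URL#equals may perform host name resolution.
    private final Map<Set<String>, URLClassLoader> cache = new HashMap<>();
    private final ClassLoader parent;

    SharedClassLoaderCache(ClassLoader parent) {
        this.parent = parent;
    }

    /** Returns the cached classloader for this JAR set, creating it on a cache miss. */
    synchronized ClassLoader getOrCreate(Collection<Path> sharedJars) {
        Set<String> key = new TreeSet<>();
        for (Path jar : sharedJars) {
            key.add(jar.toAbsolutePath().normalize().toString());
        }
        return cache.computeIfAbsent(key, k -> new URLClassLoader(toUrls(k), parent));
    }

    synchronized int size() {
        return cache.size();
    }

    private static URL[] toUrls(Set<String> paths) {
        URL[] urls = new URL[paths.size()];
        int i = 0;
        for (String p : paths) {
            try {
                urls[i++] = Path.of(p).toUri().toURL();
            } catch (MalformedURLException e) {
                throw new UncheckedIOException(e);
            }
        }
        return urls;
    }
}
```

Because the key is order-independent, two jobs that declare the same shared JARs in a different order still reuse one classloader, so class loading and static initialization happen only once per JAR set.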

Support session mini cluster for local queries

Some simple SQL queries can be answered on the client without fetching results from the session cluster, so we need to support a session mini cluster.

 


Public Interfaces

Add some new classes

  1. We need to add a SharedJarManager, which manages all shared JARs in the cluster uniformly and serves REST requests for adding, deleting, modifying, and querying them.
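import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.util.function.SupplierWithException;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import static org.apache.flink.util.Preconditions.checkNotNull;

/** Manages all shared JARs in the session cluster. JarDesc and SharedJar are new types proposed by this FLIP. */
public class SharedJarManager {

    /** Shared JARs registered in the cluster, in insertion order. */
    private final Map<String, SharedJar> sharedJars;

    /** Creates the BlobClients used to upload shared JARs to the BlobServer. */
    private final SupplierWithException<BlobClient, IOException> blobClientSupplier;

    public SharedJarManager(SupplierWithException<BlobClient, IOException> blobClientSupplier) {
        this.sharedJars = new LinkedHashMap<>();
        this.blobClientSupplier = checkNotNull(blobClientSupplier);
    }

    public List<SharedJar> addJars(List<JarDesc> jars, String jobName) {
        return addJars(jars, jobName, null);
    }

    public List<SharedJar> addJars(
            List<JarDesc> jars, String jobName, @Nullable JobManagerMetricGroup jobManagerMetricGroup) {
        // todo: upload the JARs that are not shared yet and register them in sharedJars
        throw new UnsupportedOperationException("Not implemented yet");
    }

    private SharedJar addJar(JarDesc jarDesc, BlobClient blobClient) throws IOException {
        // todo
        throw new UnsupportedOperationException("Not implemented yet");
    }

    public List<SharedJar> listJars() {
        // todo
        throw new UnsupportedOperationException("Not implemented yet");
    }
}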


2. We also need to add a SharedJarsHandler to the REST endpoint.

public final class SharedJarsHandler
        extends AbstractRestHandler<DispatcherGateway, EmptyRequestBody, SharedJarsInfo, EmptyMessageParameters> {

    public SharedJarsHandler(
            GatewayRetriever<? extends DispatcherGateway> leaderRetriever,
            Time timeout,
            Map<String, String> headers) {
        super(leaderRetriever, timeout, headers, SharedJarsHeaders.getInstance());
    }

    @Override
    protected CompletableFuture<SharedJarsInfo> handleRequest(
            @Nonnull HandlerRequest<EmptyRequestBody> request, @Nonnull DispatcherGateway gateway)
            throws RestHandlerException {
        // todo: ask the DispatcherGateway for the shared JARs (backed by SharedJarManager#listJars)
        //  and convert them into SharedJarsInfo
        throw new UnsupportedOperationException("Not implemented yet");
    }
}


Add some configs

1. Add the following config to CollectDynamicSinkOptions

public static final ConfigOption<Integer> RETRY_SLEEP_INTERVAL_IN_MILLS =
        ConfigOptions.key("retry.sleep.interval.in.mills")
                .intType()
                .defaultValue(100)
                .withDescription(
                        "The interval, in milliseconds, that the result fetcher sleeps before retrying to fetch the job result.");


2. Add the following configs to CheckpointingOptions.

public static final ConfigOption<Boolean> SHORT_CUT_CHECKPOINT_ON_MASTER =
        ConfigOptions.key("short.cut.checkpoint.on.master")
                .booleanType()
                .defaultValue(false)
                .withDescription("Whether to short-circuit the checkpoint-related logic on the master (JobManager).");

public static final ConfigOption<Boolean> SHORT_CUT_CHECKPOINT_ON_SLAVE =
        ConfigOptions.key("short.cut.checkpoint.on.slave")
                .booleanType()
                .defaultValue(false)
                .withDescription("Whether to short-circuit the checkpoint-related logic on the slaves (TaskManagers).");
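The intended effect of `short.cut.checkpoint.on.master` can be sketched as a guard in front of the checkpoint component factories. This is an illustration under assumptions: `CheckpointShortCircuit` and `createIfNeeded` are hypothetical, a plain `Map<String, String>` stands in for Flink's `Configuration`, and the `Supplier` stands in for creating, for example, the completed checkpoint store.

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

/** Hypothetical guard: skips creating checkpoint components when the short-cut flag is set. */
final class CheckpointShortCircuit {
    static final String SHORT_CUT_ON_MASTER = "short.cut.checkpoint.on.master";

    /**
     * Returns empty without invoking the factory when checkpointing is disabled or short-circuited,
     * so no checkpoint-related IO (e.g. store initialization) happens for the job.
     */
    static <T> Optional<T> createIfNeeded(
            Map<String, String> conf, boolean checkpointingEnabled, Supplier<T> factory) {
        boolean shortCut = Boolean.parseBoolean(conf.getOrDefault(SHORT_CUT_ON_MASTER, "false"));
        if (!checkpointingEnabled || shortCut) {
            return Optional.empty();
        }
        return Optional.of(factory.get());
    }
}
```

Keeping the flag default at `false` leaves existing jobs unchanged; only clusters dedicated to OLAP queries would turn it on.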


3. Add the following configs to ClusterOptions
@Documentation.Section(Documentation.Sections.EXPERT_CLUSTER)
public static final ConfigOption<Boolean> ENABLE_USER_JAR_SHARING =
        ConfigOptions.key("cluster.user-jar-sharing.enabled")
                .booleanType()
                .defaultValue(true)
                .withDescription(
                        Description.builder()
                                .text(
                                        "Whether to enable user jar sharing. If true, jars that match %s will be shared across jobs.",
                                        code(PipelineOptions.SHARABLE_JAR_NAME_PATTERNS.key()))
                                .build());

@Documentation.Section(Documentation.Sections.EXPERT_CLUSTER)
public static final ConfigOption<Boolean> ENABLE_USER_CLASSLOADER_SHARING =
        ConfigOptions.key("cluster.user-classloader-sharing.enabled")
                .booleanType()
                .defaultValue(true)
                .withDescription(
                        "Whether to enable user classloader sharing. If true, user classloaders that "
                                + "only rely on system and shared jars will be shared with jobs relying on the same jars.");

4. Add the following configs to PipelineOptions.
/** A list of jar name patterns (regex). Matching jars can be shared across jobs. */
public static final ConfigOption<List<String>> SHARABLE_JAR_NAME_PATTERNS =
        key("pipeline.sharable-jar-name-patterns")
                .stringType()
                .asList()
                .noDefaultValue()
                .withDescription(
                        "A semicolon-separated list of jar name patterns (regex). Matching jars can be shared across jobs.");

public static final ConfigOption<Boolean> DISABLE_UPLOAD_USER_JARS =
        key("pipeline.upload-user-jars-via-blobs.disabled")
                .booleanType()
                .defaultValue(false)
                .withDescription(
                        "If true, Flink will not upload the user jars to the blob server and the workers will not"
                                + " download the jars from the blob server. Note that it can be set to true only"
                                + " if some external process takes over the responsibility of uploading the user jars;"
                                + " otherwise the job may fail because the user jars cannot be found. At present, it only takes"
                                + " effect in DataStream application mode, so the option is forced to false in other modes.");
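Steps (4), (6) and (7) of the JAR sharing flow can be sketched as a client-side split of a job's JARs using `pipeline.sharable-jar-name-patterns`. `JarSharingPlan` is a hypothetical class, and the sketch assumes that a pattern must match the whole JAR file name (`Matcher#matches`), which the proposal does not specify.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;

/** Hypothetical client-side plan: which of a job's JARs to share, reuse, or upload privately. */
final class JarSharingPlan {
    final List<String> toShare = new ArrayList<>();       // sharable, not yet shared: add to BlobServer as shared
    final List<String> alreadyShared = new ArrayList<>(); // already in the cluster's shared JAR list: no upload
    final List<String> nonShared = new ArrayList<>();     // no pattern matches: upload as a per-job JAR

    private JarSharingPlan() {}

    static JarSharingPlan plan(Collection<String> jobJars, List<String> patterns, Set<String> clusterShared) {
        List<Pattern> compiled = new ArrayList<>();
        for (String regex : patterns) {
            compiled.add(Pattern.compile(regex));
        }
        JarSharingPlan result = new JarSharingPlan();
        for (String jar : jobJars) {
            boolean sharable = compiled.stream().anyMatch(p -> p.matcher(jar).matches());
            if (!sharable) {
                result.nonShared.add(jar);
            } else if (clusterShared.contains(jar)) {
                result.alreadyShared.add(jar);
            } else {
                result.toShare.add(jar);
            }
        }
        return result;
    }

    /** A user classloader is sharable only if the job relies on no non-shared JARs. */
    boolean classLoaderSharable() {
        return nonShared.isEmpty();
    }
}
```

For example, with `pipeline.sharable-jar-name-patterns` set to `udf-.*\.jar;connector-.*\.jar`, a job using `udf-common-1.0.jar`, `connector-kafka.jar` (already shared) and `my-query.jar` would share the UDF JAR, reuse the connector JAR, and upload `my-query.jar` privately, so its classloader would not be sharable.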


5. Add the following config to ClientOptions

public static final ConfigOption<Boolean> CLIENT_JOB_GRAPH_LOCAL_FILE_ENABLED =
        key("client.job.graph.local.file.enabled")
                .booleanType()
                .defaultValue(true)
                .withDescription(
                        "If true, the client and server use a local file to transmit the job graph. "
                                + "If false, the job graph is serialized into a byte array and sent directly in the HTTP request.");


Compatibility, Deprecation, and Migration Plan

We will keep compatibility.


Test Plan

We will add unit tests and integration tests.

Rejected Alternatives
