Motivation
In FLIP-435 [1], we proposed the Materialized Table, which aims to simplify data processing pipelines. However, the current design does not persist the cluster information of the refresh job, which limits the use of materialized tables in environments such as Kubernetes or YARN clusters.
This FLIP proposes new interfaces that convert a ClusterID into a map of options that can be persisted. With the ClusterID stored alongside the refresh job information of a materialized table, the refresh job can be located again on the cluster it was submitted to, regardless of the cluster environment.
Public Interfaces
We introduce the ClusterInfo interface to carry the ClusterID. It has a single abstract method, getOptions, which returns a Map<String, String>. Persisting this map is what persists the ClusterID.
/**
 * Provides options to retrieve cluster information in both application and session modes.
 * <p>
 * This interface offers a standardized way to access cluster-specific configuration options,
 * which can be used to generate a unique cluster ID.
 * </p>
 */
public interface ClusterInfo {

    /**
     * Retrieves the options necessary for cluster information.
     * <p>
     * These options can be used to generate a cluster ID using the
     * {@code org.apache.flink.client.deployment.ClusterClientFactory#getClusterId(Configuration)} method.
     * </p>
     *
     * @return A map containing key-value pairs of cluster information options.
     */
    Map<String, String> getOptions();

    /**
     * Creates a ClusterInfo instance with a single option.
     *
     * @param clusterIdKey The key for the cluster ID option.
     * @param clusterIdValue The value for the cluster ID option.
     * @return A new ClusterInfo instance containing the specified option.
     */
    static ClusterInfo of(String clusterIdKey, String clusterIdValue) {
        return new ClusterInfoImpl(clusterIdKey, clusterIdValue);
    }

    /**
     * Creates an empty ClusterInfo instance.
     *
     * @return A ClusterInfo instance with no options.
     */
    static ClusterInfo empty() {
        return Collections::emptyMap;
    }
}
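As a rough illustration of how the returned map could be persisted and read back, the sketch below round-trips the options through java.util.Properties. ClusterInfoSketch and ClusterInfoPersistence are hypothetical stand-ins introduced only for this example, and the storage format actually used for materialized tables is not specified by this FLIP.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/** Hypothetical, trimmed-down stand-in for the proposed ClusterInfo interface. */
interface ClusterInfoSketch {
    Map<String, String> getOptions();

    static ClusterInfoSketch of(String key, String value) {
        return () -> Collections.singletonMap(key, value);
    }
}

/** Illustrative helpers only; not part of the proposal. */
final class ClusterInfoPersistence {

    /** Writes the options in java.util.Properties text format. */
    static String serialize(ClusterInfoSketch info) {
        Properties props = new Properties();
        props.putAll(info.getOptions());
        StringWriter out = new StringWriter();
        try {
            props.store(out, null);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toString();
    }

    /** Reads the options back from the text produced by serialize. */
    static Map<String, String> deserialize(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, String> options = new HashMap<>();
        for (String name : props.stringPropertyNames()) {
            options.put(name, props.getProperty(name));
        }
        return options;
    }
}
```

Because the map holds plain strings, any string-based store would work equally well; Properties is used here only because it ships with the JDK.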
Additionally, we need to extend JobClient with a method that returns the ClusterInfo.
@PublicEvolving
public interface JobClient {

    /** Returns the {@link JobID} that uniquely identifies the job this client is scoped to. */
    JobID getJobID();

    /** Requests the {@link JobStatus} of the associated job. */
    CompletableFuture<JobStatus> getJobStatus();

    /** Cancels the associated job. */
    CompletableFuture<Void> cancel();

    /**
     * Returns the {@link ClusterInfo} of the cluster the job runs on. The default is an empty
     * ClusterInfo, so existing implementations remain unchanged.
     */
    default ClusterInfo getClusterInfo() {
        return ClusterInfo.empty();
    }
}
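The effect of the default method can be sketched with stand-in types. SketchClusterInfo, SketchJobClient, LocalSketchClient and SessionSketchClient are hypothetical names used only here, and the real JobClient has more methods than shown. The key "kubernetes.cluster-id" in the usage below is an example value.

```java
import java.util.Collections;
import java.util.Map;

/** Hypothetical, trimmed-down stand-in for ClusterInfo. */
interface SketchClusterInfo {
    Map<String, String> getOptions();

    static SketchClusterInfo empty() {
        return Collections::emptyMap;
    }
}

/** Hypothetical, trimmed-down stand-in for JobClient. */
interface SketchJobClient {
    String getJobId();

    // Mirrors the proposal: clients that know nothing about a cluster report no options.
    default SketchClusterInfo getClusterInfo() {
        return SketchClusterInfo.empty();
    }
}

/** A client that does not override getClusterInfo(), e.g. one backed by a local cluster. */
final class LocalSketchClient implements SketchJobClient {
    @Override
    public String getJobId() {
        return "job-1";
    }
}

/** A client bound to a remote session cluster; it overrides the default. */
final class SessionSketchClient implements SketchJobClient {
    private final SketchClusterInfo clusterInfo;

    SessionSketchClient(SketchClusterInfo clusterInfo) {
        this.clusterInfo = clusterInfo;
    }

    @Override
    public String getJobId() {
        return "job-2";
    }

    @Override
    public SketchClusterInfo getClusterInfo() {
        return clusterInfo;
    }
}
```

A caller can then treat both kinds of client uniformly: new LocalSketchClient().getClusterInfo().getOptions() is an empty map, while a SessionSketchClient returns whatever options it was constructed with.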
Proposed Changes
The default implementation of ClusterInfo is ClusterInfoImpl, which stores a single key and value. The ClusterID can be represented as a string in both YARN and Kubernetes environments, but the configuration key differs between them, so we use a key-value pair to construct the corresponding option.
/**
 * Implementation of the ClusterInfo interface that stores a single cluster ID option.
 * <p>
 * This class provides a simple way to create a ClusterInfo with one key-value pair
 * representing the cluster ID option.
 * </p>
 */
public class ClusterInfoImpl implements ClusterInfo {

    /** The key for the cluster ID option. */
    private final String clusterIdKey;

    /** The value for the cluster ID option. */
    private final String clusterIdValue;

    /**
     * Constructs a new ClusterInfoImpl with the specified cluster ID option.
     *
     * @param clusterIdKey The key for the cluster ID option.
     * @param clusterIdValue The value for the cluster ID option.
     */
    public ClusterInfoImpl(String clusterIdKey, String clusterIdValue) {
        this.clusterIdKey = clusterIdKey;
        this.clusterIdValue = clusterIdValue;
    }

    /**
     * Returns a map containing the single cluster ID option.
     *
     * @return A map with one entry, where the key is the cluster ID option key
     *     and the value is the cluster ID option value.
     */
    @Override
    public Map<String, String> getOptions() {
        return Collections.singletonMap(clusterIdKey, clusterIdValue);
    }
}
There are multiple implementations of JobClient, including ClusterClientJobClientAdapter, EmbeddedJobClient, MiniClusterJobClient, and WebSubmissionJobClient. Only ClusterClientJobClientAdapter needs to implement getClusterInfo and return the corresponding ClusterInfo; the others inherit the default, which returns an empty ClusterInfo.
public class ClusterClientJobClientAdapter<ClusterID>
        implements JobClient, CoordinationRequestGateway {

    private final ClusterClientProvider<ClusterID> clusterClientProvider;
    private final JobID jobID;
    private final ClassLoader classLoader;
    private final ClusterInfo clusterInfo;

    public ClusterClientJobClientAdapter(
            final ClusterClientProvider<ClusterID> clusterClientProvider,
            final JobID jobID,
            final ClassLoader classLoader,
            final ClusterInfo clusterInfo) {
        this.jobID = checkNotNull(jobID);
        this.clusterClientProvider = checkNotNull(clusterClientProvider);
        this.classLoader = classLoader;
        this.clusterInfo = clusterInfo;
    }

    @Override
    public ClusterInfo getClusterInfo() {
        return clusterInfo;
    }

    ...
}
To build ClusterInfo when constructing ClusterClientJobClientAdapter, we need to modify ClusterClientFactory to convert ClusterID into ClusterInfo.
public class KubernetesClusterClientFactory
        extends AbstractContainerizedClusterClientFactory<String> {

    @Override
    public ClusterInfo getClusterInfo(String clusterID) {
        return ClusterInfo.of(KubernetesConfigOptions.CLUSTER_ID.key(), clusterID);
    }
}

public class YarnClusterClientFactory
        extends AbstractContainerizedClusterClientFactory<ApplicationId> {

    @Override
    public ClusterInfo getClusterInfo(ApplicationId clusterID) {
        return ClusterInfo.of(YarnConfigOptions.APPLICATION_ID.key(), clusterID.toString());
    }
}
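The two overrides above imply a new conversion method on the factory, and the persisted options are meant to be turned back into a cluster ID later. A minimal sketch of that round trip follows, using stand-in types so that it has no Flink or Hadoop dependency. SketchFactory, SketchApplicationId and the two factory classes are hypothetical. The key strings are assumed to be the values of KubernetesConfigOptions.CLUSTER_ID and YarnConfigOptions.APPLICATION_ID, and the application ID format is assumed to follow YARN's usual application_<timestamp>_<sequence> layout.

```java
import java.util.Collections;
import java.util.Map;

/** Hypothetical stand-in for ClusterClientFactory, reduced to the two conversions. */
interface SketchFactory<ClusterID> {
    /** Cluster ID to persistable options (the role of the proposed getClusterInfo). */
    Map<String, String> toOptions(ClusterID clusterId);

    /** Options back to a cluster ID (the role of getClusterId(Configuration)). */
    ClusterID fromOptions(Map<String, String> options);
}

final class KubernetesSketchFactory implements SketchFactory<String> {
    static final String KEY = "kubernetes.cluster-id"; // assumed key string

    @Override
    public Map<String, String> toOptions(String clusterId) {
        return Collections.singletonMap(KEY, clusterId);
    }

    @Override
    public String fromOptions(Map<String, String> options) {
        return options.get(KEY);
    }
}

/** Simplified stand-in for YARN's ApplicationId. */
final class SketchApplicationId {
    final long clusterTimestamp;
    final int id;

    SketchApplicationId(long clusterTimestamp, int id) {
        this.clusterTimestamp = clusterTimestamp;
        this.id = id;
    }

    @Override
    public String toString() {
        return "application_" + clusterTimestamp + "_" + String.format("%04d", id);
    }

    static SketchApplicationId fromString(String text) {
        String[] parts = text.split("_");
        return new SketchApplicationId(Long.parseLong(parts[1]), Integer.parseInt(parts[2]));
    }
}

final class YarnSketchFactory implements SketchFactory<SketchApplicationId> {
    static final String KEY = "yarn.application.id"; // assumed key string

    @Override
    public Map<String, String> toOptions(SketchApplicationId clusterId) {
        return Collections.singletonMap(KEY, clusterId.toString());
    }

    @Override
    public SketchApplicationId fromOptions(Map<String, String> options) {
        return SketchApplicationId.fromString(options.get(KEY));
    }
}
```

The design point this illustrates is that each factory owns both directions of the conversion, so code that persists or restores the options never needs to know the concrete ClusterID type.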
Constraints
This change currently supports only Kubernetes session mode and YARN session mode. Support for Kubernetes application mode and YARN application mode will be added in a future update, once FLIP-316 [2] is completed.
Test Plan
Unit Tests: Add unit tests to verify that cluster info is persisted for materialized tables.
End-to-End Testing: Validate that materialized tables can use persisted cluster information in both YARN and Kubernetes environments by deploying and running test jobs.
POC: https://github.com/apache/flink/compare/master...hackergin:flink:yarn-cluster-poc
Discarded
FLIP-480 converts the ClusterID to a String directly via its toString method, so this FLIP is no longer needed.