Status
Current state: Accepted
| Discussion thread | https://lists.apache.org/thread/9m8snbov3kn8bcz9npchgsv8wn0k33go |
|---|---|
| Vote thread | https://lists.apache.org/thread/b88cf8solj9m9cdyp1gqlg9nbspt2p8k |
| JIRA | FLINK-37504 |
| Release | |
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Flink currently lacks the ability to reload TLS certificates when the underlying truststore and keystore are updated.
Introducing a method to reload TLS certificates will enhance Flink's security by supporting certificates with short validity periods.
This feature reduces the risk of using compromised certificates. Presently, the only workaround for using such certificates in Flink is to restart the cluster before the TLS certificates expire.
This FLIP aims to provide a solution for reloading TLS certificates in place, eliminating the need for system restarts.
Public Interfaces
Add a configuration property:
security.ssl.reload: When set to 'true', the directories containing the keystore and truststore are monitored for changes, and the SSL engine factory is automatically reloaded with the updated keystore/truststore files. The files must keep their original names and paths. Defaults to 'false'.
Proposed Changes
Apache Flink relies on TLS encryption for secure communication across the following four components:
Pekko: Uses CustomSSLEngineProvider for TLS support.
Netty Internal: Uses SSLUtils.createInternalNettySSLContext.
Netty Rest: Uses SSLUtils.createRestNettySSLContext.
Blob Server: Relies on SSLUtils.createSSLServerSocketFactory, which itself depends on SSLUtils.createInternalNettySSLContext.
Following the approach implemented in https://issues.apache.org/jira/browse/FLINK-28272 to handle TLS certificate reloading in the Apache Flink operator webhook, the changes will introduce a new FileSystemWatchService.
This service leverages Java's WatchService API to track file modifications.
LocalFSWatchService will be started as a singleton. Each service that depends on TLS certificates will register a path to watch with a callback that will be invoked every time a file change is detected.
A LocalFSWatchServiceListener interface will be created to expose different events the callback will have to manage.
interface LocalFSWatchServiceListener {
    // Default no-op implementations: a listener overrides only the events it needs
    default void onWatchStarted(Path realDirectoryPath) {}
    default void onFileOrDirectoryCreated(Path relativePath) {}
    default void onFileOrDirectoryDeleted(Path relativePath) {}
    default void onFileOrDirectoryModified(Path relativePath) {}
}
public class FileSystemCertificateWatchService extends Thread {
    @Override
    public void run() {
        try (WatchService watcher = FileSystems.getDefault().newWatchService()) {
            for (String directoryPath : directoryPaths) {
                Path realDirectoryPath = Paths.get(directoryPath).toRealPath();
                realDirectoryPath.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);
                onWatchStarted(realDirectoryPath);
            }
            while (true) {
                // Block until at least one event is available
                WatchKey watchKey = watcher.take();
                for (WatchEvent<?> watchEvent : watchKey.pollEvents()) {
                    // Manage watch events
                }
                // Re-arm the key so that further events are delivered
                watchKey.reset();
            }
        } catch (IOException | InterruptedException e) {
            // Log the failure and stop watching
        }
    }

    // Register a callback and the list of paths to watch
    public void registerWatchedPath(LocalFSWatchServiceListener callback, Path[] pathsToWatch) { ... }
}
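The "Manage watch events" step above could route each event kind to the matching listener method. The following is a minimal sketch of that idea using only the JDK; WatchListener and WatchEventDispatcher are hypothetical names introduced for illustration and are not part of the proposal.

```java
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;

// Hypothetical stand-in for the listener interface described in this FLIP.
interface WatchListener {
    default void onFileOrDirectoryCreated(Path relativePath) {}
    default void onFileOrDirectoryDeleted(Path relativePath) {}
    default void onFileOrDirectoryModified(Path relativePath) {}
}

// Hypothetical helper, not part of the proposal.
final class WatchEventDispatcher {
    private WatchEventDispatcher() {}

    /** Routes one event to the matching listener method; returns false if the event is ignored. */
    static boolean dispatch(WatchEvent.Kind<?> kind, Path relativePath, WatchListener listener) {
        if (kind == StandardWatchEventKinds.ENTRY_CREATE) {
            listener.onFileOrDirectoryCreated(relativePath);
        } else if (kind == StandardWatchEventKinds.ENTRY_DELETE) {
            listener.onFileOrDirectoryDeleted(relativePath);
        } else if (kind == StandardWatchEventKinds.ENTRY_MODIFY) {
            listener.onFileOrDirectoryModified(relativePath);
        } else {
            // OVERFLOW (events were lost) or an unknown kind: nothing is forwarded in this sketch.
            return false;
        }
        return true;
    }
}
```

Inside the polling loop this would be called as `dispatch(watchEvent.kind(), (Path) watchEvent.context(), listener)`. How OVERFLOW should be handled (for example, by treating it as a modification) is left open here.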
The reload mechanism will only be enabled when the new security.ssl.reload configuration option is set to 'true'.
The callback invoked by LocalFSWatchService when a change happens must return quickly. To ensure this, the callback will only set an atomic dirty flag; the actual reload is deferred until the SSL context is next used.
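The dirty-flag approach can be illustrated with a small generic holder. This is a sketch under assumptions, not the proposed implementation: ReloadOnDemand and its method names are hypothetical, and the loader stands in for whatever rebuilds the SSL context.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

/** Hypothetical helper: caches a value and rebuilds it lazily after markDirty(). */
final class ReloadOnDemand<T> {
    private final Supplier<T> loader;
    private final AtomicBoolean dirty = new AtomicBoolean(false);
    private volatile T current;

    ReloadOnDemand(Supplier<T> loader) {
        this.loader = loader;
        this.current = loader.get();
    }

    /** Cheap enough to call from the watch thread: no I/O and no locking. */
    void markDirty() {
        dirty.set(true);
    }

    /** Returns the cached value, reloading first if a change was flagged. */
    T get() {
        if (dirty.get()) {
            reload();
        }
        return current;
    }

    private synchronized void reload() {
        // Clear the flag before loading, so a change that arrives during the
        // load triggers another reload on the next call instead of being lost.
        if (dirty.compareAndSet(true, false)) {
            current = loader.get();
        }
    }
}
```

With this shape, several file events arriving in quick succession collapse into a single reload on the next call to `get()`.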
On Pekko, the CustomSSLEngineProvider will be updated to fully implement the SSLEngineProvider interface.
It will rely on an SSLContextLoader, which will be responsible for creating the SSLContext and reloading it when called by the LocalFSWatchService.
public class CustomSSLEngineProvider implements SSLEngineProvider {
    private final SSLContextLoader sslContextLoader;

    public CustomSSLEngineProvider(ActorSystem system) {
        ...
        sslContextLoader = new SSLContextLoader(sslTrustStore, sslProtocol, securityConfig);
        FileSystemCertificateWatchService fileSystemWatchService = new FileSystemCertificateWatchService(...);
        // The watch service extends Thread, so it is started with start()
        fileSystemWatchService.start();
    }
    ...
    private SSLEngine createSSLEngine(TLSRole role) {
        return createSSLEngine(sslContextLoader.createSSLEngine(), role);
    }

    private SSLEngine createSSLEngine(SSLEngine engine, TLSRole role) {
        ...
        return engine;
    }
}
public class SSLContextLoader implements LocalFSWatchServiceListener {
    private volatile SSLContext sslContext;

    public SSLContextLoader(String sslTrustStore, String sslProtocol, Config securityConfig) {
        ...
        loadSSLContext();
    }

    // Set dirty atomic bool to false and regenerate SSLContext
    synchronized void loadSSLContext() { ... }

    // Check dirty atomic bool before creating the engine
    // reload SSLContext if needed
    public SSLEngine createSSLEngine() { ... }

    @Override
    public void onWatchStarted(Path realDirectoryPath) {}

    @Override
    public void onFileOrDirectoryCreated(Path relativePath) {}

    @Override
    public void onFileOrDirectoryDeleted(Path relativePath) {}

    // Set dirty atomic bool to true
    @Override
    public void onFileOrDirectoryModified(Path relativePath) { ... }
}
New connections will request the creation of a new SSLEngine, which will be based on the latest version of the SSLContext.
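To make the "regenerate the SSLContext" step concrete, the sketch below rebuilds a javax.net.ssl.SSLContext from keystore and truststore files and swaps it in atomically. It is only an illustration: SslContextHolder is a hypothetical name, and the PKCS12 store type and the single shared password are assumptions that the FLIP does not specify.

```java
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.KeyStore;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.TrustManagerFactory;

/** Hypothetical holder; assumes PKCS12 stores protected by one shared password. */
final class SslContextHolder {
    private final Path keyStorePath;
    private final Path trustStorePath;
    private final char[] password;
    private volatile SSLContext sslContext;

    SslContextHolder(Path keyStorePath, Path trustStorePath, char[] password) throws Exception {
        this.keyStorePath = keyStorePath;
        this.trustStorePath = trustStorePath;
        this.password = password;
        reload();
    }

    /** Re-reads both stores from disk and publishes a freshly initialised context. */
    synchronized void reload() throws Exception {
        KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
        kmf.init(readStore(keyStorePath), password);
        TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        tmf.init(readStore(trustStorePath));
        SSLContext fresh = SSLContext.getInstance("TLS");
        fresh.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
        // Publish only after full initialisation; existing engines keep the old context.
        sslContext = fresh;
    }

    /** New connections obtain their engine from whichever context is current. */
    SSLEngine createSSLEngine() {
        return sslContext.createSSLEngine();
    }

    SSLContext current() {
        return sslContext;
    }

    private KeyStore readStore(Path path) throws Exception {
        KeyStore store = KeyStore.getInstance("PKCS12");
        try (InputStream in = Files.newInputStream(path)) {
            store.load(in, password);
        }
        return store;
    }
}
```

Because the field is only replaced once the new context is ready, engines created before a reload continue to use the context they were created from, which matches the behaviour described for new connections above.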
For Netty (both REST and internal), a ReloadableSslContext will be introduced.
It will be in charge of loading the SslContext. As with Pekko, a file change will set an atomic dirty flag, and the certificates will be reloaded the next time an SslContext operation (newEngine…) is called.
A ReloadableJdkSslContext will also be added, extending ReloadableSslContext, to manage the reload of certificates for the SSLContext used by the BlobServer (which relies on Java's ServerSocket).
The createRestNettySSLContext and createInternalNettySSLContext methods of SSLUtils will be modified to return a ReloadableSslContext and will register a path to watch with a callback on the LocalFSWatchService singleton.
public class SSLUtils {
    ...
    @Nullable
    private static SslContext createInternalNettySSLContext(
            Configuration config,
            boolean clientMode,
            SslProvider provider)
            throws Exception {
        ...
        ReloadableJdkSslContext reloadableJdkSslContext =
                new ReloadableJdkSslContext(config, clientMode, provider);
        if (config.get(SecurityOptions.SSL_RELOAD)) {
            // register path and callback to LocalFSWatchService
        }
        return reloadableJdkSslContext;
    }
    ...
    @Nullable
    public static SslContext createRestNettySSLContext(
            Configuration config, boolean clientMode, ClientAuth clientAuth, SslProvider provider)
            throws Exception {
        ...
        ReloadableSslContext reloadableSslContext =
                new ReloadableSslContext(config, clientMode, clientAuth, provider);
        if (config.get(SecurityOptions.SSL_RELOAD)) {
            // register path and callback to LocalFSWatchService
        }
        return reloadableSslContext;
    }
}
public class ReloadableSslContext extends SslContext implements LocalFSWatchServiceListener {
    protected volatile SslContext sslContext;

    public ReloadableSslContext(
            Configuration config, boolean clientMode, ClientAuth clientAuth, SslProvider provider)
            throws Exception {
        ...
        loadContext();
    }

    @Override
    public void onWatchStarted(Path realDirectoryPath) {}

    @Override
    public void onFileOrDirectoryCreated(Path relativePath) {}

    @Override
    public void onFileOrDirectoryDeleted(Path relativePath) {}

    // Set dirty atomic bool to true
    @Override
    public void onFileOrDirectoryModified(Path relativePath) { ... }

    // load ssl context here
    protected synchronized void loadContext() throws Exception { ... }
}

public class ReloadableJdkSslContext extends ReloadableSslContext {
    public ReloadableJdkSslContext(Configuration config, boolean clientMode, SslProvider provider)
            throws Exception {
        super(config, clientMode, ClientAuth.NONE, provider);
    }

    // load ssl context here
    @Override
    protected synchronized void loadContext() throws Exception { ... }
}
On the BlobServer, the current implementation uses Java's ServerSocket/ServerSocketFactory, which does not provide a way to reload the SSLContext. The approach will be to recreate the socket on the same port used previously each time a file change is detected.
A specific BlobServerSocket will be introduced to manage requests for ServerSocketFactory from SSLUtils.
BlobServer will register the path to watch and the BlobServerSocket callback with the LocalFSWatchService.
BlobServerSocket will be in charge of reloading the context when getServerSocket is called, if the atomic dirty flag indicates that a reload is needed.
public class BlobServer extends Thread
        implements BlobService,
                BlobWriter,
                PermanentBlobService,
                TransientBlobService,
                LocallyCleanableResource,
                GloballyCleanableResource {
    public BlobServer(Configuration config, Reference<File> storageDir, BlobStore blobStore)
            throws IOException {
        ...
        blobServerSocket = new BlobServerSocket(config, backlog, maxConnections);
        ...
        if (config.get(SecurityOptions.SSL_RELOAD) && SecurityOptions.isInternalSSLEnabled(config)
                && config.get(BlobServerOptions.SSL_ENABLED)) {
            // Register path and callback
        }
    }
    ...
}

public class BlobServerSocket implements LocalFSWatchServiceListener {
    private ServerSocket serverSocket;
    private boolean firstCreation = true;

    public BlobServerSocket(Configuration config, int backlog, int maxConnections)
            throws IOException {
        serverPortRange = config.get(BlobServerOptions.PORT);
        createSocket();
    }

    // Check dirty atomic bool before returning the socket
    // recreate the socket if needed
    // and provide the server socket
    public ServerSocket getServerSocket() { ... }

    @Override
    public void onWatchStarted(Path realDirectoryPath) {}

    @Override
    public void onFileOrDirectoryCreated(Path relativePath) {}

    @Override
    public void onFileOrDirectoryDeleted(Path relativePath) {}

    // Set dirty atomic bool to true
    @Override
    public void onFileOrDirectoryModified(Path relativePath) { ... }

    // Will manage creation of socket
    private synchronized void createSocket() throws IOException { ... }
}
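The socket re-creation described for BlobServerSocket could look roughly like the following JDK-only sketch. ReloadableServerSocket is a hypothetical name, and the factory supplier is an assumption standing in for whatever produces a ServerSocketFactory from the freshly reloaded SSL context.

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import javax.net.ServerSocketFactory;

/** Hypothetical sketch: re-creates the server socket on the same port after a change. */
final class ReloadableServerSocket {
    // Assumption: each call returns a factory backed by the latest certificates.
    private final Supplier<ServerSocketFactory> factorySupplier;
    private final int backlog;
    private final AtomicBoolean dirty = new AtomicBoolean(false);
    private ServerSocket serverSocket;

    ReloadableServerSocket(Supplier<ServerSocketFactory> factorySupplier, int port, int backlog)
            throws IOException {
        this.factorySupplier = factorySupplier;
        this.backlog = backlog;
        this.serverSocket = factorySupplier.get().createServerSocket(port, backlog);
    }

    /** Called from the file-watch callback; does no I/O. */
    void markDirty() {
        dirty.set(true);
    }

    /** Returns the current socket, re-creating it on the same port if a change was flagged. */
    synchronized ServerSocket getServerSocket() throws IOException {
        if (dirty.compareAndSet(true, false)) {
            int port = serverSocket.getLocalPort();
            serverSocket.close();
            serverSocket = factorySupplier.get().createServerSocket(port, backlog);
        }
        return serverSocket;
    }
}
```

One consequence worth keeping in mind: closing a ServerSocket causes any thread blocked in accept() on it to receive a SocketException, so the accept loop has to fetch the socket again through getServerSocket().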
Compatibility, Deprecation, and Migration Plan
No impact on existing users: the feature is disabled by default (security.ssl.reload defaults to 'false').
Test Plan
These changes will be covered with unit and integration tests.
Integration tests will have to ensure that no regressions are observed when watched files are modified several times in quick succession.
Rejected Alternatives
None