Status

Target Release: Flink 2.3
Current State: Accepted


Goal

Offer a native S3 filesystem (flink-s3-fs-native) for Apache Flink that uses the AWS SDK v2 directly and is completely independent of the Hadoop and Presto libraries. The aim is to consolidate Flink's S3 integration into a single, unified solution.

POC : https://github.com/apache/flink/pull/27187

Benchmarking results (TL;DR: a performance improvement was observed). More details: Benchmarking Native S3 FileSystem (flink-s3-fs-native) vs Presto S3 (flink-s3-fs-presto)

Motivation

Apache Flink's current S3 support is split between two primary implementations: flink-s3-fs-hadoop and flink-s3-fs-presto. While these connectors have facilitated S3 integration for several years, the "dual-connector" approach has reached its technical limits. The Hadoop-based connector is fundamentally an adaptation of the Hadoop S3A filesystem, which was designed for the block-based paradigms of HDFS. This adaptation brings with it a massive transitive dependency tree, often exceeding 30MB, which includes critical libraries such as Guava, Protobuf, Netty, and Jackson. The inclusion of these libraries within the Flink plugin frequently leads to runtime classpath conflicts, as user applications or other Flink components may require different, incompatible versions of the same libraries.


The Presto-based connector was introduced to offer a lighter-weight alternative, particularly optimised for the high-frequency I/O patterns of Flink checkpointing. However, this optimisation came at the cost of functional completeness. Specifically, the Presto connector lacks a RecoverableWriter implementation, which is a prerequisite for Flink’s exactly-once sink semantics. Consequently, users are often forced into a confusing architectural pattern where they use the Presto connector for checkpoint storage but are required to use the Hadoop connector for streaming sinks to S3. This fragmentation complicates deployment, as users must manage multiple configurations and potential dependency collisions for a single job.   

Furthermore, both legacy connectors are coupled to external release cycles—Hadoop and Trino/Presto, respectively—which delays Flink’s adoption of modern AWS features. The shift toward a native implementation using the AWS SDK v2 allows Flink to leverage modern cloud-native capabilities such as regional endpoints, IAM Roles for Service Accounts (IRSA), and S3 Express One Zone directly. Performance is also a critical driver; Hadoop’s blocking I/O model is increasingly becoming a bottleneck in high-throughput streaming environments where non-blocking, asynchronous I/O is necessary to maximise TaskManager resource utilisation. 

Current State: A Tale of Two Connectors

Currently, Apache Flink provides two primary mechanisms for interacting with S3, both of which are adapters wrapping external projects. This "dual-connector" approach has introduced a fragmented ecosystem with distinct operational trade-offs.

flink-s3-fs-hadoop

This connector wraps the S3AFileSystem implementation provided by the Apache Hadoop project. It is currently the most feature-complete option, supporting both the FileSystem interface and the RecoverableWriter functionality needed for streaming sinks. However, it suffers from severe architectural drawbacks:

  • The flink-s3-fs-hadoop artifact is notoriously large because it bundles the entire hadoop-common and hadoop-aws modules, along with their transitive dependencies. These dependencies include ubiquitously used libraries such as Guava, Protobuf, Netty, and Apache HttpComponents. Even with aggressive relocation (shading) during the build process, resource files and certain reflective loading mechanisms often escape shading, leading to runtime classpath conflicts when user applications rely on different versions of these same libraries. This significantly degrades the developer experience, forcing users to wrestle with ClassNotFoundException or NoSuchMethodError in production environments.

  • Flink’s ability to adopt modern AWS features (such as IAM Roles for Service Accounts, S3 Express One Zone, or regional endpoints) is strictly coupled to the Hadoop release cycle. Flink developers cannot simply upgrade the AWS SDK version; they must wait for a Hadoop release that supports the newer SDK, then integrate that Hadoop version into Flink.

  • The Hadoop FileSystem abstraction was originally designed for block-based storage with strong consistency and mutable files, capabilities that object stores lack. The translation layer introduces impedance mismatches, and the underlying I/O operations are typically blocking, which is suboptimal for Flink’s asynchronous checkpointing model.

flink-s3-fs-presto

Introduced as a lightweight alternative, this connector wraps the file system implementation from the Presto (now Trino) project.

  • While it alleviates some dependency issues, the Presto connector fundamentally lacks a robust implementation of the RecoverableWriter interface. This interface is non-negotiable for Flink’s StreamingFileSink (and the newer FileSink), which requires the ability to persist "in-progress" files across checkpoints and recover them seamlessly after failures.

  • The lack of feature parity forces users into complex configurations where they might use the Presto connector for state backend checkpoints (due to its speed) but are forced to use the Hadoop connector for data sinks (due to the RecoverableWriter). This bifurcated setup increases the surface area for bugs and configuration errors.

The Solution: A Native Implementation

To address these systemic issues, this FLIP proposes the creation of flink-s3-fs-native, a clean-slate implementation built directly on the AWS SDK for Java v2. This aligns Flink with broader industry trends—exemplified by projects like Trino, which have successfully migrated to native connectors to improve performance and maintainability.

The native implementation aims to:

  1. Eliminate the Hadoop dependency, making the connector a self-contained module that depends only on the modular AWS SDK v2.

  2. Provide a single, high-performance connector that supports both state access (FileSystem) and transactional sinks (RecoverableWriter), simplifying the user experience.

  3. Leverage the non-blocking I/O capabilities of Netty and the AWS Common Runtime (CRT) to maximize throughput.

Public Interface

Proposed Artifact

This FLIP introduces a new Maven module that produces the plugin artifact.

Artifact | Description
flink-s3-fs-native | The standalone module containing the native implementation. Unlike previous connectors, it does not depend on hadoop-common or hadoop-aws.



Deployment Location

To activate the public interface, the artifact must be placed in the Flink plugins directory. The standard deployment pattern is:

/opt/flink/plugins/s3-native/flink-s3-fs-native-<version>.jar

This placement triggers the PluginManager to load the NativeS3FileSystemFactory via the Java Service Provider Interface.
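
The binding itself is declared through a standard Java service-provider file inside the plugin jar. A minimal sketch, assuming the org.apache.flink.fs.s3native package listed later in this document:

# File inside the plugin jar:
# META-INF/services/org.apache.flink.core.fs.FileSystemFactory
org.apache.flink.fs.s3native.NativeS3FileSystemFactory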

Definitions


1. NativeS3FileSystemFactory


This class implements org.apache.flink.core.fs.FileSystemFactory and registers the s3:// URI scheme. It is instantiated by the PluginManager at startup, reads the flink-conf.yaml, and initializes the shared S3AsyncClient. If both flink-s3-fs-hadoop and flink-s3-fs-native are present and both attempt to register s3://, the behavior is undefined (usually the first one loaded wins); it is therefore recommended to deploy only one of them.

public class NativeS3FileSystemFactory implements FileSystemFactory {
    // ...
}
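
For orientation, a minimal sketch of the factory is shown below. The method set comes from Flink's FileSystemFactory interface; the NativeS3FileSystem constructor is hypothetical and stands in for the real client initialization.

import java.io.IOException;
import java.net.URI;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemFactory;

public class NativeS3FileSystemFactory implements FileSystemFactory {

    private Configuration flinkConfig;

    @Override
    public String getScheme() {
        // The URI scheme this factory registers.
        return "s3";
    }

    @Override
    public void configure(Configuration config) {
        // Invoked once by the PluginManager with the cluster configuration.
        this.flinkConfig = config;
    }

    @Override
    public FileSystem create(URI fsUri) throws IOException {
        // Hypothetical constructor: builds the shared S3AsyncClient from the
        // s3.* options and binds it to the requested bucket.
        return new NativeS3FileSystem(fsUri, flinkConfig);
    }
}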



2. NativeS3FileSystem

This class extends org.apache.flink.core.fs.FileSystem and implements org.apache.flink.core.fs.EntropyInjectingFileSystem.


/**
 * Native S3 FileSystem implementation using AWS SDK v2.
 *
 * <p><b>Close Semantics:</b> once {@code close()} has been called:
 *
 * <ul>
 *   <li>Operations in progress will be allowed to complete
 *   <li>New operations started after close will throw {@link IllegalStateException}
 * </ul>
 *
 * <p><b>Permission Considerations:</b> Some operations require specific IAM permissions:
 *
 * <ul>
 *   <li>{@link #getFileStatus}: Returns 403 for non-existent objects if ListBucket permission is
 *       not granted (to prevent object enumeration)
 *   <li>{@link #listStatus}: Requires ListBucket permission
 *   <li>{@link #delete}: With only DeleteObject permission, deleting non-existent objects may
 *       return errors
 * </ul>
 */
public class NativeS3FileSystem extends FileSystem
        implements EntropyInjectingFileSystem, PathsCopyingFileSystem, AutoCloseableAsync {
    // ...
}
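
For illustration, once the plugin is loaded, the filesystem is obtained through Flink's standard Path/FileSystem API; the bucket and path below are placeholders:

import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class ListBucketExample {
    public static void main(String[] args) throws Exception {
        // With the plugin installed, the s3:// scheme resolves to NativeS3FileSystem.
        Path dir = new Path("s3://my-bucket/checkpoints/");
        FileSystem fs = dir.getFileSystem();

        // Requires the ListBucket permission, per the javadoc above.
        for (FileStatus status : fs.listStatus(dir)) {
            System.out.println(status.getPath() + " (" + status.getLen() + " bytes)");
        }
    }
}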



3. NativeS3RecoverableWriter

This class implements org.apache.flink.core.fs.RecoverableWriter. It exposes the state objects used for checkpointing:

  • CommitRecoverable: A serializable object containing the list of S3 ETags (part identifiers) for uploaded parts and the raw bytes of the "tail" buffer (data < 5MB).

  • ResumeRecoverable: An object representing a file currently being written, used to resume writing after a failure.


/** Recoverable writer for S3 using multipart uploads for exactly-once semantics. */
@PublicEvolving
public class NativeS3RecoverableWriter implements RecoverableWriter, AutoCloseable {
    // ...
}
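
To make the lifecycle concrete, the following is an illustrative walk-through of how a sink interacts with a recoverable writer, using only methods from Flink's RecoverableWriter and RecoverableFsDataOutputStream interfaces:

import java.nio.charset.StandardCharsets;

import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;

public class RecoverableWriteExample {
    public static void main(String[] args) throws Exception {
        Path target = new Path("s3://my-bucket/output/part-0");
        RecoverableWriter writer = target.getFileSystem().createRecoverableWriter();

        // Normal write path: data is buffered and uploaded as MPU parts.
        RecoverableFsDataOutputStream out = writer.open(target);
        out.write("hello".getBytes(StandardCharsets.UTF_8));

        // At a checkpoint, persist() makes everything written so far durable and
        // returns a ResumeRecoverable that is stored in Flink's checkpoint state.
        RecoverableWriter.ResumeRecoverable resumable = out.persist();

        // On success, closeForCommit() seals the stream; commit() completes the
        // multipart upload, making the object visible atomically.
        RecoverableFsDataOutputStream.Committer committer = out.closeForCommit();
        committer.commit();
    }
}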


Configuration

Option Key | Default | Type | Description
s3.region | (auto) | String | The AWS region (e.g., us-east-1). If unset, the SDK attempts to resolve it via the default provider chain (env vars, EC2 metadata, ~/.aws/config).
s3.endpoint | (none) | String | Overrides the S3 service endpoint. Critical for S3-compatible stores (MinIO, Ceph) or testing with LocalStack.
s3.path-style-access | false | Boolean | Forces path-style access (host/bucket/key) instead of virtual-host-style (bucket.host/key). Required for many on-premise S3 implementations.
s3.access-key | (none) | String | AWS Access Key ID. Fallback key: s3.access.key.
s3.secret-key | (none) | String | AWS Secret Access Key. Fallback key: s3.secret.key.
s3.anonymous-credentials | false | Boolean | If true, uses anonymous (unsigned) requests. Useful for reading public buckets.
s3.upload.min.part.size | 5MB | MemorySize | The minimum size of data buffered in memory before a Multipart Upload (MPU) part is sent to S3.
s3.upload.max.concurrent.uploads | CPU cores | Integer | The maximum number of concurrent MPU part uploads allowed per stream. Acts as a semaphore to prevent direct-memory exhaustion.
s3.bulk-copy.enabled | true | Boolean | Enables the use of S3TransferManager for optimized, parallel bulk copy operations (e.g., during recovery).
s3.bulk-copy.max-concurrent | 16 | Integer | Controls parallelism for bulk copy operations.
s3.async.enabled | true | Boolean | Enables asynchronous I/O operations for improved throughput.
s3.read.buffer.size | 256KB | MemorySize | Size of the read buffer. Clamped between 64KB and 4MB. Higher values improve sequential read throughput.
s3.entropy.key | (none) | String | A string token in the path to be replaced by random entropy (sharding optimization).
s3.entropy.length | 4 | Integer | Length of the random entropy string injected.
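
For illustration, a minimal flink-conf.yaml for an S3-compatible store such as MinIO could combine these options as follows (the endpoint and credentials are placeholders):

s3.endpoint: http://minio.internal:9000
s3.path-style-access: true
s3.access-key: EXAMPLE_ACCESS_KEY
s3.secret-key: EXAMPLE_SECRET_KEY
s3.upload.min.part.size: 8mb
s3.upload.max.concurrent.uploads: 8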

Detailed Architectural Design

The design of the Native S3 FileSystem is based on three pillars:

  1. Dependency isolation
  2. Functional parity
  3. High-performance asynchronous I/O


1. Architectural Hermeticity and Dependency Isolation

A significant portion of the "Jar Hell" experienced by Flink users stems from the leakage of filesystem-specific dependencies into the broader classpath. To address this, the flink-s3-fs-native module adopts a strict shading policy. All internal dependencies, including the AWS SDK v2, Netty-based HTTP clients, and common utility libraries, are relocated to a Flink-internal namespace (e.g., org.apache.flink.fs.s3.native.shaded...). This ensures that the filesystem plugin behaves as a black box to the rest of the Flink runtime and the user application. 

2. Unified Functional Parity

Unlike the existing split between Hadoop and Presto connectors, the native implementation provides a single codebase capable of handling all S3 operations, including state backend checkpointing, savepoints, and streaming file sinks. This is achieved by implementing the full suite of Flink filesystem interfaces, most notably the RecoverableWriter, which maps Flink’s stream-oriented writes to S3’s multipart upload API.   

3. High-Performance Asynchronous I/O

The core of the native implementation utilizes the AWS SDK v2’s non-blocking I/O capabilities. This allows the Flink runtime to overlap I/O operations with computation more effectively than the legacy synchronous clients. The design allows for the selection between the standard Netty-based asynchronous client and the AWS Common Runtime (CRT) C-based client, the latter of which provides optimized throughput for bulk operations.

Component Breakdown and Interaction Logic

The flink-s3-fs-native architecture is organised into several distinct layers, as visualised in the following system interaction diagram.

[Diagram: Flink Runtime Layer (Nodes A–C) → Native S3 Implementation Layer (Nodes D–F) → Client Transport Options (Nodes G–H) → Amazon S3 (Node I), with Checkpoint State (Node J) feeding the recovery path back to Node D]

The diagram illustrates the flow of data and control signals from the Flink runtime through the new Native S3 implementation layer to the physical storage service.

1. Flink Runtime Layer:

This layer represents the high-level Flink components that trigger I/O operations.

  • Node A (Stream Operator): Initiates the standard write path, sending continuous data streams to the filesystem abstraction.
  • Node B (Native S3 Implementation): Represents the NativeS3RecoverableWriter, which acts as the "heart" of the transactional system. It buffers incoming data until it meets S3's 5MB multipart upload (MPU) requirement.
  • Node C (CheckpointCoordinator): Sends a snapshot signal during Flink's checkpointing phase. This triggers the writer to persist its current state, ensuring data durability and consistency.

2. Native S3 Implementation Layer 

This layer handles the mapping of Flink's stream-based writes to S3's object-based storage.

  • Node D (S3AccessHelper/Logic): An internal abstraction that decouples the writer logic from the specific AWS SDK implementation. It coordinates the lifecycle of Multipart Uploads, including startMultiPartUpload and uploadPart calls.
  • Nodes E & F (SDK Abstraction & Client): Represent the AWS SDK for Java v2 wrapper. Node F specifically acts as the S3AsyncClient factory that routes requests to the chosen transport protocol.

3. Client Transport Options:

A key architectural upgrade is the choice of HTTP transport layers.

  • Node G (Netty-based Async Client): The standard Java-native non-blocking I/O client, optimized for stability and standard JVM environments.
  • Node H (C-based CRT Client): The AWS Common Runtime client, designed for ultra-high throughput and lower CPU overhead, often performing 30-40% faster than Java-based clients for bulk operations.

4. Amazon S3 Service:

  • Node I (Amazon S3): The final destination for the data. The transport clients communicate with S3's REST API using regional endpoints or specialized storage like S3 Express One Zone.

5. State Management (Fault Tolerance)

  • Node J (Checkpoint State): When a checkpoint is triggered but the writer has less than 5MB of data in its buffer (the "tail"), it serializes these raw bytes into the CommitRecoverable object within Flink's state.
  • Recovery Path (J -.-> D): Upon a TaskManager failure, the system retrieves these serialized bytes and ETags from the state to rebuild the writer's buffer, allowing it to resume writing without data loss.


The NativeS3FileSystem and URI Handling

The NativeS3FileSystem class serves as the entry point for all s3:// scheme interactions. It extends Flink’s base filesystem abstractions and implements EntropyInjectingFileSystem to prevent S3 request throttling. In large-scale Flink deployments, writing thousands of checkpoint files to the same S3 prefix can lead to "hot spotting" in S3’s internal index shards. By injecting an entropy key into the path, the filesystem ensures that the data is distributed across different S3 partitions, significantly improving scaling performance.
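
As a concrete example of the mechanism (the substituted string below is arbitrary):

s3.entropy.key: _entropy_
s3.entropy.length: 4

# A configured path such as   s3://my-bucket/_entropy_/checkpoints
# is rewritten at runtime to  s3://my-bucket/x7f2/checkpoints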

The S3AccessHelper and SDK Abstraction

The S3AccessHelper is an internal interface designed to decouple Flink’s filesystem logic from the specific version of the AWS SDK. This abstraction is critical for maintaining consistency between the Hadoop and Presto modules while the native module is under development. It provides methods for the lifecycle management of objects and multipart uploads.

Method Name | Functional Responsibility | SDK Interaction
startMultiPartUpload | Initializes a new MPU session and returns an Upload ID. | createMultipartUpload
uploadPart | Uploads a segment of data (min 5MB) associated with an MPU. | uploadPart
commitMultiPartUpload | Finalizes the MPU, making all parts visible as a single object. | completeMultipartUpload
getObjectMetadata | Retrieves size and ETag information for validation. | headObject
deleteObject | Removes objects during cleanup or rollback. | deleteObject

The interface design focuses on atomicity and error recovery, as seen in the commitMultiPartUpload method, which accepts an AtomicInteger for error tracking during multi-part finalization.
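
A sketch of the helper is shown below, with signatures derived from the table above; apart from the AtomicInteger parameter on commitMultiPartUpload, which the text mentions explicitly, the parameter lists are assumptions:

import java.io.File;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.CompletedPart;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
import software.amazon.awssdk.services.s3.model.UploadPartResponse;

/** Internal abstraction over the AWS SDK v2 (illustrative signatures). */
interface S3AccessHelper {

    /** Starts an MPU session and returns the upload ID (createMultipartUpload). */
    String startMultiPartUpload(String key);

    /** Uploads one part (min 5MB except the last) of an MPU (uploadPart). */
    UploadPartResponse uploadPart(String key, String uploadId, int partNumber, File inputFile);

    /** Completes the MPU, making all parts visible as one object (completeMultipartUpload). */
    CompleteMultipartUploadResponse commitMultiPartUpload(
            String key, String uploadId, List<CompletedPart> parts, AtomicInteger errorCount);

    /** Retrieves size and ETag information for validation (headObject). */
    HeadObjectResponse getObjectMetadata(String key);

    /** Removes objects during cleanup or rollback (deleteObject). */
    void deleteObject(String key);
}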

The Recoverable Writer: Engineering Exactly-Once Sinks

The most critical component for streaming data pipelines is the NativeS3RecoverableWriter. This component must manage the impedance mismatch between Flink’s continuous byte streams and S3’s immutable object store.

Multipart Upload Strategy and the 5MB Constraint

Amazon S3’s Multipart Upload API requires that every part except for the very last one must be at least 5 MB in size. Flink, however, may trigger checkpoints much more frequently, often before 5 MB of data has been accumulated in the buffer. The NativeS3RecoverableWriter handles this through a hybrid persistence strategy.

When data is written to the RecoverableFsDataOutputStream, it is first stored in a local buffer (either a temporary file or an in-memory segment). Once the buffer crosses the threshold defined by s3.upload.min-part-size (defaulting to 5 MB), the writer initiates an asynchronous upload of that part to S3. The ETag returned by S3 is stored in the writer's memory.
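
A minimal sketch of this threshold logic, assuming an in-memory buffer (the real writer may also spill to a temporary file, as noted above):

import java.io.ByteArrayOutputStream;
import java.io.IOException;

/** Simplified write path: buffer locally, flush a part once the threshold is crossed. */
class PartBufferingStream {

    private static final long MIN_PART_SIZE = 5L * 1024 * 1024; // s3.upload.min-part-size

    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private int nextPartNumber = 1;

    void write(byte[] data) throws IOException {
        buffer.write(data);
        if (buffer.size() >= MIN_PART_SIZE) {
            // Threshold crossed: hand the buffered bytes off as an asynchronous
            // MPU part upload and record the returned ETag for the final commit.
            uploadPartAsync(buffer.toByteArray(), nextPartNumber++);
            buffer.reset();
        }
    }

    private void uploadPartAsync(byte[] part, int partNumber) {
        // In the real writer this delegates to the S3AccessHelper / S3AsyncClient;
        // the ETag from the response is kept in the writer's in-memory part list.
    }
}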

Checkpointing and State Persistence (The "Tail" Problem)

During a checkpoint, Flink calls the persist() method. The writer must ensure that all data written up to that point is durable.

  1. Full Parts: Data already uploaded as full 5 MB+ parts is already durable on S3; the corresponding ETags are recorded in the checkpoint state.

  2. The "Tail": Any data remaining in the buffer that is less than 5 MB cannot be uploaded as a standard MPU part. The NativeS3RecoverableWriter serializes these raw bytes directly into the Flink checkpoint state as part of the CommitRecoverable object.

  3. Atomic Commits: Upon successful completion of the job or a specific bucket, the S3Committer retrieves the list of ETags and the final "tail" (uploaded as the final part) and calls completeMultipartUpload.

This mechanism ensures that even if a TaskManager fails, the subsequent TaskManager can resume from the last successful checkpoint by retrieving the ETags and the small "tail" buffer from the state, thus avoiding data loss or duplication.
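
Structurally, the persisted state object could look roughly like the sketch below; the field names are illustrative, as the document only fixes that it carries the part ETags and the raw tail bytes:

import java.io.Serializable;
import java.util.List;

/** Illustrative shape of the CommitRecoverable persisted in Flink's checkpoint state. */
class NativeS3CommitRecoverable implements Serializable {

    private final String objectKey;       // target S3 object
    private final String uploadId;        // open MPU session to resume or commit
    private final List<String> partETags; // parts already durable on S3
    private final byte[] tailBytes;       // buffered data < 5 MB, stored in Flink state

    NativeS3CommitRecoverable(
            String objectKey, String uploadId, List<String> partETags, byte[] tailBytes) {
        this.objectKey = objectKey;
        this.uploadId = uploadId;
        this.partETags = partETags;
        this.tailBytes = tailBytes;
    }
}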


Implementation Details: RecoverableMultiPartUpload

The internal implementation of the MPU lifecycle is managed by classes like RecoverableMultiPartUploadImpl. This class coordinates the transition between active writing, partial uploads, and finalization. In unit testing environments, a StubMultiPartUploader is used to simulate these interactions without requiring a live S3 connection, ensuring the state machine logic is verified for edge cases like network interruptions during the final commit.

Transport Layer Analysis: Netty vs. AWS CRT

A significant enhancement in FLIP-555 is the ability to choose the underlying HTTP transport layer. The choice between the Netty-based client and the C-based CRT client involves trade-offs in CPU efficiency and environment compatibility.

Netty-based Asynchronous Client

The Netty client is the standard for the AWS SDK for Java v2. It is built on a mature, Java-native non-blocking I/O framework that integrates seamlessly with Flink’s own Netty-based communication stack used for TaskManager-to-TaskManager data shuffle. It provides excellent stability and is easier to debug within a standard JVM profiler.

AWS Common Runtime (CRT) Client

The AWS CRT client is a set of C-based libraries optimized for high-performance object storage interactions. Community feedback during the design of FLIP-555 highlighted that C-based tools like s5cmd often outperform Java-based clients by 30-40% in terms of raw throughput and CPU efficiency. By integrating the CRT client through the S3TransferManager, Flink can achieve similar performance gains for bulk operations such as state download during TaskManager recovery.

Feature | Netty Client | CRT Client
Language | Java | C (native)
I/O Model | Non-blocking NIO | Highly optimized native transport
CPU Efficiency | Moderate | High (lower overhead)
Throughput | High | Ultra-high (optimized for S3)
Best For | General streaming and sinks | Large state downloads / heavy bulk transfers
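
As a sketch, the client factory might select the transport based on the s3.client.type option (see the tuning table below); the configuration lookup is simplified here:

import org.apache.flink.configuration.Configuration;

import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.services.s3.S3AsyncClient;

/** Illustrative transport selection between the Netty and CRT clients. */
final class S3ClientFactory {

    static S3AsyncClient createClient(Configuration conf) {
        String clientType = conf.getString("s3.client.type", "netty");
        if ("crt".equals(clientType)) {
            // AWS Common Runtime (C-based) client, optimized for bulk throughput.
            return S3AsyncClient.crtBuilder().build();
        }
        // Default: Java-native, Netty-based non-blocking client.
        return S3AsyncClient.builder()
                .httpClientBuilder(NettyNioAsyncHttpClient.builder())
                .build();
    }
}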

Resource Management and Operational Stability

The move to a native implementation provides finer control over resource consumption, addressing several "pain points" identified in legacy connectors.

Memory Management and OOM Prevention

One of the primary causes of OutOfMemoryError (OOM) during heavy S3 operations is uncontrolled concurrency and buffering. The legacy s5cmd approach, while fast, was noted for its lack of memory bounds during state downloads. The native connector addresses this through the s3.upload.max.concurrent.uploads configuration, which defaults to the number of available processors. This ensures that the Flink TaskManager does not overwhelm its internal memory pools with pending I/O buffers.
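
The bounding behaviour can be sketched with a plain java.util.concurrent.Semaphore, which is essentially the mechanism the configuration option describes; this is an illustration of the principle, not the connector's actual code:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;

/** Illustrative concurrency bound for part uploads (s3.upload.max.concurrent.uploads). */
class BoundedUploader {

    private final Semaphore permits =
            new Semaphore(Runtime.getRuntime().availableProcessors()); // default: CPU cores

    CompletableFuture<Void> uploadPart(byte[] part) throws InterruptedException {
        permits.acquire(); // blocks once the concurrency limit is reached
        return startUpload(part)
                .whenComplete((result, error) -> permits.release());
    }

    private CompletableFuture<Void> startUpload(byte[] part) {
        // In the real connector this issues the asynchronous SDK uploadPart call.
        return CompletableFuture.completedFuture(null);
    }
}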

Network Rate Limiting and Burst Handling

In AWS environments, exceeding network quotas can lead to aggressive packet dropping, which often results in TaskManagers losing connection to the JobManager. The native implementation introduces rudimentary rate-limiting capabilities by controlling the number of parallel downloads and uploads. This prevents the S3 filesystem from starving the control plane of necessary bandwidth, a common issue in high-concurrency checkpointing scenarios.

Implementation Specification: Module and File Structure

The implementation follows a modular design to promote code reuse and maintain clear boundaries between common filesystem logic and the specific AWS SDK v2 implementation.

Core File Paths and Class Responsibilities

The following classes within the flink-filesystems directory structure are pivotal to the native S3 implementation.

  • flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/

    • FlinkS3FileSystem.java: Extends HadoopFileSystem to bridge the transition while providing entropy injection.

    • AbstractS3FileSystemFactory.java: The base factory class that handles common S3 configuration parsing (Access Keys, Secret Keys, Endpoints).

    • writer/S3RecoverableWriter.java: The primary interface for transactional sinks.

  • flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/

    • NativeS3FileSystemFactory.java: Specific factory for the SDK v2 implementation.

    • NativeS3AccessHelper.java: Implementation of S3AccessHelper using the AWS SDK v2 S3AsyncClient.

Configuration and Tuning Parameters

The native connector introduces a streamlined set of configuration keys. These keys are designed to be intuitive while providing the depth required for performance tuning.

Key | Default | Description
s3.client.type | netty | Choice of netty or crt for the HTTP transport.
s3.upload.min-part-size | 5 MB | Minimum size of a part in an MPU session.
s3.upload.max.concurrent.uploads | CPU cores | Concurrency limit to prevent OOM.
s3.access-key | N/A | AWS Access Key (if not using IAM roles).
s3.secret-key | N/A | AWS Secret Key.
s3.endpoint | N/A | Override for regional endpoints or local S3 mocks.
s3.entropy.key | N/A | Key for distributing requests across S3 partitions.


Security Architecture: IAM, IRSA, and Regional Endpoints

A major advantage of adopting the AWS SDK v2 is the improved support for modern security and authentication paradigms.

IAM Roles and Service Accounts (IRSA)

The native connector natively supports IAM Roles for Service Accounts (IRSA), allowing Flink TaskManagers running in Kubernetes to assume fine-grained IAM roles without the need for static credentials. This is a significant security improvement over the NativeS3FileSystem implementations found in legacy versions like Flink 0.9.x, which often struggled with IAM role integration.

Regional and Custom Endpoints

Modern cloud architectures often utilize VPC endpoints or regional S3 buckets to reduce latency and egress costs. The native connector simplifies this through the s3.endpoint and s3.path-style-access configurations, which are critical for hybrid cloud or local development environments (e.g., using MinIO).

Deprecation Strategy

The transition to the native S3 filesystem is designed as a non-disruptive, multi-phase process to ensure stability and community confidence.

Phase 1: Optional Plugin and Community Validation

In the initial release, the flink-s3-fs-native module is introduced as an optional plugin. It registers the s3:// scheme, but because of Flink’s plugin isolation mechanism, it can coexist with the legacy Hadoop and Presto connectors if they use different schemes (e.g., s3a:// for Hadoop). This allows users to test the native connector on a per-job basis without affecting the rest of the cluster.

Phase 2: Promotion to Recommended Default

Once the native connector reaches feature parity—specifically regarding bulk copy operations using the PathsCopyingFileSystem interface—it will be promoted as the default recommendation in Flink's documentation. Performance benchmarks from community testing will be used to validate its readiness for the most demanding workloads.

Phase 3: Legacy Connector Sunset

In a subsequent major version of Apache Flink, the flink-s3-fs-hadoop and flink-s3-fs-presto connectors will be formally deprecated. A clear migration guide will be provided, detailing how to map legacy configurations (like those starting with presto.s3.) to the new unified s3. namespace.
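
A sketch of such a mapping, assuming the legacy keys keep their current presto.s3. prefix:

# Before (flink-s3-fs-presto)
presto.s3.endpoint: https://s3.eu-central-1.amazonaws.com
presto.s3.access-key: EXAMPLE_ACCESS_KEY

# After (flink-s3-fs-native)
s3.endpoint: https://s3.eu-central-1.amazonaws.com
s3.access-key: EXAMPLE_ACCESS_KEY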

Production Debugging and Error Handling

The native implementation provides more granular error reporting compared to the opaque error messages often returned by the Hadoop S3A wrapper.

Common Failure Scenarios and Native Handling

  • MPU Lifecycle Errors: If a checkpoint fails, the native writer uses the S3AccessHelper to ensure that orphaned multipart uploads are cleaned up or correctly resumed, preventing "storage leaks" in S3 buckets.

  • Classpath Conflicts: By shading the SDK and its Netty dependencies, the native connector eliminates the most common cause of Flink S3 failures: the NoSuchMethodError arising from version mismatches in common libraries.

  • Throttling: The native client’s retry logic is more sophisticated, leveraging the SDK v2’s default retry strategies, which are specifically tuned for AWS service limits.

Conclusion

The introduction of the Native S3 FileSystem (FLIP-555) marks a significant step forward in Apache Flink’s evolution toward a truly cloud-native stream processing engine. By centralizing S3 interactions into a single, high-performance, and dependency-isolated module, Flink resolves long-standing issues with architectural fragmentation and operational complexity. The transition from legacy Hadoop and Presto wrappers to a modern SDK v2-based implementation allows for superior resource management, better performance through asynchronous I/O, and enhanced security via modern IAM integrations. The phased migration strategy ensures that this transition is both manageable for the community and technically robust, securing Flink's ability to handle the next generation of massive-scale streaming data workloads on AWS.


Rejected Alternatives

Continue to rely on the existing flink-s3-fs-hadoop and flink-s3-fs-presto connectors, enhancing them and keeping the dependencies on Presto and Hadoop.