Status
Current state: Under Discussion
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The log.segment.bytes broker config (and its topic-level synonym segment.bytes) is currently defined as ConfigDef.Type.INT, capping the maximum segment size at Integer.MAX_VALUE (2,147,483,647 bytes, approximately 2 GB). Additionally, the .index file format stores physical file positions as 4-byte signed integers, which also cannot address beyond approximately 2 GB.
With modern storage hardware (multi-TB NVMe drives) and high-throughput workloads, the 2 GB cap is increasingly a problem:
- Excessive file handle usage: Each segment needs 4 files (.log, .index, .timeindex, .txnindex). A 10 TB partition with 2 GB segments means approximately 20,000 open files.
- Frequent segment rolls: A topic ingesting 500 MB/s rolls a new segment every approximately 4 seconds, amplifying index build, flush, and cleaner overhead.
- More log cleaning and compaction work: More segments means more compaction cycles with more small groups.
- Remote storage overhead: Each segment is an individual unit for tiered storage copy/delete operations.
Allowing segments of 4 GB, 8 GB, or larger would significantly reduce these overheads for high-throughput, large-retention workloads.
Public Interfaces
Configuration changes
| Config | Current type | New type | Current range | New range |
|---|---|---|---|---|
| log.segment.bytes (broker) | INT | LONG | [1 MB, 2,147,483,647] | [1 MB, Long.MAX_VALUE] (after MetadataVersion finalization) |
| segment.bytes (topic) | INT | LONG | [1 MB, 2,147,483,647] | [1 MB, Long.MAX_VALUE] (after MetadataVersion finalization) |
The expanded range (values greater than Integer.MAX_VALUE) is gated by MetadataVersion. Before finalization, the effective range remains [1 MB, Integer.MAX_VALUE]. After IBP_4_4_IV1 is finalized, the range becomes [1 MB, Long.MAX_VALUE].
Index size guidance: With 12-byte entries, log.index.size.max.bytes (default 10 MB) holds approximately 873K entries. For an 8 GB segment with the default log.index.interval.bytes of 4096 bytes, approximately 2M index entries would be needed, requiring approximately 24 MB of index space. Operators using segments larger than 2 GB should increase log.index.size.max.bytes proportionally.
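The sizing arithmetic above can be reproduced with a small helper. This is a minimal sketch: the class and method names are hypothetical and not part of Kafka, and it assumes one index entry per log.index.interval.bytes of log data, which is an upper bound.

```java
// Hypothetical helper (not a Kafka class): estimates offset index space for one segment.
final class IndexSizing {
    static final int LEGACY_ENTRY_BYTES = 8;
    static final int LARGE_ENTRY_BYTES = 12;

    /** Upper bound on entries: one entry per indexIntervalBytes of log data. */
    static long entriesNeeded(long segmentBytes, long indexIntervalBytes) {
        return segmentBytes / indexIntervalBytes;
    }

    /** Index bytes required to hold that many entries of the given size. */
    static long indexBytesNeeded(long segmentBytes, long indexIntervalBytes, int entryBytes) {
        return entriesNeeded(segmentBytes, indexIntervalBytes) * entryBytes;
    }

    /** Number of whole entries that fit in an index file of the given maximum size. */
    static long entriesThatFit(long indexMaxBytes, int entryBytes) {
        return indexMaxBytes / entryBytes;
    }
}
```

For an 8 GiB segment and a 4096-byte interval, `entriesNeeded` gives 2,097,152 entries and `indexBytesNeeded` with 12-byte entries gives 25,165,824 bytes (24 MiB), while `entriesThatFit(10 * 1024 * 1024, 12)` gives 873,813 entries for the default index size.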
On-disk format changes
Offset index (.index) file format -- new 12-byte entry format (gated by MetadataVersion):
| Field | Legacy format (8 bytes per entry) | Large format (12 bytes per entry) |
|---|---|---|
| Relative offset | 4-byte signed int | 4-byte signed int |
| Physical position | 4-byte signed int (max approximately 2 GB) | 8-byte signed long (effectively unlimited) |
The large format is only written after MetadataVersion IBP_4_4_IV1 is finalized. Before finalization, all index files use the legacy 8-byte format.
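To make the two layouts concrete, the following sketch encodes and decodes entries with java.nio.ByteBuffer. The class and method names are hypothetical; only the field widths come from the table above, and big-endian order (the ByteBuffer default) is an assumption.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration of the two entry layouts; not the OffsetIndex implementation.
final class IndexEntryLayout {
    static final int LEGACY_ENTRY_BYTES = 8;  // 4-byte relative offset + 4-byte position
    static final int LARGE_ENTRY_BYTES = 12;  // 4-byte relative offset + 8-byte position

    static void putLegacy(ByteBuffer buf, int relativeOffset, int position) {
        buf.putInt(relativeOffset).putInt(position);
    }

    static void putLarge(ByteBuffer buf, int relativeOffset, long position) {
        buf.putInt(relativeOffset).putLong(position);
    }

    /** Relative offset of the n-th entry for either entry size. */
    static int relativeOffsetAt(ByteBuffer buf, int n, int entrySize) {
        return buf.getInt(n * entrySize);
    }

    static int legacyPositionAt(ByteBuffer buf, int n) {
        return buf.getInt(n * LEGACY_ENTRY_BYTES + 4);
    }

    static long largePositionAt(ByteBuffer buf, int n) {
        return buf.getLong(n * LARGE_ENTRY_BYTES + 4);
    }
}
```

Only the position field changes width, so a position such as 5,000,000,000 round-trips through `putLarge` and `largePositionAt` but cannot be represented by `putLegacy`.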
Format detection for existing files: When opening an existing index file, the OffsetIndex auto-detects the format by analyzing the file size and validating entry content:
- If file size is divisible by 12 but not 8: large format (unambiguous)
- If file size is divisible by 8 but not 12: legacy format (unambiguous)
- If file size is divisible by both 8 and 12 (ambiguous): validates the first few entries with each format, checking that relative offsets and positions are non-negative and monotonically non-decreasing. The format that produces valid entries is selected. If both validate (rare), the useLargeFormat hint from MetadataVersion is used as the tiebreaker and a warning is logged for operator visibility.
- If file size is divisible by neither: file is corrupt and will be rebuilt via sanityCheck() and recover().
This auto-detection enables:
- Upgrade safety: After MetadataVersion finalization, old 8-byte index files on disk are correctly detected and read. When they are rebuilt (via LogSegment.recover()), they are written in the new 12-byte format.
- RemoteIndexCache compatibility: Remote indexes fetched from tiered storage may be in either format (depending on when the segment was uploaded). The auto-detection reads the file content and determines the correct format regardless of the MetadataVersion.
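The size-based part of the detection rules above can be sketched as a pure function of the file size. The enum and class names are hypothetical, and the sketch deliberately leaves out the content validation step used for ambiguous sizes.

```java
// Hypothetical sketch of the divisibility rules only; content validation is not shown.
final class IndexFormatBySize {
    enum Guess { LEGACY, LARGE, AMBIGUOUS, CORRUPT }

    static Guess classify(long fileSizeBytes) {
        boolean multipleOf8 = fileSizeBytes % 8 == 0;
        boolean multipleOf12 = fileSizeBytes % 12 == 0;
        if (multipleOf8 && multipleOf12) return Guess.AMBIGUOUS; // any multiple of 24, including 0
        if (multipleOf12) return Guess.LARGE;
        if (multipleOf8) return Guess.LEGACY;
        return Guess.CORRUPT;
    }
}
```

For example, 36 bytes classifies as LARGE, 40 bytes as LEGACY, 72 bytes as AMBIGUOUS, and 10 bytes as CORRUPT. Every multiple of 24 is ambiguous, which is why size alone cannot be the whole detection mechanism.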
Time index (.timeindex) -- no format change. Entry size is already 12 bytes (8-byte timestamp + 4-byte relative offset). No physical positions are stored.
Transaction index (.txnindex) -- no format change. Uses FileChannel directly with long positions.
Java API changes
| Class | Member | Before | After |
|---|---|---|---|
| LogConfig | DEFAULT_SEGMENT_BYTES | int | long |
| LogConfig | segmentSize() | returns int | returns long |
| LogConfig | initFileSize() | returns int | returns long |
| LogConfig | useLargeIndexFormat | N/A (new) | boolean, default false |
| AbstractKafkaConfig | logSegmentBytes() | returns Integer via getInt() | returns Long via getLong() |
| RollParams | maxSegmentBytes | int | long |
| OffsetIndex | append(long offset, ...) | int position | long position |
| OffsetIndex | detectEntrySize(File, int) | N/A (new) | static method, auto-detects format for existing files |
| OffsetPosition | position field | int | long |
| FileRecords | internal size field | AtomicInteger | AtomicLong |
| LazyIndex | forOffset(...) | no format param | new overload with useLargeFormat parameter |
| LogSegment | sizeInBytesLong() | N/A (new) | returns long |
| LogSegment | recover() | returns int | returns long |
| LogSegment | truncateTo() | returns int | returns long |
| LogOffsetMetadata | relativePositionInSegment | int | long |
| SegmentPosition (raft) | relativePosition | int | long |
| RemoteStorageManager | fetchLogSegment(metadata, int) | only overload | @Deprecated; new default method with long added |
| RemoteStorageManager | fetchLogSegment(metadata, int, int) | only overload | @Deprecated; new default method with long, long added |
SegmentPosition change rationale: SegmentPosition.relativePosition (raft module) is widened from int to long because it is constructed from LogOffsetMetadata.relativePositionInSegment, which is now long. Keeping types consistent avoids lossy truncation. In practice, KRaft metadata segments are small (approximately 1 GB), so positions would not exceed Integer.MAX_VALUE, but type consistency prevents subtle bugs.
Not changed
- BaseRecords.sizeInBytes() remains int (449 callers across 89 files -- cascading this change is too large for this KIP). FileRecords adds a new sizeInBytesLong() method that returns the true long size. The existing sizeInBytes() clamps at Integer.MAX_VALUE. All internal storage layer callers have been migrated to sizeInBytesLong(). The 449 client/network layer callers deal with fetch responses and producer batches bounded by max.message.bytes (int), so clamping does not affect them.
- RecordBatch.sizeInBytes() remains int (bounded by max.message.bytes).
- MemoryRecords.sizeInBytes() remains int (bounded by ByteBuffer capacity).
- Other coordinator segment configs (transaction.state.log.segment.bytes, offsets.topic.segment.bytes, share.coordinator.state.topic.segment.bytes) remain INT.
- RemoteLogSegmentMetadata.segmentSizeInBytes remains int (schema uses int32). For segments larger than 2 GB, the size in metadata is clamped to Integer.MAX_VALUE and a warning is logged. This will be addressed in a follow-up schema evolution.
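The clamping behaviour described for sizeInBytes() and for the remote segment metadata can be illustrated as follows. This is a sketch with a hypothetical helper name, not the FileRecords code; it only shows why clamping differs from a bare (int) cast.

```java
// Hypothetical helper illustrating clamp-versus-cast for sizes above Integer.MAX_VALUE.
final class SizeClamp {
    /** Saturates at Integer.MAX_VALUE instead of wrapping around. */
    static int clampToInt(long sizeInBytes) {
        return (int) Math.min(sizeInBytes, (long) Integer.MAX_VALUE);
    }

    /** What a bare narrowing cast does: keeps only the low 32 bits. */
    static int bareCast(long sizeInBytes) {
        return (int) sizeInBytes;
    }
}
```

For a 5,000,000,000-byte segment, `clampToInt` returns Integer.MAX_VALUE, whereas `bareCast` returns 705,032,704, a plausible-looking but wrong size.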
Monitoring
No new metrics are added. Existing segment size metrics will report accurate values for segments larger than 2 GB because LogSegments.sizeInBytes() uses long arithmetic internally.
Command line tools
kafka-log-dirs.sh and DumpLogSegments correctly handle segments larger than 2 GB. DumpLogSegments uses auto-detection for index format since it does not have access to MetadataVersion.
Proposed Changes
Config type change
Change log.segment.bytes and segment.bytes from ConfigDef.Type.INT to ConfigDef.Type.LONG. Apply atLeast(1024 * 1024) as the validator. The expanded range (values greater than Integer.MAX_VALUE) is gated by MetadataVersion IBP_4_4_IV1.
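A minimal sketch of the resulting validation rule is shown below. It does not use Kafka's ConfigDef API; the class name, method name, and the boolean flag standing in for "IBP_4_4_IV1 is finalized" are all assumptions made for illustration.

```java
// Hypothetical validator mirroring the rule: at least 1 MiB, and above
// Integer.MAX_VALUE only once the large index format has been finalized.
final class SegmentBytesValidator {
    static final long MIN_SEGMENT_BYTES = 1024L * 1024;

    static long parseAndValidate(String raw, boolean largeFormatFinalized) {
        long value = Long.parseLong(raw.trim()); // existing INT values parse unchanged
        if (value < MIN_SEGMENT_BYTES) {
            throw new IllegalArgumentException(
                "segment.bytes must be at least " + MIN_SEGMENT_BYTES + ", got " + value);
        }
        if (!largeFormatFinalized && value > Integer.MAX_VALUE) {
            throw new IllegalArgumentException(
                "segment.bytes above Integer.MAX_VALUE requires IBP_4_4_IV1 to be finalized");
        }
        return value;
    }
}
```

A stored value such as "1073741824" is accepted in both states, while "4294967296" is accepted only when the flag is true.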
Storage layer widening
Widen internal storage layer types from int to long for segment sizes and physical file positions:
FileRecords: Internal AtomicInteger changed to AtomicLong for size tracking. New sizeInBytesLong(), sliceLong(), truncateToLong() methods added for callers that need long precision. The existing BaseRecords.sizeInBytes() interface remains int to avoid cascading changes across 449 call sites.
LogSegment: New sizeInBytesLong() alongside existing size(). Methods recover(), append(), shouldRoll(), read(), and truncateTo() widened to use long for positions and sizes. All internal callers migrated from size() to sizeInBytesLong():
| Call site | Before | After |
|---|---|---|
| LogSegment.append() physicalPosition | int log.sizeInBytes() | long log.sizeInBytesLong() |
| LogSegment.shouldRoll() | int size = size() | long size = sizeInBytesLong() |
| LogSegment.read() startPosition | int | long |
| LogSegment.read() fetchSize | (int)(maxPosition - startPosition) | (int) Math.min(maxPosition - startPosition, (long) adjustedMaxSize) |
| LogSegment.recover() validBytes | int | long |
| LogSegment.truncateTo() return | int | long |
| LogSegment.toString() | size() | sizeInBytesLong() |
| UnifiedLog retention | segment.size() | segment.sizeInBytesLong() |
| LocalLog.updateLogEndOffset() | segment.size() | segment.sizeInBytesLong() |
| LocalLog.read() maxPosition | segment.size() | segment.sizeInBytesLong() |
| LocalLog.splitOverflowedSegment() | int totalSize | long totalSize |
| Cleaner.groupSegmentsBySize() | segment.size() | segment.sizeInBytesLong() |
| LogSegments.sizeInBytes() | LogSegment::size | LogSegment::sizeInBytesLong |
| LogLoader initial metadata | activeSegment.size() | activeSegment.sizeInBytesLong() |
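The fetchSize row is the one place in the table where the expression itself changes, not just the type. The sketch below isolates that computation; the class and method names are hypothetical, and the parameter names follow the table.

```java
// Hypothetical extraction of the fetch-size arithmetic from LogSegment.read().
final class FetchSizeMath {
    /** Widened form: take the minimum in long arithmetic, then narrow safely. */
    static int fetchSize(long startPosition, long maxPosition, int adjustedMaxSize) {
        return (int) Math.min(maxPosition - startPosition, (long) adjustedMaxSize);
    }

    /** Narrowing the raw difference wraps once it exceeds Integer.MAX_VALUE. */
    static int naiveFetchSize(long startPosition, long maxPosition) {
        return (int) (maxPosition - startPosition);
    }
}
```

Because adjustedMaxSize is an int, the minimum always fits in an int, so the final cast cannot overflow. By contrast, `naiveFetchSize(0, 3L << 30)` yields a negative number for a 3 GiB range.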
OffsetIndex dual format with MetadataVersion gating and auto-detection
The OffsetIndex supports two entry formats, controlled by a useLargeFormat constructor parameter:
| Format | Entry size | Layout | When used |
|---|---|---|---|
| Legacy (default) | 8 bytes | [4-byte relative offset] [4-byte physical position] | Before IBP_4_4_IV1 finalization |
| Large | 12 bytes | [4-byte relative offset] [8-byte physical position] | After IBP_4_4_IV1 finalization |
Format selection mechanism:
The format of each OffsetIndex instance is determined through a layered selection process with three distinct code paths:
Path 1 -- Production (local segments): The primary format selection comes from MetadataVersion via LogConfig.useLargeIndexFormat. When a broker starts or creates a new segment, LogSegment.open() reads LogConfig.useLargeIndexFormat and passes it to LazyIndex.forOffset(useLargeFormat). When the lazy index is first accessed, LazyIndex.loadIndex() constructs an OffsetIndex with the useLargeFormat flag. Before IBP_4_4_IV1 is finalized, this flag is always false and all indexes use the legacy 8-byte entry format. After finalization, the flag becomes true and new indexes are written in the 12-byte format.
Path 2 -- Remote Index Cache (tiered storage): RemoteIndexCache fetches offset index files from remote storage and creates OffsetIndex instances using the default constructor with useLargeFormat=false. Since remote index files were uploaded at whatever format was active when the segment was copied to remote storage, the actual format is determined by auto-detection from the file content (described below). The useLargeFormat=false hint serves as a safe default for the ambiguous case.
Path 3 -- CLI Tools (DumpLogSegments): Offline tools like DumpLogSegments do not have access to MetadataVersion. They create OffsetIndex with useLargeFormat=false and rely entirely on auto-detection to determine the correct format from the file content.
Auto-detection for existing files: All three paths converge at the OffsetIndex constructor, which calls detectEntrySize(file, requestedEntrySize). For new files (file does not exist yet), the requested entry size is used directly. For existing files with data, the format is auto-detected:
- If the file size is divisible by 12 but not 8: the file uses the large format (unambiguous).
- If the file size is divisible by 8 but not 12: the file uses the legacy format (unambiguous).
- If the file size is divisible by both 8 and 12 (ambiguous): the first few entries are read with each format and validated for monotonically non-decreasing relative offsets and non-negative positions. The format that produces valid entries is selected. If both validate, the requestedEntrySize (from MetadataVersion) is used as the tiebreaker and a warning is logged so operators have visibility into the ambiguous detection.
- If the file size is divisible by neither: the file is corrupt. The requested entry size is used, and sanityCheck() will detect the corruption and trigger a rebuild via recover().
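The full decision procedure, including content validation and the tiebreaker, might look roughly like the sketch below. It operates on an in-memory ByteBuffer instead of a File, the names are hypothetical, and sampling the first 4 entries is an assumption (the text only says "the first few").

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of detectEntrySize-style logic; not the actual OffsetIndex code.
final class IndexFormatDetector {
    static final int LEGACY = 8;
    static final int LARGE = 12;
    static final int SAMPLE_ENTRIES = 4; // assumed sample size

    /** True if the sampled entries are non-negative and non-decreasing in both fields. */
    static boolean looksValid(ByteBuffer buf, int entrySize) {
        int n = Math.min(SAMPLE_ENTRIES, buf.limit() / entrySize);
        long prevOffset = 0, prevPosition = 0; // starting at 0 also rejects negative values
        for (int i = 0; i < n; i++) {
            int base = i * entrySize;
            long offset = buf.getInt(base);
            long position = entrySize == LARGE ? buf.getLong(base + 4) : buf.getInt(base + 4);
            if (offset < prevOffset || position < prevPosition) return false;
            prevOffset = offset;
            prevPosition = position;
        }
        return true;
    }

    static int detect(ByteBuffer buf, int requestedEntrySize) {
        int size = buf.limit();
        boolean by8 = size % LEGACY == 0, by12 = size % LARGE == 0;
        if (size == 0 || (!by8 && !by12)) return requestedEntrySize; // new or corrupt file
        if (by8 && !by12) return LEGACY;
        if (by12 && !by8) return LARGE;
        boolean legacyOk = looksValid(buf, LEGACY), largeOk = looksValid(buf, LARGE);
        if (legacyOk && !largeOk) return LEGACY;
        if (largeOk && !legacyOk) return LARGE;
        return requestedEntrySize; // both (or neither) validate: fall back to the hint
    }
}
```

In this sketch a 24-byte legacy file holding (0, 0), (10, 4096), (20, 8192) validates under both interpretations, so the result follows the requested entry size. That is the case in which the source logs a warning.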
Constructor safety: The AbstractIndex base class is enhanced with an effectiveEntrySize() method that safely resolves the entry size at construction time. This avoids calling the overridable entrySize() method from the constructor, which would fail because subclass fields are not yet initialized when the parent constructor runs.
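The hazard is a general Java initialization-order issue and can be reproduced in isolation. In the sketch below all class names are hypothetical, and passing the resolved size to the parent constructor is just one possible way to implement something like effectiveEntrySize(). The source does not say how the real method works.

```java
// Pitfall: the parent constructor calls an overridable method before the
// subclass field it depends on has been assigned.
abstract class NaiveBaseIndexSketch {
    final int entrySizeSeenInCtor;

    NaiveBaseIndexSketch() {
        this.entrySizeSeenInCtor = entrySize(); // subclass fields still hold defaults here
    }

    abstract int entrySize();
}

final class NaiveIndexSketch extends NaiveBaseIndexSketch {
    private final boolean useLargeFormat;

    NaiveIndexSketch(boolean useLargeFormat) {
        super();
        this.useLargeFormat = useLargeFormat; // assigned only after super() returns
    }

    @Override
    int entrySize() {
        return useLargeFormat ? 12 : 8;
    }
}

// One possible fix: resolve the size before calling super() and store it in the parent.
abstract class SafeBaseIndexSketch {
    private final int entrySize;

    SafeBaseIndexSketch(int entrySize) {
        this.entrySize = entrySize;
    }

    final int effectiveEntrySize() {
        return entrySize;
    }
}

final class SafeIndexSketch extends SafeBaseIndexSketch {
    SafeIndexSketch(boolean useLargeFormat) {
        super(useLargeFormat ? 12 : 8);
    }
}
```

With the naive classes, `new NaiveIndexSketch(true).entrySizeSeenInCtor` is 8 even though `entrySize()` later returns 12, so a parent that sized its buffer in the constructor would silently use the wrong entry width. The safe variant returns 12 from `effectiveEntrySize()` in both places.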
Corruption detection for ambiguous files: When the file size is divisible by both 8 and 12 and the content is corrupted, the auto-detection may pick the wrong format. To catch this, sanityCheck() performs full monotonicity validation: it scans all entries and verifies that relative offsets and physical positions are non-negative and monotonically non-decreasing. If any entry violates these invariants, CorruptIndexException is thrown, which LogLoader catches and handles by triggering recover() to rebuild the index from the .log file.
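A full-scan check of this kind could be sketched as follows. The exception type here is a stand-in defined locally for illustration (the source names CorruptIndexException), and the method works on a ByteBuffer for self-containment.

```java
import java.nio.ByteBuffer;

// Hypothetical full-scan sanity check over every entry in an index buffer.
final class IndexSanitySketch {
    /** Local stand-in for the CorruptIndexException named in the text. */
    static final class CorruptIndexSketchException extends RuntimeException {
        CorruptIndexSketchException(String message) {
            super(message);
        }
    }

    static void sanityCheck(ByteBuffer buf, int entrySize) {
        if (entrySize != 8 && entrySize != 12) {
            throw new IllegalArgumentException("entrySize must be 8 or 12");
        }
        if (buf.limit() % entrySize != 0) {
            throw new CorruptIndexSketchException(
                "size " + buf.limit() + " is not a multiple of " + entrySize);
        }
        long prevOffset = 0, prevPosition = 0; // starting at 0 also rejects negatives
        for (int i = 0; i < buf.limit() / entrySize; i++) {
            int base = i * entrySize;
            long offset = buf.getInt(base);
            long position = entrySize == 12 ? buf.getLong(base + 4) : buf.getInt(base + 4);
            if (offset < prevOffset || position < prevPosition) {
                throw new CorruptIndexSketchException("entry " + i + " violates ordering");
            }
            prevOffset = offset;
            prevPosition = position;
        }
    }
}
```

A caller in the role of LogLoader would catch the exception and rebuild the index from the .log file.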
Bounds check in legacy mode: When writing entries in legacy mode, OffsetIndex.append() checks that the physical position does not exceed Integer.MAX_VALUE and throws IllegalArgumentException if it does. This provides a clear error message ("Finalize MetadataVersion to IBP_4_4_IV1 to enable large index format") instead of silently truncating the position via an (int) cast.
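The guard can be pictured as below. This is a sketch with hypothetical names that writes into a plain ByteBuffer. Only the Integer.MAX_VALUE check, the exception type, and the hint in the message are taken from the text. The rest of the message wording is assumed.

```java
import java.nio.ByteBuffer;

// Hypothetical append guard: refuse to narrow a position that does not fit in 4 bytes.
final class LegacyAppendGuard {
    static void append(ByteBuffer buf, boolean useLargeFormat, int relativeOffset, long position) {
        if (useLargeFormat) {
            buf.putInt(relativeOffset).putLong(position); // 12-byte entry
            return;
        }
        if (position > Integer.MAX_VALUE) {
            throw new IllegalArgumentException(
                "Position " + position + " does not fit the legacy index format. "
                    + "Finalize MetadataVersion to IBP_4_4_IV1 to enable large index format");
        }
        buf.putInt(relativeOffset).putInt((int) position); // 8-byte entry, cast is now safe
    }
}
```

Failing fast here means an out-of-range position never reaches disk as a wrapped, possibly negative, 4-byte value.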
MetadataVersion gating
A new MetadataVersion entry gates the format change:
IBP_4_4_IV1(32, "4.4", "IV1", true) // didMetadataChange=true
- isLargeIndexFormatSupported() helper method returns true when the cluster MetadataVersion >= IBP_4_4_IV1.
- didMetadataChange=true ensures downgrade is blocked after finalization, consistent with existing KRaft downgrade rules.
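As a rough illustration of how the gate feeds format selection, the simplified enum below models only a feature level. It is not Kafka's MetadataVersion class: EARLIER_VERSION and its level 31 are hypothetical placeholders, and offsetIndexEntrySize() is an invented convenience method.

```java
// Simplified, hypothetical stand-in for MetadataVersion with just two levels.
enum MetadataVersionSketch {
    EARLIER_VERSION(31), // placeholder for any level below IBP_4_4_IV1
    IBP_4_4_IV1(32);

    private final int featureLevel;

    MetadataVersionSketch(int featureLevel) {
        this.featureLevel = featureLevel;
    }

    boolean isLargeIndexFormatSupported() {
        return featureLevel >= IBP_4_4_IV1.featureLevel;
    }

    /** Entry size that newly created offset indexes would use at this version. */
    int offsetIndexEntrySize() {
        return isLargeIndexFormatSupported() ? 12 : 8;
    }
}
```

Under this model, brokers at EARLIER_VERSION keep writing 8-byte entries even when running the new code, and switch to 12-byte entries only once the finalized version reaches IBP_4_4_IV1.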
RemoteStorageManager API
New default methods added to the RemoteStorageManager interface with long position parameters:
default InputStream fetchLogSegment(RemoteLogSegmentMetadata metadata, long startPosition)
default InputStream fetchLogSegment(RemoteLogSegmentMetadata metadata, long startPosition, long endPosition)
These delegate to the existing int methods with bounds checking. Existing RemoteStorageManager implementations continue to work unchanged. The old int methods are marked @Deprecated.
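The delegation pattern can be shown on a reduced interface. Everything in the sketch is hypothetical: it uses a String segment id instead of RemoteLogSegmentMetadata, omits the checked exception of the real API, and shows only the single-position overload.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;

// Hypothetical, reduced model of adding a long overload as a default method.
interface RemoteFetchSketch {
    /** Stands in for the existing int-based method that plugins already implement. */
    InputStream fetch(String segmentId, int startPosition);

    /** New overload: bounds-check, then delegate to the int method. */
    default InputStream fetch(String segmentId, long startPosition) {
        if (startPosition < 0 || startPosition > Integer.MAX_VALUE) {
            throw new IllegalArgumentException(
                "startPosition " + startPosition + " is not supported by this implementation");
        }
        return fetch(segmentId, (int) startPosition);
    }
}

/** A plugin written against the old interface: it only knows the int method. */
final class LegacyPluginSketch implements RemoteFetchSketch {
    @Override
    public InputStream fetch(String segmentId, int startPosition) {
        // Returns a one-byte stream echoing the low byte of the position, for demonstration.
        return new ByteArrayInputStream(new byte[] {(byte) startPosition});
    }
}
```

LegacyPluginSketch compiles unchanged and serves long-typed calls for positions up to Integer.MAX_VALUE. A plugin that wants to serve segments beyond 2 GB would override the long overload directly.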
RemoteIndexCache compatibility: RemoteIndexCache creates OffsetIndex instances from fetched remote index files. Since remote indexes may be in either format (depending on whether the segment was uploaded before or after MetadataVersion finalization), RemoteIndexCache uses the default constructor (useLargeFormat=false) and relies on auto-detection to determine the correct format from the file content.
Compatibility, Deprecation, and Migration Plan
Rolling upgrade path
- Upgrade all brokers to the new version. Do not finalize the metadata version yet. Brokers write index files in legacy 8-byte format (identical to old brokers). Full backward and forward compatibility. Downgrade is safe.
- Finalize the metadata version via kafka-features.sh upgrade --release-version 4.4. LogConfig.useLargeIndexFormat becomes true. New index files are written in 12-byte format. Existing 8-byte index files continue to be read correctly via auto-detection. When an index is rebuilt (for example during LogSegment.recover()), it is written in the new format. The segment.bytes config upper bound is lifted.
- Downgrade after finalization is blocked because IBP_4_4_IV1 has didMetadataChange=true, consistent with existing KRaft downgrade rules.
Backward compatibility
- Config parsing: INT values stored as strings (for example "1073741824") parse correctly as LONG. No user action required.
- Index format: Before MetadataVersion finalization, all indexes use the legacy 8-byte format. Old and new brokers produce identical index files.
- Index auto-detection: After MetadataVersion finalization, old 8-byte index files on disk are auto-detected and read correctly. They are rebuilt in 12-byte format when recover() is triggered.
- RemoteStorageManager: Existing implementations only implement the int methods. The new long default methods delegate to the old int methods with bounds checking. No changes required for existing RSM plugins.
Forward compatibility
- Before finalization: full downgrade is safe. All index files are in legacy format.
- After finalization: downgrade is blocked by KRaft metadata version rules.
- If segments larger than 2 GB were created after finalization, operators must set segment.bytes back to 2 GB or less and wait for segment rolls before downgrading.
Deprecation
- RemoteStorageManager.fetchLogSegment(RemoteLogSegmentMetadata, int) is deprecated in favor of fetchLogSegment(RemoteLogSegmentMetadata, long).
- RemoteStorageManager.fetchLogSegment(RemoteLogSegmentMetadata, int, int) is deprecated in favor of fetchLogSegment(RemoteLogSegmentMetadata, long, long).
- No configs are deprecated or removed.
- The deprecated methods will be removed in a future major release.
Impact on existing users
- Users who never set segment.bytes above the current default (1 GB) are completely unaffected.
- Users who want larger segments must first finalize MetadataVersion to IBP_4_4_IV1.
- Custom RemoteStorageManager implementations should migrate to the long overloads at their convenience. The deprecated int methods continue to work.
Test Plan
Unit tests
- Config parsing and round-trip tests verify LONG type works correctly for log.segment.bytes and segment.bytes.
- OffsetIndex dual-format tests verify both 8-byte legacy and 12-byte large entry formats can be written and read correctly.
- OffsetIndex format auto-detection tests verify correct format identification for unambiguous file sizes (divisible by only one format), ambiguous file sizes (divisible by both), and edge cases (empty files, non-existent files).
- OffsetIndex upgrade/downgrade tests verify that legacy-format indexes created by old code are correctly auto-detected and read by new code (even when useLargeFormat=true), and that indexes written by new code in legacy mode are readable by old code.
- FileRecords tests verify sizeInBytesLong() returns accurate values exceeding Integer.MAX_VALUE, and truncateToLong() handles truncation amounts exceeding Integer.MAX_VALUE.
- LogSegment tests verify shouldRoll() works with maxSegmentBytes greater than 2 GB, sizeInBytesLong() consistency, and recovery preserves all records.
- RemoteLogManager tests verify the new long-param fetchLogSegment() methods work correctly.
Integration tests
- Tiered storage integration tests (OffloadAndConsumeFromLeaderTest, RollAndOffloadActiveSegmentTest, DynamicSegmentSizeChangeTest) verify end-to-end produce, offload, consume, and broker bounce with mixed segment sizes.
- DynamicSegmentSizeChangeTest specifically tests dynamically changing segment size on a tiered-storage-enabled topic: creates a topic with 1-batch segments, produces and offloads records, dynamically increases segment size to 3 batches per segment via updateTopicConfig, produces more records verifying the new segment size takes effect, bounces the broker, and consumes all records to verify data integrity across mixed segment sizes spanning both local and remote tiers.
System-level verification (performed manually)
- Live end-to-end test: dynamic config changes (1 GB to 4 GB to 1 GB segment sizes). Produced 3000 records at 1 GB, dynamically changed to 4 GB (value greater than Integer.MAX_VALUE accepted), produced 1000 more, changed the segment size back to 1 GB, produced 500 more. All 4500 records consumed correctly at each phase with zero errors. Records spanning config change boundaries consumed correctly.
- Upgrade test: Old broker (trunk) produces 3000 records across 3 segments. New broker starts on same data directory, reads old data, produces new records, all produce and consume operations succeed. Index files remain in legacy 8-byte format (MetadataVersion not finalized).
- Downgrade test: After upgrade test, old broker starts on data written by new broker. Reads all data (both pre-upgrade and post-upgrade), produces new records, all operations succeed.
Rejected Alternatives
1. Keep log.segment.bytes as INT permanently
Rejected. The 2 GB limit is an artificial constraint from a type choice made when storage hardware was smaller. Modern deployments routinely manage multi-TB partitions where 2 GB segments create excessive overhead in file handles, segment rolls, compaction cycles, and tiered storage operations.
2. Add a separate log.segment.bytes.long config
Rejected. Maintaining two configs for the same purpose adds confusion for operators. A single config with a type change is cleaner and follows the precedent set by KIP-1161, which reclassified several configs from STRING to LIST type.
3. Widen index entries to 16 bytes (8-byte offset + 8-byte position)
Rejected. The relative offset (4 bytes) is sufficient because it represents the delta from the segment base offset, not an absolute offset. Only the physical position needs widening to 8 bytes. Using 16 bytes per entry would take about 33% more space than the 12-byte format for no practical benefit.
4. Use unsigned int for physical position (4 GB range)
Rejected. Java does not natively support unsigned integers, making the code error-prone (values above Integer.MAX_VALUE appear negative, breaking comparison operators and binary search). The additional 2 GB headroom is not worth the complexity. Widening to long is the clean solution and future-proofs the format.
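The comparison problem mentioned above is easy to reproduce. The sketch below uses only standard JDK methods (Integer.compareUnsigned and Integer.toUnsignedLong, available since Java 8). The class and method names are hypothetical.

```java
// Illustrates why storing a position above Integer.MAX_VALUE in an int is error-prone.
final class UnsignedPitfall {
    /** Keeps the low 32 bits, as an "unsigned int" encoding would. */
    static int storeAsInt(long position) {
        return (int) position;
    }

    /** Ordinary signed comparison, which is what < and > use on int values. */
    static boolean signedLessThan(int a, int b) {
        return a < b;
    }

    /** Correct ordering requires the dedicated unsigned helper everywhere. */
    static boolean unsignedLessThan(int a, int b) {
        return Integer.compareUnsigned(a, b) < 0;
    }
}
```

A position of 3,000,000,000 stored this way becomes a negative int, so `signedLessThan` wrongly orders it before 1,000,000,000, while `unsignedLessThan` orders it correctly and `Integer.toUnsignedLong` recovers the original value. Every comparison in a binary search would need the unsigned variant, which is the fragility the rejection refers to.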
5. Do only the config change without the index format change
Rejected as the complete approach. While Phase 1 (config type change with INT range cap) is useful as a stepping stone, it does not deliver the actual user-facing value of larger segments. A single KIP covering the full scope ensures the community reviews the complete design, even though implementation can be phased across multiple PRs.
6. Per-partition marker file for index format migration
Rejected. An earlier prototype used a .index_version marker file in each partition directory to track whether indexes had been rebuilt in the new format. On first startup after upgrade, if the marker was absent, all indexes were rebuilt. This approach was rejected because:
- It does not follow Kafka conventions. Every other on-disk format change in Kafka uses MetadataVersion gating.
- No downgrade support. The marker file approach immediately writes new-format indexes on upgrade, with no way to revert. MetadataVersion gating allows all brokers to be upgraded (still writing old format) before the format switch is finalized.
- Mixed-version cluster risk. In a rolling upgrade, broker A would immediately start writing 12-byte indexes while broker B (not yet upgraded) still expects 8-byte.
- False negatives in format detection. Old 8-byte index files whose size is divisible by both 8 and 12 (for example, 72 bytes from 9 entries) cannot be reliably detected by file-size heuristics alone.
7. Magic byte header in index files for format detection
Rejected as the primary mechanism. Adding a version byte at the start of each index file would make format detection unambiguous, but:
- It shifts all entry positions by 1 byte, complicating the mmap access pattern (entries no longer start at byte 0).
- Old brokers would misread the header byte as part of the first entry, potentially producing garbled offset lookups before sanity checks catch it.
- It adds complexity to every index read path.
The chosen approach (MetadataVersion gating as primary, content-based auto-detection as fallback) achieves the same reliability without modifying the file layout. Auto-detection is only needed for edge cases (tools without MetadataVersion access, remote indexes in unknown format).
8. Auto-detection as the primary format selection mechanism
Rejected. An earlier iteration relied solely on file-content analysis to determine the index format. This was found to be unreliable because:
- Pre-allocated (active segment) index files have file sizes that reflect the pre-allocation size, not the actual data size. The trailing zero bytes make divisibility checks meaningless.
- For files whose size is divisible by both 8 and 12, entry validation can produce false positives when legacy data happens to look valid when reinterpreted as large-format entries (which occurs for small, monotonically increasing positions -- the common case).
- The fallback to a default format when both formats validate means the result depends on the caller's useLargeFormat hint, making auto-detection non-deterministic.
Auto-detection is retained as a fallback safety net for tools and remote indexes, but the primary format selection comes from MetadataVersion via LogConfig.useLargeIndexFormat.