Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
1. Motivation
The FRESHNESS clause, as introduced in FLIP-435, is a mandatory part of the CREATE MATERIALIZED TABLE syntax. While this forces users to be explicit about their data recency requirements, it introduces friction, especially for new users and everyday use cases.
The primary motivations for making this clause optional are:
Reduce Boilerplate: For many users, the goal is to create a continuously updating materialized table with a low-latency, "near real-time" refresh. Requiring them to specify FRESHNESS = INTERVAL '...' SECOND/MINUTE in every single statement is redundant for this common pattern.
Lower Barrier to Entry: The FRESHNESS concept, while powerful, currently forces new users to immediately understand the distinction between CONTINUOUS and FULL refresh modes. More importantly, to choose a sensible FRESHNESS, they implicitly need to understand that this value becomes the job's checkpoint interval in the default streaming mode. By providing a sensible default, we remove this requirement. A new user can get a working, continuous pipeline without needing to know about the underlying checkpointing mechanism upfront. They can start with a simple, functional table and learn about performance tuning concepts like checkpointing later, as their needs become more advanced.
Enable Platform-Level Intelligence: A more powerful architectural approach is to allow the underlying Catalog to determine the default freshness, enabling "smart" catalogs that can implement context-aware logic.
2. Proposed Changes
We propose two primary changes to enable optional FRESHNESS in a way that is both user-friendly and architecturally extensible:
Make the FRESHNESS syntax optional: The FRESHNESS clause will be made optional in the CREATE MATERIALIZED TABLE and CREATE OR ALTER MATERIALIZED TABLE DDL statements. This provides the immediate user benefit of reducing boilerplate for common use cases.
Introduce a pluggable resolution mechanism: We will introduce a new MaterializedTableEnricher interface. This provides a formal extension point for customizable default logic, allowing advanced users and vendors to implement "smart" default behaviors (e.g., inferring freshness from upstream tables).
2.1. Resolution in the Catalog Manager
The core of this proposal is to shift the responsibility of resolving the final freshness and refreshMode to the CatalogManager before a materialized table is created in a Catalog.
The new flow will be:
The SQL parser will be updated to make the FRESHNESS clause optional.
The SqlToOperationConverter will create a CatalogMaterializedTable object where the freshness and refreshMode may be absent (null).
When a CreateMaterializedTableOperation is executed, it will pass this unresolved object to the CatalogManager.
The CatalogManager will use a new MaterializedTableEnricher to produce a ResolvedCatalogMaterializedTable. This final, resolved object is guaranteed to have non-null freshness and refreshMode.
This resolved object is then passed to the target Catalog for creation.
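The hand-off in this flow can be sketched in plain Java. This is an illustrative sketch only, not the proposed implementation: every nested type below (UnresolvedTable, ResolvedTable, Enricher, TargetCatalog) is a hypothetical stand-in for the Flink types named above, and freshness is modeled as a java.time.Duration for brevity.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/** Illustrative only: every nested type is a hypothetical stand-in, not the Flink API. */
final class ResolutionFlowSketch {
    enum RefreshMode { CONTINUOUS, FULL }

    /** Produced by the converter: freshness and refresh mode may be null. */
    static final class UnresolvedTable {
        final Duration freshness;      // null when FRESHNESS was omitted
        final RefreshMode refreshMode; // null when not yet resolved

        UnresolvedTable(Duration freshness, RefreshMode refreshMode) {
            this.freshness = freshness;
            this.refreshMode = refreshMode;
        }
    }

    /** Handed to the catalog: both properties are guaranteed non-null. */
    static final class ResolvedTable {
        final Duration freshness;
        final RefreshMode refreshMode;

        ResolvedTable(Duration freshness, RefreshMode refreshMode) {
            this.freshness = Objects.requireNonNull(freshness, "freshness");
            this.refreshMode = Objects.requireNonNull(refreshMode, "refreshMode");
        }
    }

    interface Enricher {
        ResolvedTable enrich(UnresolvedTable table);
    }

    /** Minimal catalog that records what it was asked to create. */
    static final class TargetCatalog {
        final List<ResolvedTable> created = new ArrayList<>();

        void createTable(ResolvedTable table) {
            created.add(table);
        }
    }

    /** Mirrors the CatalogManager step: enrich first, then pass the resolved object on. */
    static void createMaterializedTable(
            UnresolvedTable table, Enricher enricher, TargetCatalog catalog) {
        catalog.createTable(enricher.enrich(table));
    }

    public static void main(String[] args) {
        // Placeholder enricher: fills gaps with fixed values chosen for this demo.
        Enricher enricher = t -> new ResolvedTable(
                t.freshness != null ? t.freshness : Duration.ofMinutes(3),
                t.refreshMode != null ? t.refreshMode : RefreshMode.CONTINUOUS);
        TargetCatalog catalog = new TargetCatalog();
        createMaterializedTable(new UnresolvedTable(null, null), enricher, catalog);
        System.out.println(catalog.created.get(0).freshness);
    }
}
```

The point of the sketch is the ordering: the catalog only ever sees an object whose constructor rejects null values, so nullability stays confined to the DDL processing path.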
2.2. Default Implementation and Configuration
To provide a predictable and consistent out-of-the-box experience, Apache Flink will include a DefaultMaterializedTableEnricher. This implementation will be used by default and its behavior is driven by two new configuration options.
2.2.1. New Configuration Options
To provide standard, configurable defaults for both streaming and batch use cases, we will introduce two new configuration keys:
materialized-table.default-freshness.continuous
Purpose: The default freshness for CONTINUOUS or AUTOMATIC refresh modes when FRESHNESS is omitted.
Default Value: 3 min
materialized-table.default-freshness.full
Purpose: The default freshness for FULL refresh mode when FRESHNESS is omitted.
Default Value: 1 hour
2.2.2. Enrichment Logic
The enrich method in the DefaultMaterializedTableEnricher follows a clear, two-step process to resolve the final properties:
Step 1: Resolve Freshness. It first determines the final freshness interval.
If the user-provided freshness is not null, the user's value is always respected.
If the user-provided freshness is null, it inspects the logicalRefreshMode to select the appropriate default from the configuration.
Step 2: Resolve Refresh Mode. After establishing the final freshness, it then resolves the physical refreshMode using the standard derivation logic (comparing freshness against the freshness-threshold for AUTOMATIC mode) and performs the necessary validations.
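The two steps can be sketched as follows. This is a simplified sketch under stated assumptions, not the DefaultMaterializedTableEnricher itself: the class and its nested types are hypothetical, freshness is modeled as a java.time.Duration, validation is omitted, and treating a freshness equal to the threshold as CONTINUOUS is an assumption about the boundary case.

```java
import java.time.Duration;

/** Simplified, hypothetical sketch of the two-step logic; not Flink's implementation. */
final class DefaultEnricherSketch {
    enum LogicalRefreshMode { AUTOMATIC, CONTINUOUS, FULL }
    enum RefreshMode { CONTINUOUS, FULL }

    /** Resolved pair produced by the sketch. */
    static final class Result {
        final Duration freshness;
        final RefreshMode refreshMode;

        Result(Duration freshness, RefreshMode refreshMode) {
            this.freshness = freshness;
            this.refreshMode = refreshMode;
        }
    }

    private final Duration defaultContinuous; // materialized-table.default-freshness.continuous
    private final Duration defaultFull;       // materialized-table.default-freshness.full
    private final Duration threshold;         // materialized-table.refresh-mode.freshness-threshold

    DefaultEnricherSketch(Duration defaultContinuous, Duration defaultFull, Duration threshold) {
        this.defaultContinuous = defaultContinuous;
        this.defaultFull = defaultFull;
        this.threshold = threshold;
    }

    Result enrich(Duration userFreshness, LogicalRefreshMode logicalMode) {
        // Step 1: an explicit user value always wins; otherwise pick the default for the mode.
        final Duration freshness;
        if (userFreshness != null) {
            freshness = userFreshness;
        } else if (logicalMode == LogicalRefreshMode.FULL) {
            freshness = defaultFull;
        } else {
            freshness = defaultContinuous;
        }

        // Step 2: explicit modes are kept; AUTOMATIC compares freshness with the threshold.
        // Assumption: a freshness equal to the threshold is treated as CONTINUOUS.
        final RefreshMode mode;
        if (logicalMode == LogicalRefreshMode.FULL) {
            mode = RefreshMode.FULL;
        } else if (logicalMode == LogicalRefreshMode.CONTINUOUS) {
            mode = RefreshMode.CONTINUOUS;
        } else {
            mode = freshness.compareTo(threshold) <= 0 ? RefreshMode.CONTINUOUS : RefreshMode.FULL;
        }
        return new Result(freshness, mode);
    }

    public static void main(String[] args) {
        DefaultEnricherSketch enricher = new DefaultEnricherSketch(
                Duration.ofMinutes(3), Duration.ofHours(1), Duration.ofMinutes(30));
        Result r = enricher.enrich(null, LogicalRefreshMode.AUTOMATIC);
        System.out.println(r.freshness + " " + r.refreshMode);
    }
}
```

With the proposed defaults and a 30-minute threshold, an omitted freshness in AUTOMATIC mode resolves to 3 minutes and CONTINUOUS, while an explicit 1-hour freshness resolves to FULL.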
3. Public Interfaces
This proposal uses a non-breaking, "deprecate and add" strategy to evolve the public API.
3.1. SQL Syntax Change
The FRESHNESS clause in both the CREATE MATERIALIZED TABLE and the proposed CREATE OR ALTER MATERIALIZED TABLE (FLIP-546) statements in the Calcite SQL grammar file (Sql.ftl) will be changed from mandatory to optional.
3.1.1. CREATE MATERIALIZED TABLE
Current Syntax:
CREATE MATERIALIZED TABLE ...
[WITH (...)]
FRESHNESS = INTERVAL '...' { SECOND[S] | MINUTE[S] | HOUR[S] | DAY[S] } -- Mandatory
[REFRESH_MODE = ...]
AS <select_statement>
Proposed Syntax:
CREATE MATERIALIZED TABLE ...
[WITH (...)]
[FRESHNESS = INTERVAL '...' { SECOND[S] | MINUTE[S] | HOUR[S] | DAY[S] }] -- Now Optional
[REFRESH_MODE = ...]
AS <select_statement>
3.1.2. CREATE OR ALTER MATERIALIZED TABLE (from FLIP-546)
To ensure consistency, the optionality of FRESHNESS will also apply to the CREATE OR ALTER command.
Proposed Syntax:
CREATE OR ALTER MATERIALIZED TABLE ...
[WITH (...)]
[FRESHNESS = INTERVAL '...' { SECOND[S] | MINUTE[S] | HOUR[S] | DAY[S] }] -- Now Optional
[REFRESH_MODE = ...]
AS <select_statement>
3.2. IntervalFreshness Improvements
The IntervalFreshness class will be improved for better type safety and usability. Key changes include:
Internal representation of the interval will use int for type safety.
Validation logic is moved into the class itself.
A new toDuration() method provides a clean way to convert to a standard java.time.Duration.
To manage this transition in a backward-compatible way, the existing getInterval() method will be deprecated in favor of a new, type-safe alternative.
public class IntervalFreshness {
    private final int interval;

    /**
     * @deprecated Use {@link #getIntervalInt()} instead.
     */
    @Deprecated
    public String getInterval() {
        return String.valueOf(interval);
    }

    /**
     * Returns the interval value as an integer.
     */
    public int getIntervalInt() {
        return interval;
    }

    // ... other methods ...
}
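As a rough illustration of how an int-based interval, in-class validation, and toDuration() could fit together, consider the sketch below. It is a hypothetical stand-in rather than the actual IntervalFreshness class: the class name, the nested TimeUnit enum, the constructor shape, and the rule that the interval must be positive are all assumptions made for this example.

```java
import java.time.Duration;

/** Hypothetical sketch; the real IntervalFreshness class may differ in names and details. */
final class IntervalFreshnessSketch {
    enum TimeUnit { SECOND, MINUTE, HOUR, DAY }

    private final int interval;
    private final TimeUnit timeUnit;

    IntervalFreshnessSketch(int interval, TimeUnit timeUnit) {
        // Validation lives in the class itself. Requiring a positive value is an assumption.
        if (interval <= 0) {
            throw new IllegalArgumentException(
                    "Freshness interval must be positive, got: " + interval);
        }
        if (timeUnit == null) {
            throw new IllegalArgumentException("Time unit must not be null");
        }
        this.interval = interval;
        this.timeUnit = timeUnit;
    }

    int getIntervalInt() {
        return interval;
    }

    /** Converts the interval and its unit into a standard java.time.Duration. */
    Duration toDuration() {
        switch (timeUnit) {
            case SECOND:
                return Duration.ofSeconds(interval);
            case MINUTE:
                return Duration.ofMinutes(interval);
            case HOUR:
                return Duration.ofHours(interval);
            case DAY:
                return Duration.ofDays(interval);
            default:
                throw new IllegalStateException("Unknown time unit: " + timeUnit);
        }
    }

    public static void main(String[] args) {
        System.out.println(new IntervalFreshnessSketch(3, TimeUnit.MINUTE).toDuration());
    }
}
```

Storing an int means callers no longer need to parse a string before doing arithmetic, and an invalid interval fails at construction time instead of later in the planner.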
3.3. API Evolution: CatalogMaterializedTable
To allow the planner to pass an unresolved object to the CatalogManager without a hard breaking change, we will evolve the CatalogMaterializedTable interface using a "deprecate and add" strategy. The return types of key methods will be marked as @Nullable, and the convenient getFreshness() method will be deprecated.
public interface CatalogMaterializedTable extends CatalogBaseTable {

    /**
     * Returns the user-defined freshness, or {@code null} if it was not specified in the DDL.
     * The final value is determined by the {@code MaterializedTableEnricher}.
     */
    @Nullable
    IntervalFreshness getDefinitionFreshness();

    /**
     * Returns the user-defined physical refresh mode, or {@code null} if it was not specified.
     * The final value is determined by the {@code MaterializedTableEnricher}.
     */
    @Nullable
    RefreshMode getRefreshMode();

    /**
     * Get the {@link Duration} value of materialized table definition freshness.
     *
     * @deprecated use {@link #getDefinitionFreshness()} together with {@link
     *     IntervalFreshness#toDuration()} instead.
     */
    @Deprecated
    default @Nullable Duration getFreshness() {
        final IntervalFreshness definitionFreshness = getDefinitionFreshness();
        return definitionFreshness == null ? null : definitionFreshness.toDuration();
    }

    // ...
}
The final, resolved object (ResolvedCatalogMaterializedTable) that is passed to connectors and other downstream components will continue to have guaranteed non-null values for these fields, limiting the impact of this change to the DDL processing path.
3.4. Default Freshness config
To provide a standard, configurable default, we will introduce two new configuration keys, materialized-table.default-freshness.continuous and materialized-table.default-freshness.full, in the MaterializedTableConfigOptions class, each with its own default duration.
public static final ConfigOption<Duration> MATERIALIZED_TABLE_DEFAULT_FRESHNESS_CONTINUOUS =
        key("materialized-table.default-freshness.continuous")
                .durationType()
                .defaultValue(Duration.ofMinutes(3))
                .withDescription(
                        "The default freshness interval for continuous refresh mode when the FRESHNESS clause is omitted in a materialized table definition.");

public static final ConfigOption<Duration> MATERIALIZED_TABLE_DEFAULT_FRESHNESS_FULL =
        key("materialized-table.default-freshness.full")
                .durationType()
                .defaultValue(Duration.ofHours(1))
                .withDescription(
                        "The default freshness interval for full refresh mode when the FRESHNESS clause is omitted in a materialized table definition.");
3.5. New MaterializedTableEnricher Interface
A new public, experimental interface will be introduced to encapsulate the enrichment logic. A DefaultMaterializedTableEnricher will be provided, as described in Section 2.2.
/**
 * Enricher interface for determining materialized table properties during catalog resolution.
 *
 * <p>This enricher resolves:
 *
 * <ul>
 *   <li>Freshness intervals when not explicitly specified by the user
 *   <li>Physical refresh modes (CONTINUOUS or FULL) based on logical preferences and configuration
 * </ul>
 *
 * <p>Implementations can provide custom strategies tailored to different deployment environments or
 * operational requirements.
 */
@Experimental
public interface MaterializedTableEnricher {

    /**
     * Enriches a materialized table by determining its final freshness interval and refresh mode.
     *
     * @param catalogMaterializedTable the materialized table to enrich, which may have null
     *     freshness
     * @return the enrichment result with resolved, non-null freshness and refresh mode
     */
    EnrichmentResult enrich(CatalogMaterializedTable catalogMaterializedTable);
}
To keep the initial scope of this FLIP focused and deliverable, we propose that the framework will directly use the DefaultMaterializedTableEnricher for now. This provides a complete, working feature out of the box.
The mechanism for discovering and configuring custom enrichers (e.g., via a service loader or environment configuration) is a crucial topic, but we believe it warrants its own dedicated discussion in a follow-up FLIP.
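To give a feel for what a custom strategy might look like once such a mechanism exists, here is a purely hypothetical sketch of the "inherit freshness from upstream tables" idea. Nothing in it is part of this proposal: the class name, the way upstream freshness values are supplied, and the policy of taking the largest upstream value are all assumptions, and freshness is modeled as a java.time.Duration.

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.Objects;

/** Hypothetical "smart" freshness policy; all names and the policy itself are illustrative. */
final class UpstreamAwareEnricherSketch {
    private final Duration fallback;

    UpstreamAwareEnricherSketch(Duration fallback) {
        this.fallback = Objects.requireNonNull(fallback, "fallback");
    }

    /**
     * @param userFreshness freshness from the DDL, or null if the clause was omitted
     * @param upstreamFreshness freshness of upstream materialized tables (may be empty);
     *     how these values would be looked up is outside the scope of this sketch
     */
    Duration resolveFreshness(Duration userFreshness, Collection<Duration> upstreamFreshness) {
        // An explicit user value is always respected.
        if (userFreshness != null) {
            return userFreshness;
        }
        // Assumed policy: inherit the largest upstream freshness, since refreshing more often
        // than the slowest input would not yield newer data. Fall back if there is none.
        return upstreamFreshness.stream().max(Comparator.naturalOrder()).orElse(fallback);
    }

    public static void main(String[] args) {
        UpstreamAwareEnricherSketch sketch = new UpstreamAwareEnricherSketch(Duration.ofMinutes(3));
        System.out.println(
                sketch.resolveFreshness(
                        null, Arrays.asList(Duration.ofMinutes(10), Duration.ofHours(1))));
    }
}
```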
3.6. New EnrichmentResult class
Carries the final, resolved, non-null properties from the enricher.
/**
 * Result of the enrichment process containing the resolved freshness interval and physical refresh
 * mode for a {@link CatalogMaterializedTable}.
 *
 * <p>This object is returned by {@link MaterializedTableEnricher} after determining the final,
 * non-null values for both properties.
 */
@Experimental
public class EnrichmentResult {
    private final IntervalFreshness freshness;
    private final RefreshMode refreshMode;

    public EnrichmentResult(final IntervalFreshness freshness, final RefreshMode refreshMode) {
        this.freshness = freshness;
        this.refreshMode = refreshMode;
    }

    public IntervalFreshness getFreshness() {
        return freshness;
    }

    public RefreshMode getRefreshMode() {
        return refreshMode;
    }
}
4. User Journeys
4.1. New User Creating a Simple Streaming View
A new user wants to create a simple materialized table and does not yet need to configure advanced options. Assume materialized-table.refresh-mode.freshness-threshold is 30 minutes.
New, simplified syntax:
-- The FRESHNESS clause is omitted.
CREATE MATERIALIZED TABLE high_value_orders
AS
SELECT
product_id,
price
FROM orders;
Outcome: The DefaultMaterializedTableEnricher is invoked. Since freshness is null, it applies the default value from the materialized-table.default-freshness.continuous configuration (3 min). Because 3 minutes is below the 30-minute freshness threshold, the CONTINUOUS refresh mode is derived and a continuous streaming job is started, providing the expected "near real-time" behavior with minimal syntax.
4.2. User defines FRESHNESS explicitly
An advanced user needs to create a table that is refreshed on an hourly basis. Assume materialized-table.refresh-mode.freshness-threshold is 30 minutes.
Existing syntax (no change in behavior):
-- The user explicitly provides the FRESHNESS clause to override the default.
CREATE MATERIALIZED TABLE hourly_summary
FRESHNESS = INTERVAL '1' HOUR
AS
SELECT
product_id,
price
FROM orders;
Outcome: The command succeeds, behaving exactly as it does today. Because the explicit freshness of 1 hour exceeds the 30-minute threshold, the FULL refresh mode is derived and a scheduled batch job is created. This demonstrates that the change is non-breaking for existing use cases.
4.3. User calls CREATE OR ALTER
This journey clarifies the declarative behavior when a previously specified FRESHNESS clause is removed in a subsequent CREATE OR ALTER statement.
A developer is managing a materialized table via a CI/CD pipeline.
Step 1: Initial Deployment with Explicit Freshness
The developer initially deploys the table with a non-default, hourly freshness to meet a specific business requirement.
-- file: materialized_tables/hourly_summary.sql
CREATE OR ALTER MATERIALIZED TABLE hourly_summary
FRESHNESS = INTERVAL '1' HOUR
AS
SELECT
product_id,
price
FROM orders;
Outcome: The command succeeds. The materialized table is created with a FRESHNESS of 1 hour, and a FULL refresh job is scheduled accordingly.
Step 2: Evolution - The FRESHNESS Clause is Removed
Later, the developer modifies the query's business logic. While doing so, they remove the FRESHNESS clause from the SQL file, perhaps assuming the previous value of '1 HOUR' will be retained.
-- file: materialized_tables/hourly_summary.sql
-- The FRESHNESS clause has been removed.
CREATE OR ALTER MATERIALIZED TABLE hourly_summary
AS
SELECT
product_id,
price,
description -- added a new nullable field
FROM orders;
Outcome:
The command succeeds, but the table's FRESHNESS is reset to the default of INTERVAL '3' MINUTE.
4.4. User Specifies FULL Refresh Mode without FRESHNESS
A user wants a batch-refreshed table but wants to rely on the system's default batch interval, rather than specifying it themselves.
Syntax:
-- The user explicitly sets the refresh mode but omits the freshness.
CREATE MATERIALIZED TABLE daily_summary
REFRESH_MODE = FULL
AS
SELECT
user_id,
COUNT(*) AS total_orders
FROM orders
GROUP BY user_id;
Outcome: The DefaultMaterializedTableEnricher is invoked. It sees that freshness is null but the logicalRefreshMode is explicitly set to FULL. Following its logic, it applies the default value from the materialized-table.default-freshness.full configuration (e.g., 1 hour). The system then creates a scheduled FULL refresh job with a 1 hour interval. This correctly handles the user's intent to create a batch job without needing to specify the exact interval.
5. Compatibility, Deprecation, and Migration Plan
Backward Compatibility: This proposal introduces a semantic breaking change to the CatalogMaterializedTable interface, as getDefinitionFreshness() and getRefreshMode() can now return null. However, the impact is contained. The final ResolvedCatalogMaterializedTable passed to connectors and the Flink runtime will continue to have guaranteed non-null values, ensuring that the runtime path is unaffected.
Migration Plan: Developers of custom tooling that directly consumes an unresolved CatalogMaterializedTable are encouraged to update their code to handle potential null values from the affected methods. The getFreshness() method is officially deprecated.
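For tooling authors, null-safe handling might look like the sketch below. It is only an illustration: the nested UnresolvedTable interface is a hypothetical stand-in that models a single accessor, and it returns a java.time.Duration directly rather than an IntervalFreshness to keep the example self-contained.

```java
import java.time.Duration;
import java.util.Optional;

/** Illustrative null-safe consumer of an unresolved table; not part of the Flink API. */
final class NullSafeToolingSketch {

    /** Hypothetical stand-in for the unresolved table; only one accessor is modeled. */
    interface UnresolvedTable {
        /** Freshness from the DDL, or null if the FRESHNESS clause was omitted. */
        Duration getDefinitionFreshness();
    }

    /** Describes the freshness without dereferencing a possibly-null value. */
    static String describeFreshness(UnresolvedTable table) {
        return Optional.ofNullable(table.getDefinitionFreshness())
                .map(d -> "FRESHNESS = " + d.toMinutes() + " minute(s)")
                .orElse("FRESHNESS omitted; resolved later by the enricher");
    }

    public static void main(String[] args) {
        System.out.println(describeFreshness(() -> Duration.ofMinutes(5)));
        System.out.println(describeFreshness(() -> null));
    }
}
```

Wrapping the accessor in Optional at the call site keeps the null check in one place, which makes it harder to forget when the value is passed further along.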
6. Test Plan
The implementation will be validated with unit and integration tests covering:
Parser: Ensure the DDL is parsed correctly both with and without the FRESHNESS clause.
Planner/Execution:
    Verify that a materialized table created without a FRESHNESS clause correctly defaults to a 3-minute interval and results in a CONTINUOUS refresh job.
    Verify that a materialized table created with an explicit FRESHNESS clause continues to function correctly.
End-to-End Tests: Add tests to validate the user journeys described above.
7. Documentation
The official Flink SQL documentation will be updated to:
Reflect that the FRESHNESS clause is now optional.
Explain the default behavior.
Include examples demonstrating the simplified syntax.
8. Rejected Alternatives
8.1. Using a Global Configuration Option
We considered a simpler design in which the default is controlled only by a global flink-conf.yaml setting, with the parser's existing logic using this value directly. In that design, the MaterializedTableEnricher interface would be unnecessary.
This was rejected because it is not extensible. While a configuration option provides a necessary baseline, it does not allow for the "smart" context-aware logic that is a key motivation for this FLIP (e.g., inheriting freshness from upstream tables). The MaterializedTableEnricher provides a much cleaner and more powerful extension point for custom logic, keeping the default mechanism within the API while still allowing for a configurable baseline. The chosen hybrid approach provides the best of both worlds.