Status

Current state: Under discussion

Discussion thread: here

JIRA: here

Abstract

This KIP introduces a new REST API tailored to MirrorMaker2 operational requirements. It aims to make interacting with MirrorMaker2 easier by providing a new abstraction layer on top of Connect that hides the implementation details and removes the need to manage the connectors directly.

This proposal introduces a set of REST endpoints for MirrorMaker2 that expose operational status and control information for replication flows. The design greatly reduces the number of endpoints compared to the Connect REST API and keeps only the essential ones. The new endpoints are built on top of the Connect functionality specifically for MirrorMaker2, and can be secured with the same JAAS-based BasicAuthSecurityRestExtension.
With these endpoints, operators and tools can query the state of MirrorMaker2 deployments, enabling easier monitoring.

Motivation

KIP-710 (Full support for distributed mode in dedicated MirrorMaker 2.0 clusters) introduced a REST API for fencing and task configuration updates. Beyond that, MirrorMaker2 still lacks a dedicated REST interface that exposes detailed status (e.g. flows, connectors, tasks, and replicated topics). Administrators must rely on log parsing or indirect metrics, which complicates troubleshooting and integration with monitoring systems. With the proposed endpoints, users will have a consistent, securable API to:

  • Monitor replication flows: View whether flows (e.g. A→B, B→A) are replicating, disabled, and so on.
  • Access replicated topics: See which topics are actually replicated once topic filtering has been applied and replication is enabled.
  • Pause and resume replication flows
  • Access and manipulate source offsets
  • Access connector and task status: Inspect states (e.g. RUNNING, PAUSED, UNASSIGNED) and counts of running tasks.
  • Access an admin API

This KIP aims to improve observability and operational control of MirrorMaker2 without requiring major architectural changes.

Public Interfaces

The proposal introduces new REST endpoints, described below. All endpoints can be secured with the BasicAuthSecurityRestExtension using JAAS configuration, consistent with Kafka Connect.

HATEOAS

Introducing HATEOAS means that REST responses do not just return data but also include navigational links, such as links to related resources or available actions. In practice, each JSON response embeds URLs under link keys (e.g. 'herder', 'connectors') that tell clients how to navigate to related resources or perform subsequent operations. This makes the API easier to discover.
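The example responses in this KIP use link keys of the form "relation [METHOD]", for instance "pause [PUT]". As a rough sketch of how a client might interpret such keys, the following splits a key into its relation name and HTTP method. The LinkKey class and its regular expression are hypothetical and only assume the key format seen in the examples; they are not part of the proposed API.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical client-side helper: splits a link key such as "pause [PUT]"
// into a relation name ("pause") and an HTTP method ("PUT").
final class LinkKey {
    // Assumed key format: free-text relation, then the method in square brackets.
    private static final Pattern FORMAT = Pattern.compile("^(.+?)\\s*\\[([A-Z]+)\\]$");

    final String relation;
    final String method;

    private LinkKey(String relation, String method) {
        this.relation = relation;
        this.method = method;
    }

    static LinkKey parse(String key) {
        Matcher matcher = FORMAT.matcher(key.trim());
        if (!matcher.matches()) {
            throw new IllegalArgumentException("Unexpected link key: " + key);
        }
        return new LinkKey(matcher.group(1), matcher.group(2));
    }

    public static void main(String[] args) {
        LinkKey link = LinkKey.parse("offsets modification [PATCH]");
        System.out.println(link.method + " -> " + link.relation);
    }
}
```

A client could use the parsed method together with the link's URL value to decide which request to issue next, instead of hard-coding paths.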

Overview

GET /

Returns the overall status, including replication flows and cluster mappings.

{
    "flows": {
        "B->A": {
            "state": "DISABLED",
            "detailedState": {
                "enabled": false,
                "ready": true,
                "allConnectorsRunning": true,
                "sourceTaskRunning": false
            },
            "source": "B",
            "target": "A",
            "kafkaClusterId": "f8135ef31353484a9dbbew",
            "links": {
                "herder [GET]": "/B/A"
            }
        },
        "A->B": {
            "state": "REPLICATING",
            "detailedState": {
                "enabled": true,
                "ready": true,
                "allConnectorsRunning": true,
                "sourceTaskRunning": true
            },
            "source": "A",
            "target": "B",
            "kafkaClusterId": "48135ef31353484a9dbbew",
            "links": {
                "herder [GET]": "/A/B"
            }
        }
    },
    "clusters": {
        "A": "localhost:9092",
        "B": "localhost:6092"
    }
}

state

state is a derived value, calculated from the state of the connectors, the state of the source tasks, and the enabled flag.

Possible values are:

  • DISABLED
  • REPLICATING
  • CONCERNING
  • NOT_REPLICATING

It is calculated as follows:

// A disabled flow is reported as DISABLED regardless of connector or task state.
if (!enabled) {
    return MirrorFlowState.DISABLED;
}
if (allConnectorsRunning && hasRunningSourceTasks) {
    return MirrorFlowState.REPLICATING;
} else if (hasRunningSourceTasks) {
    // Source tasks are running, but at least one connector is not.
    return MirrorFlowState.CONCERNING;
} else {
    return MirrorFlowState.NOT_REPLICATING;
}
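For illustration, the calculation above can be wrapped into a small self-contained class and exercised with a few input combinations. This is only a sketch: the enum name MirrorFlowState comes from the snippet above, while the MirrorFlowStates holder class and the derive method are hypothetical names chosen for this example.

```java
// The four flow states listed above.
enum MirrorFlowState { DISABLED, REPLICATING, CONCERNING, NOT_REPLICATING }

// Hypothetical holder class for the derivation logic.
final class MirrorFlowStates {
    private MirrorFlowStates() { }

    static MirrorFlowState derive(boolean enabled,
                                  boolean allConnectorsRunning,
                                  boolean hasRunningSourceTasks) {
        if (!enabled) {
            return MirrorFlowState.DISABLED;
        }
        if (allConnectorsRunning && hasRunningSourceTasks) {
            return MirrorFlowState.REPLICATING;
        }
        if (hasRunningSourceTasks) {
            // Data is still flowing, but at least one connector is not running.
            return MirrorFlowState.CONCERNING;
        }
        return MirrorFlowState.NOT_REPLICATING;
    }

    public static void main(String[] args) {
        System.out.println(derive(false, true, false)); // DISABLED
        System.out.println(derive(true, true, true));   // REPLICATING
        System.out.println(derive(true, false, true));  // CONCERNING
        System.out.println(derive(true, true, false));  // NOT_REPLICATING
    }
}
```

The first call matches the B->A flow in the sample response above: the flow is not enabled, so it is reported as DISABLED even though all connectors are running.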


Flow-specific endpoints /{Source}/{Target}

GET /{Source}/{Target}

Returns detailed status for the replication flow from cluster A to B:

{
    "state": "REPLICATING",
    "replicatedTopics": [
        "heartbeats",
        "sensor_data"
    ],
    "detailedState": {
        "enabled": true,
        "ready": true,
        "allConnectorsRunning": true,
        "sourceTaskRunning": true
    },
    "source": "A",
    "target": "B",
    "kafkaClusterId": "48135ef31353484a9dbbew",
    "leaderWorker": "http://localhost:4444/",
    "MirrorSourceConnector": "RUNNING",
    "MirrorSourceConnectorTasksRunning": 1,
    "MirrorHeartbeatConnector": "RUNNING",
    "MirrorHeartbeatConnectorTasksRunning": 1,
    "MirrorCheckpointConnector": "RUNNING",
    "MirrorCheckpointConnectorTasksRunning": 0,
    "links": {
        "connectors [GET]": "/A/B/connectors",
        "pause [PUT]": "/A/B/pause",
        "resume [PUT]": "/A/B/resume",
        "offsets [GET]": "/A/B/offsets",
        "offsets modification [PATCH]": "/A/B/offsets",
        "offsets reset [DELETE]": "/A/B/offsets"
    }
}

Pause/Resume replication flows

PUT /{Source}/{Target}/pause

Pauses the replication flow by pausing the underlying connectors.

PUT /{Source}/{Target}/resume

Resumes the replication flow by resuming the underlying connectors.
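As a sketch of how a client might call these two endpoints, the following builds the PUT requests with the JDK's java.net.http API. The FlowControlRequests class is hypothetical, and the base URL http://localhost:4444/ is only an assumption borrowed from the leaderWorker value in the sample response. The sketch also assumes that cluster aliases are URL-safe and that the endpoints take no request body.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical client helper that builds pause/resume requests for a flow.
final class FlowControlRequests {
    private FlowControlRequests() { }

    static HttpRequest pause(URI baseUri, String source, String target) {
        return put(baseUri, source, target, "pause");
    }

    static HttpRequest resume(URI baseUri, String source, String target) {
        return put(baseUri, source, target, "resume");
    }

    private static HttpRequest put(URI baseUri, String source, String target, String action) {
        // Assumption: aliases need no URL encoding, e.g. "A" and "B".
        URI uri = baseUri.resolve("/" + source + "/" + target + "/" + action);
        // Assumption: the endpoints expect an empty request body.
        return HttpRequest.newBuilder(uri)
                .PUT(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = pause(URI.create("http://localhost:4444/"), "A", "B");
        System.out.println(request.method() + " " + request.uri());
    }
}
```

The resulting request can be sent with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.discarding()); the status codes returned by the endpoints are not specified in this KIP.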

GET /{Source}/{Target}/connectors?expand=info&expand=status

This endpoint somewhat goes against the goal of abstracting away the Connect-based internals, but it can be useful for debugging, and because it is read-only it should not cause any issues.

{
    "MirrorSourceConnector": {
        "info": {
            "name": "MirrorSourceConnector",
            "config": {
                "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
                "replication.factor": "1",
                "offset-syncs.topic.replication.factor": "1",
                "target.consumer.bootstrap.servers": "localhost:6092",
                "topics": ".*",
                "heartbeats.topic.replication.factor": "1",
                "source.cluster.alias": "A",
                "source.consumer.bootstrap.servers": "localhost:9092",
                "source.cluster.bootstrap.servers": "localhost:9092",
                "enabled": "True",
                "target.cluster.alias": "B",
                "name": "MirrorSourceConnector",
                "source.admin.bootstrap.servers": "localhost:9092",
                "target.cluster.bootstrap.servers": "localhost:6092",
                "source.producer.bootstrap.servers": "localhost:9092",
                "target.admin.bootstrap.servers": "localhost:6092",
                "target.producer.bootstrap.servers": "localhost:6092",
                "checkpoints.topic.replication.factor": "1"
            },
            "tasks": [
                {
                    "connector": "MirrorSourceConnector",
                    "task": 0
                }
            ],
            "type": "source"
        },
        "status": {
            "name": "MirrorSourceConnector",
            "connector": {
                "state": "RUNNING",
                "worker_id": "localhost:4446/A->B"
            },
            "tasks": [
                {
                    "id": 0,
                    "state": "RUNNING",
                    "worker_id": "localhost:4444/A->B"
                }
            ],
            "type": "source"
        }
    },
....


Offset manipulation

This works the same way as in Connect, but the request is applied to the flow rather than directly to a connector. Under the hood, the request is redirected to the MirrorSourceConnector.

GET /{Source}/{Target}/offsets

Gets the offsets from the MirrorSourceConnector.

{
    "offsets": [
        {
            "partition": {
                "cluster": "A",
                "partition": 0,
                "topic": "sensor_readings"
            },
            "offset": {
                "offset": 499999
            }
        },
        {
            "partition": {
                "cluster": "A",
                "partition": 0,
                "topic": "heartbeats"
            },
            "offset": {
                "offset": 127867
            }
        }
    ]
}

DELETE /{Source}/{Target}/offsets

Resets the offsets in the MirrorSourceConnector.

PATCH /{Source}/{Target}/offsets

Modifies the offsets in the MirrorSourceConnector.
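The KIP does not spell out the PATCH request body. A plausible sketch, assuming it mirrors the shape of the GET response above (as the corresponding Connect endpoint does), could look like the following. The OffsetPatchRequests class, the base URL, and the body layout are assumptions made for illustration only.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.Locale;

// Hypothetical client helper that builds an offsets modification request.
final class OffsetPatchRequests {
    private OffsetPatchRequests() { }

    // Assumption: the body has the same shape as the GET /offsets response.
    // No JSON escaping is done; the alias and topic are assumed to contain
    // only characters that are safe inside a JSON string.
    static String body(String cluster, String topic, int partition, long offset) {
        return String.format(Locale.ROOT,
                "{\"offsets\":[{\"partition\":{\"cluster\":\"%s\",\"partition\":%d,"
                        + "\"topic\":\"%s\"},\"offset\":{\"offset\":%d}}]}",
                cluster, partition, topic, offset);
    }

    static HttpRequest patch(URI baseUri, String source, String target, String jsonBody) {
        URI uri = baseUri.resolve("/" + source + "/" + target + "/offsets");
        return HttpRequest.newBuilder(uri)
                .header("Content-Type", "application/json")
                .method("PATCH", HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }

    public static void main(String[] args) {
        String json = body("A", "sensor_readings", 0, 1000L);
        HttpRequest request = patch(URI.create("http://localhost:4444/"), "A", "B", json);
        System.out.println(request.method() + " " + request.uri());
        System.out.println(json);
    }
}
```

Reusing the GET response shape would let a client read the current offsets, adjust individual entries, and send them back without converting between formats.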

Admin API

The admin API uses the same LoggingResource as Connect, so users can reconfigure the loggers at runtime.


Proposed Changes

The main idea is that the root endpoint provides a high-level overview of the configured flows and their states. From there, the user can drill down to reveal more detail, first about the individual flows and then, if required, about the connectors and tasks.

Security

The API can be secured with the same JAAS-based BasicAuthSecurityRestExtension that Kafka Connect uses to secure its public endpoints.
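From the client side, calling a secured endpoint then means sending standard HTTP Basic credentials. A minimal sketch, assuming a worker at http://localhost:4444/ and placeholder credentials (the BasicAuthRequests class, the URL, and the credentials are all hypothetical):

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical client helper for calling the API with HTTP Basic authentication.
final class BasicAuthRequests {
    private BasicAuthRequests() { }

    // Builds the Authorization header value: "Basic " + base64("user:password").
    static String basicAuthHeader(String user, String password) {
        String credentials = user + ":" + password;
        return "Basic " + Base64.getEncoder()
                .encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
    }

    // Builds a GET request for the root endpoint with Basic credentials attached.
    static HttpRequest statusRequest(URI baseUri, String user, String password) {
        return HttpRequest.newBuilder(baseUri.resolve("/"))
                .header("Authorization", basicAuthHeader(user, password))
                .GET()
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request =
                statusRequest(URI.create("http://localhost:4444/"), "user", "pass");
        System.out.println(request.headers().firstValue("Authorization").orElse(""));
    }
}
```

Because Basic authentication only base64-encodes the credentials, such requests should be sent over HTTPS in any real deployment.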

Compatibility, Deprecation, and Migration Plan

  • Backward Compatibility:
    The new REST endpoints are additive and do not affect existing MirrorMaker2 functionality.

  • Migration Plan:
    No migration process required.

  • Deprecation:
    No existing APIs are deprecated; this is an entirely new feature set.

Test Plan

  • Unit Tests:
    Develop tests to validate correct JSON responses from each endpoint given various MirrorMaker2 states.

  • Integration Tests:
    Set up a test cluster with MirrorMaker2 configured with the REST endpoints enabled. Verify that the endpoints are accessible via HTTP(S) and that the security (BasicAuth) works as expected.

Rejected Alternatives

Expose the same API as Connect: This option was rejected during the discussion of KIP-710 because it exposes too much of the internals, leading to increased operational complexity as well as usability and security problems.
