Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Like Kafka brokers, Kafka Streams instances can be grouped into racks. When a standby task is placed in a different rack than its corresponding active task, it provides fault tolerance and a faster recovery time if the rack of the active task goes down.

As of now, there is no way to ensure that standby tasks are distributed across different Kafka Streams racks.

Moreover, it would be desirable to allow custom logic for the standby task distribution in cases where rack IDs have a complex structure and some additional parsing and processing is needed before deciding the desired standby task distribution over different racks.

This KIP aims to give Kafka Streams users the ability to assign a rack ID to each Kafka Streams instance and to use that information to assign standby tasks to different racks (implementing custom logic if necessary).

Public Interfaces

This KIP introduces new configuration options in StreamsConfig, changes to the SubscriptionInfoData protocol, and a new public interface for assigning standby tasks across different racks.

Changes in SubscriptionInfoData protocol

The SubscriptionInfoData protocol will get a version bump and a new field that encodes the rack ID of the Kafka Streams instance. The encoded rack ID will later be used by the RackStandbyTaskAssignor implementation, which is called by the TaskAssignor implementation, to distribute standby tasks over different racks.

Changes in StreamsConfig

To let Kafka Streams users define their desired standby task distribution, we need to add two new configuration options to StreamsConfig.

rack.id - the rack identifier assigned to the Kafka Streams instance

rack.standby.task.assignor - the class that implements the RackStandbyTaskAssignor interface (see below)
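
As a rough illustration of how an application might set these two options, the sketch below builds the rack-related part of a Streams configuration with plain java.util.Properties. It assumes the keys proposed in this KIP; the helper class RackAwareStreamsProps, the rack value "eu-west-1a" and the assignor class name com.example.ZoneAwareStandbyTaskAssignor are hypothetical and only serve the example.

```java
import java.util.Properties;

final class RackAwareStreamsProps {

    // Config keys as proposed in this KIP (assumption: not part of a released StreamsConfig).
    static final String RACK_ID_CONFIG = "rack.id";
    static final String RACK_STANDBY_TASK_ASSIGNOR_CLASS_CONFIG = "rack.standby.task.assignor";

    /** Builds the rack-related portion of a Kafka Streams configuration. */
    static Properties rackProps(final String rackId, final String assignorClassName) {
        final Properties props = new Properties();
        props.put(RACK_ID_CONFIG, rackId);
        if (assignorClassName != null) {
            // Optional: leave unset to rely on the default assignor.
            props.put(RACK_STANDBY_TASK_ASSIGNOR_CLASS_CONFIG, assignorClassName);
        }
        return props;
    }

    public static void main(final String[] args) {
        // Both values below are hypothetical.
        final Properties props = rackProps("eu-west-1a", "com.example.ZoneAwareStandbyTaskAssignor");
        System.out.println(props);
    }
}
```

In a real application these entries would be merged with the usual settings (application.id, bootstrap.servers and so on) before the properties are handed to KafkaStreams.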

Code Block
languagejava
    public static final String RACK_ID_CONFIG = "rack.id";
    private static final String RACK_ID_DOC = "An identifier for the rack of an instance of Kafka Streams. " +
                                              "When set, the default implementation of org.apache.kafka.streams.RackStandbyTaskAssignor " +
                                              "will try to place standby tasks in a different rack than the corresponding active task.";

    public static final String RACK_STANDBY_TASK_ASSIGNOR_CLASS_CONFIG = "rack.standby.task.assignor";
    public static final String RACK_STANDBY_TASK_ASSIGNOR_CLASS_DOC = "Class that implements the RackStandbyTaskAssignor interface " +
                                                                      "and is used to decide on which racks standby tasks can be created.";

New RackStandbyTaskAssignor interface

To give users control over which racks a standby task can be allocated to, we will introduce a new public interface that Kafka Streams users can implement when the default implementation is not sufficient.

Code Block
languagejava
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams;

import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;

import java.util.Map;
import java.util.Set;

public interface RackStandbyTaskAssignor {

    /**
     * Computes the desired standby task distribution across the racks identified by {@link StreamsConfig#RACK_ID_CONFIG}.
     * @param sourceTasks - Source {@link TaskId}s, each mapped to its rack ID, that are eligible for standby task creation.
     * @param clientRackIds - Client rack IDs that were received during assignment.
     * @return - Map of rack IDs to sets of {@link TaskId}s. The return value can be used by a {@link TaskAssignor}
     *           implementation to decide if a {@link TaskId} can be assigned to a client that is located in a given rack.
     */
    Map<String, Set<TaskId>> assignStandbyTaskToRacks(final Map<TaskId, String> sourceTasks,
                                                      final Set<String> clientRackIds);
}
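
To illustrate the kind of custom logic the Motivation section has in mind, here is a minimal sketch of an implementation that parses a composite rack ID before deciding where standbys may go. Everything in it is an assumption made for the example: the "<zone>/<rack>" rack ID format, the class names, and the stand-in TaskId record and interface, which only mirror the proposed API so that the snippet compiles without Kafka on the classpath (Java 16 or newer is needed for the record).

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class ZoneAwareAssignorSketch {

    /** Stand-in for org.apache.kafka.streams.processor.TaskId so the sketch compiles on its own. */
    record TaskId(int topicGroupId, int partition) { }

    /** Stand-in that mirrors the proposed RackStandbyTaskAssignor interface. */
    interface RackStandbyTaskAssignor {
        Map<String, Set<TaskId>> assignStandbyTaskToRacks(Map<TaskId, String> sourceTasks,
                                                          Set<String> clientRackIds);
    }

    /** Extracts the zone from a hypothetical "<zone>/<rack>" rack ID, e.g. "zone-a/rack-1". */
    static String zoneOf(final String rackId) {
        final int separator = rackId.indexOf('/');
        return separator < 0 ? rackId : rackId.substring(0, separator);
    }

    /** Allows a standby only on racks whose zone differs from the zone of the active task. */
    static final class ZoneAwareStandbyTaskAssignor implements RackStandbyTaskAssignor {
        @Override
        public Map<String, Set<TaskId>> assignStandbyTaskToRacks(final Map<TaskId, String> sourceTasks,
                                                                 final Set<String> clientRackIds) {
            final Map<String, Set<TaskId>> distribution = new HashMap<>();
            for (final String clientRackId : clientRackIds) {
                // Every known rack gets an entry, even if no task is eligible for it.
                final Set<TaskId> eligible = distribution.computeIfAbsent(clientRackId, k -> new HashSet<>());
                for (final Map.Entry<TaskId, String> source : sourceTasks.entrySet()) {
                    if (!zoneOf(source.getValue()).equals(zoneOf(clientRackId))) {
                        eligible.add(source.getKey());
                    }
                }
            }
            return distribution;
        }
    }
}
```

With an active task in "zone-a/rack-1", this sketch would exclude both "zone-a/rack-1" and "zone-a/rack-2" and allow the standby only in racks of other zones.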


The default implementation of the interface will make sure that the standby tasks for sourceTasks are assigned to racks other than the rack of the corresponding active task. Example:

Code Block
languagejava
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams;

import org.apache.kafka.streams.processor.TaskId;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class DefaultRackStandbyTaskAssignor implements RackStandbyTaskAssignor {

    @Override
    public Map<String, Set<TaskId>> assignStandbyTaskToRacks(final Map<TaskId, String> sourceTasks,
                                                             final Set<String> clientRackIds) {
        final Map<String, Set<TaskId>> standbyTaskDistribution = new HashMap<>();
        for (final Map.Entry<TaskId, String> sourceTaskEntry : sourceTasks.entrySet()) {
            final TaskId taskId = sourceTaskEntry.getKey();
            final String taskRackId = sourceTaskEntry.getValue();

            for (final String clientRackId : clientRackIds) {
                // Register every rack without discarding tasks already collected for it.
                final Set<TaskId> rackTasks = standbyTaskDistribution.computeIfAbsent(clientRackId, k -> new HashSet<>());
                // A standby is only allowed in a rack other than the one hosting the active task.
                if (!taskRackId.equals(clientRackId)) {
                    rackTasks.add(taskId);
                }
            }
        }

        return Collections.unmodifiableMap(standbyTaskDistribution);
    }
}
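
To make the intended result concrete, the following self-contained sketch applies the same rule (a rack may host standbys only for tasks whose active copy runs in another rack) to a small input. It is an illustration, not the proposed class: task IDs are plain strings standing in for TaskId, the rack names are made up, and sorted collections are used only so that the printed output is deterministic.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

final class DefaultDistributionExample {

    /**
     * Maps every client rack to the tasks whose active copy runs in a different rack.
     * Task IDs are strings here (stand-ins for TaskId) to keep the sketch dependency-free.
     */
    static Map<String, Set<String>> distribute(final Map<String, String> sourceTasks,
                                               final Set<String> clientRackIds) {
        final Map<String, Set<String>> distribution = new TreeMap<>();
        for (final String clientRackId : clientRackIds) {
            final Set<String> rackTasks = distribution.computeIfAbsent(clientRackId, k -> new TreeSet<>());
            for (final Map.Entry<String, String> source : sourceTasks.entrySet()) {
                if (!source.getValue().equals(clientRackId)) {
                    rackTasks.add(source.getKey());
                }
            }
        }
        return distribution;
    }

    public static void main(final String[] args) {
        final Map<String, String> sourceTasks = new TreeMap<>();
        sourceTasks.put("0_0", "rack-a"); // active task 0_0 runs in rack-a
        sourceTasks.put("0_1", "rack-b"); // active task 0_1 runs in rack-b
        final Set<String> racks = new TreeSet<>(Set.of("rack-a", "rack-b", "rack-c"));
        System.out.println(distribute(sourceTasks, racks));
        // prints {rack-a=[0_1], rack-b=[0_0], rack-c=[0_0, 0_1]}
    }
}
```

Note that rack-c, which hosts no active task, is eligible for the standbys of both tasks.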

 

Proposed Changes

As described in the Public Interfaces section, the proposal consists of the following steps:

  1. Users specify the rack ID of a Kafka Streams instance by setting the rack.id streams config.
  2. The configured rack.id is encoded in the SubscriptionInfoData payload.
  3. StreamsPartitionAssignor#assign attaches the received rack.id to the corresponding org.apache.kafka.streams.processor.internals.assignment.ClientState, and all client states are then passed on to the TaskAssignor implementation.
  4. When assigning standby tasks to clients, TaskAssignor#assign invokes the configured RackStandbyTaskAssignor class, which returns the desired RackID => Set(TaskId...) distribution.
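
The last step can be pictured with a small sketch of how a task assignor might consult the returned distribution when choosing a client for a standby. This is only one possible reading of the proposal: the class StandbyPlacementSketch and the method eligibleClients are hypothetical, and client IDs and task IDs are plain strings rather than the internal ClientState and TaskId types.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class StandbyPlacementSketch {

    /**
     * Returns the clients that may host a standby for the given task.
     *
     * @param taskId       the task that needs a standby
     * @param clientRacks  client ID to rack ID, as decoded from each client's subscription info
     * @param distribution rack ID to tasks allowed to have a standby in that rack
     */
    static List<String> eligibleClients(final String taskId,
                                        final Map<String, String> clientRacks,
                                        final Map<String, Set<String>> distribution) {
        final List<String> eligible = new ArrayList<>();
        for (final Map.Entry<String, String> client : clientRacks.entrySet()) {
            // Racks missing from the distribution are treated as not eligible.
            final Set<String> allowed = distribution.getOrDefault(client.getValue(), Collections.emptySet());
            if (allowed.contains(taskId)) {
                eligible.add(client.getKey());
            }
        }
        Collections.sort(eligible); // deterministic order for the example
        return eligible;
    }
}
```

A real assignor would still have to weigh the eligible clients against each other, for example by current load, which is outside the scope of this sketch.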

Compatibility, Deprecation, and Migration Plan

N/A

Rejected Alternatives

N/A
