...

The SubscriptionInfoData protocol will get a version bump and a new field that encodes the rack ID of the Kafka Streams instance. The encoded rack ID will later be used by the RackAwareStandbyTaskAssignor implementation (which is called by the TaskAssignor implementation) to distribute standby tasks over different racks.

...

rack.standby.task.assignor - Class that implements the RackAwareStandbyTaskAssignor interface (see below)
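As a rough illustration of how an application might supply these two settings, the sketch below builds a plain `Properties` object. The keys `rack.id` and `rack.standby.task.assignor` are the ones proposed here and are written as string literals because the corresponding `StreamsConfig` constants are part of the proposal. The class name `RackConfigExample`, the application id, the broker address, and the rack name are hypothetical placeholders.

```java
import java.util.Properties;

public class RackConfigExample {

    // Proposed config keys, copied from this document as string literals.
    static final String RACK_ID_CONFIG = "rack.id";
    static final String RACK_STANDBY_TASK_ASSIGNOR_CLASS_CONFIG = "rack.standby.task.assignor";

    // Builds Streams properties for an instance located in the given rack.
    static Properties rackAwareProperties(final String rackId) {
        final Properties props = new Properties();
        props.put("application.id", "rack-aware-app");     // hypothetical application id
        props.put("bootstrap.servers", "localhost:9092");  // hypothetical broker address
        props.put("num.standby.replicas", "1");            // existing Streams config: one standby per task
        props.put(RACK_ID_CONFIG, rackId);
        // Assumption: the default implementation described in this document is named explicitly.
        props.put(RACK_STANDBY_TASK_ASSIGNOR_CLASS_CONFIG,
                  "org.apache.kafka.streams.DefaultRackAwareStandbyTaskAssignor");
        return props;
    }

    public static void main(final String[] args) {
        final Properties props = rackAwareProperties("rack-a");
        System.out.println(props.getProperty(RACK_ID_CONFIG));
    }
}
```

Each instance would pass its own rack identifier, so instances in the same rack share the same `rack.id` value.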

Code Block
languagejava
    public static final String RACK_ID_CONFIG = "rack.id";
    private static final String RACK_ID_DOC = "An identifier for the rack of an instance of Kafka Streams. " +
                                              "When set, the default implementation of org.apache.kafka.streams.RackAwareStandbyTaskAssignor " +
                                              "will try to place standby tasks on a different rack than the corresponding active task.";

    public static final String RACK_STANDBY_TASK_ASSIGNOR_CLASS_CONFIG = "rack.standby.task.assignor";
    public static final String RACK_STANDBY_TASK_ASSIGNOR_CLASS_DOC = "Class that implements the RackAwareStandbyTaskAssignor interface, " +
                                                                      "which is used when deciding on which racks standby tasks can be created.";

New

...

RackAwareStandbyTaskAssignor interface

To give users control over which racks standby tasks can be allocated to, we will introduce a new public interface that Kafka Streams users can implement when the default implementation is not enough.

Code Block
languagejava
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams;

import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;

import java.util.Map;
import java.util.Set;

public interface RackAwareStandbyTaskAssignor {

    /**
     * Computes the desired standby task distribution across the different {@link StreamsConfig#RACK_ID_CONFIG}s.
     * @param sourceTasks - Source {@link TaskId}s, each mapped to its rack ID, that are eligible for standby task creation.
     * @param clientRackIds - Client rack IDs that were received during assignment.
     * @return - Map of rack IDs to sets of {@link TaskId}s. The return value can be used by a {@link TaskAssignor}
     *           implementation to decide whether a {@link TaskId} can be assigned to a client located in a given rack.
     */
    Map<String, Set<TaskId>> assignStandbyTaskToRacks(final Map<TaskId, String> sourceTasks,
                                                      final Set<String> clientRackIds);
}
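
To illustrate the kind of custom policy a user might plug in, here is a minimal sketch of an allow-list assignor: standby tasks may only be placed on explicitly permitted racks, and never on the rack hosting the active task. So that it runs without Kafka on the classpath, the sketch declares its own stand-in interface `StandbyRackAssignor<T>`, generic over the task ID type; that interface, `AllowListAssignor`, and `AllowListAssignorSketch` are hypothetical names, not part of the proposal.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class AllowListAssignorSketch {

    /** Stand-in for the proposed interface; T replaces TaskId so no Kafka classes are needed. */
    interface StandbyRackAssignor<T> {
        Map<String, Set<T>> assignStandbyTaskToRacks(Map<T, String> sourceTasks, Set<String> clientRackIds);
    }

    /** Hypothetical policy: only allow-listed racks, other than the active task's rack, may host standbys. */
    static final class AllowListAssignor<T> implements StandbyRackAssignor<T> {
        private final Set<String> allowedRacks;

        AllowListAssignor(final Set<String> allowedRacks) {
            this.allowedRacks = new HashSet<>(allowedRacks);
        }

        @Override
        public Map<String, Set<T>> assignStandbyTaskToRacks(final Map<T, String> sourceTasks,
                                                            final Set<String> clientRackIds) {
            final Map<String, Set<T>> distribution = new HashMap<>();
            for (final String rackId : clientRackIds) {
                // Every known rack gets an entry; racks that are not allow-listed stay empty.
                final Set<T> rackTasks = new HashSet<>();
                if (allowedRacks.contains(rackId)) {
                    for (final Map.Entry<T, String> entry : sourceTasks.entrySet()) {
                        if (!rackId.equals(entry.getValue())) {
                            rackTasks.add(entry.getKey());
                        }
                    }
                }
                distribution.put(rackId, rackTasks);
            }
            return Collections.unmodifiableMap(distribution);
        }
    }
}
```

A real implementation would implement the proposed interface directly and work with `TaskId` keys. How the configured class would be instantiated and configured is not covered by this sketch.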

...

Code Block
languagejava
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams;

import org.apache.kafka.streams.processor.TaskId;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class DefaultRackAwareStandbyTaskAssignor implements RackAwareStandbyTaskAssignor {

    @Override
    public Map<String, Set<TaskId>> assignStandbyTaskToRacks(final Map<TaskId, String> sourceTasks,
                                                             final Set<String> clientRackIds) {
        final Map<String, Set<TaskId>> standbyTaskDistribution = new HashMap<>();
        for (final Map.Entry<TaskId, String> sourceTaskEntry : sourceTasks.entrySet()) {
            final TaskId taskId = sourceTaskEntry.getKey();
            final String taskRackId = sourceTaskEntry.getValue();

            for (final String clientRackId : clientRackIds) {
                // Make sure every rack has an entry, without discarding tasks already collected
                // for it while processing earlier source tasks.
                final Set<TaskId> rackTasks = standbyTaskDistribution.computeIfAbsent(clientRackId, rackId -> new HashSet<>());
                // A standby task must not be placed on the rack that hosts its active task.
                if (!taskRackId.equals(clientRackId)) {
                    rackTasks.add(taskId);
                }
            }
        }

        return Collections.unmodifiableMap(standbyTaskDistribution);
    }
}
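
A small worked example may help show the distribution the default rule is meant to produce: for each task, every rack except the one hosting the active task is a standby candidate. The sketch below is a stand-alone re-statement of that rule, using plain `String` task IDs instead of `TaskId` so that it runs without Kafka. The class name `RackDistributionSketch`, the task IDs, and the rack names are hypothetical.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

public class RackDistributionSketch {

    // Maps each rack to the tasks whose standbys may live there:
    // all tasks except those whose active copy runs in that same rack.
    static Map<String, Set<String>> assignStandbyTaskToRacks(final Map<String, String> sourceTasks,
                                                             final Set<String> clientRackIds) {
        // Sorted collections are used only to make the printed output deterministic.
        final Map<String, Set<String>> distribution = new TreeMap<>();
        for (final String rackId : clientRackIds) {
            distribution.put(rackId, new TreeSet<>());
        }
        for (final Map.Entry<String, String> sourceTask : sourceTasks.entrySet()) {
            for (final String rackId : clientRackIds) {
                if (!sourceTask.getValue().equals(rackId)) {
                    distribution.get(rackId).add(sourceTask.getKey());
                }
            }
        }
        return distribution;
    }

    public static void main(final String[] args) {
        final Map<String, String> sourceTasks = new TreeMap<>();
        sourceTasks.put("0_0", "rack-a"); // active task 0_0 runs in rack-a
        sourceTasks.put("0_1", "rack-b"); // active task 0_1 runs in rack-b
        final Set<String> racks = new TreeSet<>(Arrays.asList("rack-a", "rack-b", "rack-c"));

        // prints {rack-a=[0_1], rack-b=[0_0], rack-c=[0_0, 0_1]}
        System.out.println(assignStandbyTaskToRacks(sourceTasks, racks));
    }
}
```

Here rack-c hosts no active task, so it is a candidate for both standbys, while rack-a and rack-b can each only host the standby of the task that is active in the other rack.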

...

  1. Users specify the rack ID of the Kafka Streams instance by setting the rack.id Streams config.
  2. The configured rack.id will be encoded in the SubscriptionInfoData payload.
  3. StreamsPartitionAssignor#assign will assign the received rack.id to org.apache.kafka.streams.processor.internals.assignment.ClientState, and then all the client states will be passed on to the TaskAssignor implementation.
  4. TaskAssignor#assign, when assigning standby tasks to clients, will invoke the configured RackAwareStandbyTaskAssignor class, which returns the desired RackID => Set(TaskId...) distribution.
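
The steps above leave open how a TaskAssignor would consume the returned distribution. One possible reading, offered only as an assumption, is a per-task eligibility check: a client may host the standby of a task if that task appears in the set for the client's rack. In the sketch below, `StandbyEligibilitySketch` and `eligibleClients` are hypothetical names, and plain strings stand in for client IDs and `TaskId`s.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class StandbyEligibilitySketch {

    // clientRacks:  client id -> rack.id decoded from that client's subscription (steps 2 and 3).
    // distribution: rack id -> task ids allowed as standbys in that rack (result of step 4).
    // Returns the clients, in sorted order, that may host a standby of the given task.
    static List<String> eligibleClients(final String taskId,
                                        final Map<String, String> clientRacks,
                                        final Map<String, Set<String>> distribution) {
        final List<String> eligible = new ArrayList<>();
        for (final Map.Entry<String, String> client : new TreeMap<>(clientRacks).entrySet()) {
            // A rack missing from the distribution is treated as allowing no standby tasks.
            final Set<String> allowed = distribution.getOrDefault(client.getValue(), Collections.emptySet());
            if (allowed.contains(taskId)) {
                eligible.add(client.getKey());
            }
        }
        return eligible;
    }
}
```

An actual assignor would still have to pick among the eligible clients, for example by load, which is outside the scope of this sketch.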

...