...

In order to fix these issues, we should introduce a new RPC named incrementalAlterConfigs.  The new RPC should operate incrementally, modifying only the configuration values that are specified.  We should deprecate AlterConfigs.

Proposed Changes

The new IncrementalAlterConfigs API in AdminClient will take a collection of operations describing the configuration modifications.  There are four types of operations.

  • Set: set a configuration to a value.  The value must not be null.
  • Delete: delete a configuration key.
  • Append: if a configuration key is a list of values, add to the list.
  • Subtract: if a configuration key is a list of values, subtract from the list.
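
The four operation types above can be illustrated with a minimal in-memory sketch. It is not the broker implementation: the class `ConfigOpsSketch`, its `apply` helper, the comma-separated representation of list values, and the choice to skip duplicates on append are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ConfigOpsSketch {
    enum Op { SET, DELETE, APPEND, SUBTRACT }

    // Splits a comma-separated list value; an absent or empty value yields an empty list.
    static List<String> parseList(String value) {
        List<String> items = new ArrayList<>();
        if (value == null || value.isEmpty()) {
            return items;
        }
        for (String item : value.split(",")) {
            items.add(item.trim());
        }
        return items;
    }

    // Applies a single operation to the config map in place.
    static void apply(Map<String, String> configs, Op op, String key, String value) {
        switch (op) {
            case SET:
                if (value == null) {
                    throw new IllegalArgumentException("SET requires a non-null value");
                }
                configs.put(key, value);
                break;
            case DELETE:
                configs.remove(key);
                break;
            case APPEND: {
                List<String> items = parseList(configs.get(key));
                for (String v : parseList(value)) {
                    if (!items.contains(v)) { // assumption: duplicates are not appended twice
                        items.add(v);
                    }
                }
                configs.put(key, String.join(",", items));
                break;
            }
            case SUBTRACT: {
                List<String> items = parseList(configs.get(key));
                items.removeAll(parseList(value));
                // An empty list is stored as an empty string in this sketch.
                configs.put(key, String.join(",", items));
                break;
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> configs = new LinkedHashMap<>();
        apply(configs, Op.SET, "cleanup.policy", "compact");
        apply(configs, Op.APPEND, "cleanup.policy", "delete");
        apply(configs, Op.SUBTRACT, "cleanup.policy", "compact");
        apply(configs, Op.SET, "retention.ms", "1000");
        apply(configs, Op.DELETE, "retention.ms", null);
        System.out.println(configs); // prints {cleanup.policy=delete}
    }
}
```

Keys that are never mentioned in an operation are left untouched, which is the incremental behavior the proposal is after.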

Public Interfaces


Code Block
title: Incremental AlterConfigs
@InterfaceStability.Evolving
public class AlterConfigOp {

    public enum OpType {
        SET((byte) 0), DELETE((byte) 1), APPEND((byte) 2), SUBTRACT((byte) 3);

        private static final Map<Byte, OpType> OP_TYPES = Collections.unmodifiableMap(
                Arrays.stream(values()).collect(Collectors.toMap(OpType::id, Function.identity()))
        );

        private final byte id;

        OpType(final byte id) {
            this.id = id;
        }

        public byte id() {
            return id;
        }

        public static OpType forId(final byte id) {
            return OP_TYPES.get(id);
        }
    }

    private final ConfigResource configResource;
    private final ConfigEntry configEntry;
    private final OpType opType;

    public AlterConfigOp(ConfigResource configResource, ConfigEntry configEntry,
                         OpType operationType) {
        this.configResource = configResource;
        this.configEntry = configEntry;
        this.opType = operationType;
    }
}

public IncrementalAlterConfigsResult alterConfigs(
    Collection<AlterConfigOp> ops,
    final IncrementalAlterConfigsOptions options);

If a configuration key is specified multiple times, that is an error, and an error code will be returned for each instance of the configuration key.

Similar to AlterConfigsResult, IncrementalAlterConfigsResult will have a values function, which allows callers to determine the result of a specific configuration resource modification, and an all function, which will throw an exception if any operation failed.

Code Block
title: ModifyConfigsResult
@InterfaceStability.Evolving
public class IncrementalAlterConfigsResult {
    private final Map<AlterConfigOp, KafkaFuture<Void>> futures;

    IncrementalAlterConfigsResult(Map<AlterConfigOp, KafkaFuture<Void>> futures) {
        this.futures = futures;
    }

    /**
     * Return a map from resources to futures which can be used to check the status of the operation on each resource.
     */
    public Map<AlterConfigOp, KafkaFuture<Void>> values() {
        return futures;
    }

    /**
     * Return a future which succeeds only if all the alter configs operations succeed.
     */
    public KafkaFuture<Void> all() {
        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
    }
}

Similar to AlterConfigsOptions, IncrementalAlterConfigsOptions will include a timeout value and a dry-run flag.

...

    // Accessors of AlterConfigOp
    public ConfigEntry configEntry() {
        return configEntry;
    }

    public OpType opType() {
        return opType;
    }
}

public AlterConfigsResult incrementalAlterConfigs(
        Map<ConfigResource, Collection<AlterConfigOp>> configs,
        final AlterConfigsOptions options);

Proposed Changes

We will use the existing AlterConfigsOptions and AlterConfigsResult API classes to pass the API config options and to return the result of the configuration resource modifications.

Similar to the existing alterConfigs API, we will keep the "transactionality" of updating several configs for the same ConfigResource at once. We guarantee that we never do a partial update of a collection of configs for a ConfigResource from a single request. On a validation or update error, we will return the error for the ConfigResource.
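
The all-or-nothing guarantee per ConfigResource can be sketched as a copy-then-commit update: every change is applied to a working copy, and the copy replaces the current state only if no change fails. The names `AtomicUpdateSketch`, `Change`, and `applyAll` are hypothetical, and the empty-key check merely stands in for whatever validation the broker performs.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class AtomicUpdateSketch {
    // Hypothetical change record: a null value means "delete the key".
    static final class Change {
        final String key;
        final String value;

        Change(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Returns the updated configs for one resource, or throws without modifying `current`.
    static Map<String, String> applyAll(Map<String, String> current, List<Change> changes) {
        Map<String, String> working = new HashMap<>(current); // work on a copy
        for (Change change : changes) {
            if (change.key == null || change.key.isEmpty()) {
                // Stand-in for a validation failure: the whole batch is rejected.
                throw new IllegalArgumentException("invalid config key");
            }
            if (change.value == null) {
                working.remove(change.key);
            } else {
                working.put(change.key, change.value);
            }
        }
        return working; // the caller swaps this in only when every change succeeded
    }
}
```

Because the caller only ever sees either the original map or the fully updated copy, a failure on the last change in a batch cannot leave earlier changes applied.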

Protocol APIs

There will be a new IncrementalAlterConfigsRequest Incremental AlterConfigsRequest.

Code Block
language: java
title: ModifyConfigsRequest
IncrementalAlterConfigsOp => INT8
0: SET
1: DELETE
2: APPEND
3: SUBTRACT

IncrementalAlterConfigsRequest (Version: 0) => [resources] validate_only
 validate_only => BOOLEAN
 resources => resource_type resource_name [configs]
 resource_type => INT8
 resource_name => STRING
 configs => config_name config_op config_value
 config_name => STRING
 config_op => INT8
 config_value => NULLABLE_STRING
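
To make the `configs` entry of the request schema concrete, here is a minimal sketch that encodes one `config_name config_op config_value` triple. It assumes the usual Kafka protocol primitive encodings (big-endian INT16 length prefix followed by UTF-8 bytes for STRING, and a length of -1 for a null NULLABLE_STRING). The class and method names are hypothetical and this is not the actual request serializer.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class ConfigEntryEncodingSketch {
    // Encodes config_name (STRING), config_op (INT8), config_value (NULLABLE_STRING).
    static byte[] encodeConfig(String name, byte op, String value) {
        if (name == null) {
            throw new IllegalArgumentException("config_name is a non-nullable STRING");
        }
        byte[] nameBytes = name.getBytes(StandardCharsets.UTF_8);
        byte[] valueBytes = value == null ? null : value.getBytes(StandardCharsets.UTF_8);
        if (nameBytes.length > Short.MAX_VALUE
                || (valueBytes != null && valueBytes.length > Short.MAX_VALUE)) {
            throw new IllegalArgumentException("string too long for an INT16 length prefix");
        }
        int size = 2 + nameBytes.length + 1 + 2 + (valueBytes == null ? 0 : valueBytes.length);
        ByteBuffer buf = ByteBuffer.allocate(size); // ByteBuffer is big-endian by default
        buf.putShort((short) nameBytes.length);
        buf.put(nameBytes);
        buf.put(op);
        if (valueBytes == null) {
            buf.putShort((short) -1); // null marker for NULLABLE_STRING
        } else {
            buf.putShort((short) valueBytes.length);
            buf.put(valueBytes);
        }
        return buf.array();
    }
}
```

A Delete operation would typically carry a null `config_value`, which is why that field is NULLABLE_STRING while `config_name` is not.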

The IncrementalAlterConfigsResponse is the same as the AlterConfigsResponse.

Code Block
language: java
IncrementalAlterConfigsResponse (Version: 0) => [responses]
  responses => resource_type resource_name error_code error_message
  resource_type => INT8
  resource_name => STRING
  error_code => INT16
  error_message => NULLABLE_STRING

...

  • If a configuration key is specified more than once in the request for a given ConfigResource, we fail the ConfigResource update with INVALID_REQUEST.
  • If APPEND or SUBTRACT is specified for a configuration key that does not take a list of elements, we fail the ConfigResource update with INVALID_REQUEST.
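
The two rules above could be checked along the following lines before any change is applied to a resource. This is a sketch only: `RequestValidationSketch`, `ConfigOp`, and the `listTypedKeys` parameter (the set of keys whose values are lists) are hypothetical names, and the real broker-side validation may be structured differently.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class RequestValidationSketch {
    enum Op { SET, DELETE, APPEND, SUBTRACT }

    enum Error { NONE, INVALID_REQUEST }

    // Hypothetical pairing of a configuration key with the requested operation.
    static final class ConfigOp {
        final String key;
        final Op op;

        ConfigOp(String key, Op op) {
            this.key = key;
            this.op = op;
        }
    }

    // Validates all operations for one ConfigResource; any violation fails the whole resource.
    static Error validate(List<ConfigOp> ops, Set<String> listTypedKeys) {
        Set<String> seen = new HashSet<>();
        for (ConfigOp configOp : ops) {
            if (!seen.add(configOp.key)) {
                return Error.INVALID_REQUEST; // key specified more than once
            }
            boolean listOp = configOp.op == Op.APPEND || configOp.op == Op.SUBTRACT;
            if (listOp && !listTypedKeys.contains(configOp.key)) {
                return Error.INVALID_REQUEST; // list operation on a non-list key
            }
        }
        return Error.NONE;
    }
}
```

Returning a single error for the resource, instead of one per key, matches the per-resource error fields of the response.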

Compatibility, Deprecation, and Migration Plan

...