Status

Current state: Under Discussion

Discussion thread: Discussion Link 

JIRA: KAFKA-6989 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, in Kafka Streams, a single thread is used to process tasks, which can become a performance bottleneck. Under the current model, when a fatal error occurs, a new retry topic is created and the failed records are sent to it. To guarantee ordering, incoming records cannot be processed until the failed records in the retry topic have been fully reprocessed. This can introduce lag because we are essentially backtracking to make sure all data has been sent: new data that would have been processed had no failure occurred must wait while the thread is paused on earlier records.

Present Kafka Design Philosophy

Before we dive into how we could change the existing codebase, it helps to clarify some details of the current Kafka Streams design. Right now, threads are independent of one another: each thread is in charge of its own stream task, and there is no cooperation between threads. As a result, they do not process the same topic, but distinct ones. For example, one thread processes topic1 while another processes topic2. Neither thread attempts to access the other's topic because it is given no knowledge of it. In this manner, the two threads are isolated from one another.

However, this design can be problematic under certain conditions. When failed records are moved to a retry topic, the current thread has no way of handing them off to another thread and saying "here, you could do this work for me so it doesn't hold up the line." We have some options to fix this issue:

  1. Allow the current thread access to other threads on standby. When more processing capacity is needed, we could call upon these reserve threads and release them once their task is complete so that they can be used elsewhere.
  2. Alternatively, maintain the isolation of the threads and use a method that does not block (i.e. use a policy similar to KafkaConsumer#commitAsync).

Public Interfaces

We have a couple of choices available to us, as mentioned in the ticket discussion (KAFKA-6989). Recall that an asynchronous method does not need the output of a previous function in order to start; all it needs is the signal. However, when offsets have been committed or processed asynchronously, we need to determine what happens once the operation concludes. It is also worth remembering that asynchronous processing does not necessarily require an extra thread. When offsets are committed or received, we should consider the following:

  1. Ordering: As noted in the JIRA discussion, Samza has judged it impossible for its implementation to return records in their original sequence.
  2. Exactly-Once: Under exactly-once semantics, each offset is returned once. This will not be possible if multiple threads are active (i.e. more than one thread is calling the same Kafka Streams instance).
  3. Latency and Resilience: As mentioned in the JIRA ticket, retrying processing can become a performance bottleneck because we are effectively "stopping the world" by pausing on a single record. One way to avoid this is to let a second thread handle the failed records while we continue to process incoming metadata. However, exactly-once and ordering would not be supported under these conditions.

We have two ways to approach this issue. Note that KafkaConsumer's implementation prevents more than one thread from calling it at once. If we follow a similar methodology with Kafka Streams, we get the following:

  1. Positive sides: Ordering would still be guaranteed, and exactly-once would be unchanged. We would add method(s) with behavior similar to KafkaConsumer#commitAsync: each accepts a user-defined callback and calls it once the work is done. The user's thread moves on while we construct a future that waits on the task's completion. In this manner, none of the new methods block.
  2. Negative sides: As mentioned, failure handling would still be a major problem (the user has to deal with it). We could still end up stuck on one record that, for some reason, stubbornly continues to fail, which could slow down the whole process.
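The callback pattern in the first approach can be sketched roughly as follows. This is a minimal illustration, not Kafka Streams code: `AsyncMetadataClient` and `SimpleRecordCallback` are hypothetical names, and metadata is simplified to a list of strings. The call returns immediately, and a `CompletableFuture` runs the lookup and invokes the callback when it finishes or fails.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical stand-in for the proposed callback; metadata is simplified to strings.
interface SimpleRecordCallback {
    void onComplete(Exception exc, List<String> metadata);
}

// Hypothetical client, not part of Kafka Streams.
class AsyncMetadataClient {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    // Returns immediately; the callback fires once the lookup succeeds or fails.
    CompletableFuture<Void> getMetadataAsync(Supplier<List<String>> lookup,
                                             SimpleRecordCallback callback) {
        return CompletableFuture.supplyAsync(lookup, executor)
            .handle((metadata, error) -> {
                if (error != null) {
                    // supplyAsync wraps failures in CompletionException; unwrap for the user.
                    Throwable cause = error.getCause() != null ? error.getCause() : error;
                    callback.onComplete(new RuntimeException(cause), null);
                } else {
                    callback.onComplete(null, metadata);
                }
                return null;
            });
    }

    void close() {
        executor.shutdown();
    }
}
```

Because a single worker thread performs the lookups in submission order, this sketch keeps the ordering property claimed for the first approach, but a lookup that keeps failing still has to be handled by the caller.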

In short, this approach lets the user choose between a method that does not block and one that does, while the isolation between threads is maintained. Next, we have the multithreaded approach, which takes an alternative route. Using more than one thread brings a different set of tradeoffs:

  1. Positive sides: Failure handling improves because multiple threads are on the job. While a secondary thread takes care of the failed metadata, the primary thread can move on to processing new ones. Since the workload of the failed-metadata topic is not constantly increasing, we will have time to process it. Once the secondary thread has finished with the failed records, it can be terminated, freeing up CPU resources and space. Latency would be reduced.
  2. Negative sides: Ordering is now impossible to guarantee, as is exactly-once, because we have no way of knowing which records have been returned: asynchronous processes have no way of communicating with one another.
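The tradeoff in the multithreaded approach can be illustrated with a small sketch. Everything here is hypothetical (`RetryOffloadDemo`, integer records, a predicate that marks which records fail their first attempt); it only shows the shape of the idea: the primary thread hands failed records to a secondary thread and keeps going, so the completion order may differ from the input order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

// Hypothetical sketch, not Kafka Streams code.
class RetryOffloadDemo {
    // Processes records on the calling (primary) thread. A record that fails its first
    // attempt is handed to a secondary thread so the primary thread can keep going.
    // Returns records in the order they finished, which may differ from input order.
    static List<Integer> process(List<Integer> records, Predicate<Integer> failsFirstAttempt) {
        List<Integer> completed = Collections.synchronizedList(new ArrayList<>());
        ExecutorService retryThread = Executors.newSingleThreadExecutor();
        for (Integer record : records) {
            if (failsFirstAttempt.test(record)) {
                // Assumption for the sketch: the retry always succeeds.
                retryThread.execute(() -> completed.add(record));
            } else {
                completed.add(record);
            }
        }
        // The secondary thread is released once the retry backlog has drained.
        retryThread.shutdown();
        try {
            if (!retryThread.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("retries did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for retries", e);
        }
        return completed;
    }
}
```

Every record is eventually processed, but nothing in the returned list tells the caller where a retried record originally sat in the input, which is the ordering problem described above.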

In the first approach I outlined, we are essentially giving the user more flexibility in deciding how to resolve the latency and failure handling problem. The second approach takes some load off the client's back: we figure out how to process the records using multiple threads, and clients do not have to worry about anything complex. Note that with the second plan, no CompletableFutures would be involved, as secondary threads would process the records directly using blocking methods (like KafkaConsumer#commitSync).

We could set the first approach as the default policy, while the second approach (which is the one that Samza uses) would be used only if the client configures more than one thread per stream task.

Proposed Changes

For the first plan, we would have some new methods that might look something like this: 

getMetadataAsync
/** Retrieves record metadata for a topic asynchronously. This method does not block.
 *  @param callback will be called when the records have been retrieved or the retrieval has failed.
 *  @param storeName the name of the store the user is querying.
 */
void getMetadataAsync(RecordCallback callback, String storeName);


/** Retrieves offsets. If num.threads.per.task > 1, then it will return offsets out of order.
 *  @param inOrder whether the offsets are expected to be in order.
 *  @param storeName name of store being queried.
 *  @throws IllegalStateException if inOrder == true when num.threads.per.task > 1
 */
Collection<StreamsMetadata> processMetadataForStore(boolean inOrder, String storeName);

The purpose of RecordCallback is analogous to that of KafkaConsumer's OffsetCommitCallback. The user would use this interface (or class?) to define what happens once a method completes. It could be used to notify the user that they can move on to processing the next record, or it could be used as a feedback loop to send another request for the same record (if it failed).

In the second plan, we will have to include a new config such as num.threads.per.task to specify the maximum number of threads that could be used per task. These threads would process the same task simultaneously (however, ordering would no longer be guaranteed).

If num.threads.per.task is equal to one, then we use our default policy. Otherwise, we use the second one. Please note that if getMetadataAsync were supported when num.threads.per.task is greater than one, then there would be multiple calls to the user-provided callback, because the request would be split across multiple asynchronous threads (which do not communicate with one another). The callback is shown below:

RecordCallback
public interface RecordCallback {
    /** A method which can be used by the client to define the behavior of the code after the metadata has been retrieved.
     *  @param exc      An exception if one had occurred
     *  @param metadata The metadata retrieved (if the retrieval was successful)
     */
    public void onComplete(KafkaException exc, Collection<StreamsMetadata> metadata);
}
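As an illustration of the "feedback loop" use of the callback, the sketch below re-issues a failed request a bounded number of times. It is only a sketch under stated assumptions: `RetryDemoCallback` is a simplified stand-in for RecordCallback (strings instead of StreamsMetadata, a plain `Exception` instead of KafkaException), and `RetryingCallback`, `request`, and `maxAttempts` are hypothetical names. It is not thread-safe and assumes callbacks arrive one at a time.

```java
import java.util.Collection;
import java.util.function.Consumer;

// Simplified stand-in for the proposed RecordCallback (assumption for this sketch).
interface RetryDemoCallback {
    void onComplete(Exception exc, Collection<String> metadata);
}

// Hypothetical callback that sends another request when the previous one failed.
class RetryingCallback implements RetryDemoCallback {
    private final Consumer<RetryDemoCallback> request; // issues one request, reporting to the given callback
    private final int maxAttempts;
    private int attempts = 0;
    private Collection<String> result;
    private Exception lastError;

    RetryingCallback(Consumer<RetryDemoCallback> request, int maxAttempts) {
        this.request = request;
        this.maxAttempts = maxAttempts;
    }

    // Issues the next attempt, passing this object as the completion callback.
    void start() {
        attempts++;
        request.accept(this);
    }

    @Override
    public void onComplete(Exception exc, Collection<String> metadata) {
        if (exc == null) {
            result = metadata;   // success: the caller can move on
        } else if (attempts < maxAttempts) {
            start();             // failure: ask again for the same data
        } else {
            lastError = exc;     // give up after maxAttempts
        }
    }

    Collection<String> result() { return result; }

    Exception lastError() { return lastError; }
}
```

Bounding the number of attempts is a design choice of the sketch; without it, a record that always fails would loop forever, which is the "stubbornly failing record" concern raised for the first approach.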

Alternatives

It is possible for getMetadataAsync to call the given RecordCallback only once even though the work is split across multiple threads. When implementing this change, the most likely approach is to have a primary thread act as a parent for the other threads (i.e. it decides when threads are started and terminated). The child threads would not receive the original callback, but a modified one. These modified callbacks notify the parent thread when the children are done processing their respective tasks, and once all threads are done, the original callback is called so that the single-call behavior is preserved. How well this works will depend on the implementation, but it is worth considering.
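One way the parent/child arrangement could look is sketched below. All names are hypothetical (`FanOutCoordinator`, `AggregateCallback`), and metadata is simplified to strings. Each child reports completion through a shared counter instead of the user's callback, and only the last child to finish invokes the user's callback, so it is called exactly once.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical simplified callback; metadata is represented as strings.
interface AggregateCallback {
    void onComplete(Exception exc, Collection<String> metadata);
}

// Hypothetical coordinator: fans work out to child threads, calls the user's callback once.
class FanOutCoordinator {
    static void getMetadataAsync(List<Supplier<String>> parts, AggregateCallback userCallback) {
        if (parts.isEmpty()) {
            userCallback.onComplete(null, Collections.<String>emptyList());
            return;
        }
        ExecutorService children = Executors.newFixedThreadPool(parts.size());
        List<String> results = Collections.synchronizedList(new ArrayList<String>());
        AtomicReference<Exception> firstError = new AtomicReference<>();
        AtomicInteger remaining = new AtomicInteger(parts.size());
        for (Supplier<String> part : parts) {
            children.execute(() -> {
                try {
                    results.add(part.get());
                } catch (RuntimeException e) {
                    firstError.compareAndSet(null, e); // remember only the first failure
                } finally {
                    // The "modified callback": only the last child notifies the user.
                    if (remaining.decrementAndGet() == 0) {
                        Exception error = firstError.get();
                        Collection<String> metadata =
                            error == null ? new ArrayList<String>(results) : null;
                        userCallback.onComplete(error, metadata);
                    }
                }
            });
        }
        children.shutdown(); // child threads exit after the submitted work completes
    }
}
```

The results arrive in completion order rather than in the order of `parts`, consistent with the loss of ordering in the multithreaded plan.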


Compatibility, Deprecation, and Migration Plan

Problems could arise if a new KafkaStreams instance is brought online and returns offsets out of order when the user expects them in order. To maintain the old behavior, we will keep the current structure of the Kafka Streams methods intact (although some of their statements might have to be tweaked to accommodate the new change).

There are no considerations for deprecation, particularly since we are adding a new capability, not upgrading a preexisting one. 

However, if the user requests records expecting them to be in order and they are not, an exception will be thrown notifying the client that the configs are incompatible with the method request.

For example, the original metadataForStore method will call processMetadataForStore with inOrder set to true. If the configs indicate that we are processing using multiple threads, then an exception will be thrown.
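The compatibility check described above might look like the following sketch. `MetadataOrderingGuard` is a hypothetical class, the metadata is a placeholder string collection, and the constructor argument stands in for the proposed num.threads.per.task config.

```java
import java.util.Collection;
import java.util.Collections;

// Hypothetical sketch of the ordering guard; metadata is represented as strings.
class MetadataOrderingGuard {
    // Stand-in for the value of the proposed num.threads.per.task config.
    private final int numThreadsPerTask;

    MetadataOrderingGuard(int numThreadsPerTask) {
        if (numThreadsPerTask < 1) {
            throw new IllegalArgumentException("num.threads.per.task must be at least 1");
        }
        this.numThreadsPerTask = numThreadsPerTask;
    }

    // Existing-style method: keeps the old contract by always asking for in-order results.
    Collection<String> metadataForStore(String storeName) {
        return processMetadataForStore(true, storeName);
    }

    // New-style method: rejects in-order requests when multiple threads are configured.
    Collection<String> processMetadataForStore(boolean inOrder, String storeName) {
        if (inOrder && numThreadsPerTask > 1) {
            throw new IllegalStateException(
                "in-order results requested but num.threads.per.task = " + numThreadsPerTask);
        }
        // Placeholder result; a real implementation would look up the store's metadata.
        return Collections.singletonList("metadata-for-" + storeName);
    }
}
```

With one thread per task, existing callers see no change; with more than one, a caller of the old method fails fast instead of silently receiving out-of-order results.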

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.
