Current state: Under Discussion
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
KIP-253 ensures that messages with the same key from the same producer can always be delivered in the order that they are produced. However, for stateful stream processing jobs, after we change the number of partitions or the number of processors, messages with the same key may go to a different processor that does not have the state necessary to process them. This may cause the stateful stream processing job to output wrong results.
In this KIP we propose a solution that allows users to expand the number of partitions of the input topic as well as the number of processors of the stateful processing job while still ensuring output correctness.
Note: In this KIP, we use the terminology "processor" to indicate a combination of a consumer, a local state store and user-specified processing logic.
In addition to ensuring correctness, we also aim to achieve the following goals so that the proposed solution is efficient.
1) When we expand partitions of the input topics, we should not change the key -> processor mapping, so that no state has to be moved between the existing processors. This in turn guarantees correctness, because KIP-253 ensures that messages with the same key are delivered in the order that they are produced.
2) When we expand processors of a given stateful processing job, some keys will be assigned to the newly-added processors, and it will be necessary to copy the state for the corresponding keys to these processors before they can consume messages and generate output. However, we should not have to copy state between existing processors.
3) In order to keep processing new messages in near real-time while we are copying state to a newly-added processor, the newly-added processor should only start to consume "new" messages and generate output "after" its local state has caught up. Before its local state has caught up, the existing processor should continue to process new messages and generate output.
4) In order to avoid disrupting existing use-cases and to keep backward compatibility, the 3-tuple (topic, partition, offset) should uniquely identify the same message before and after the expansion, and a given message should be consumed exactly once by bootstrapping consumers after the partition expansion. This goal suggests that we should neither delete existing messages nor copy any message in the topic.
TODO
Here we describe how to support partition expansion while keeping the same key -> processor mapping.
Here are the requirements. We skip the details for now (e.g. how to remember the initial number of consumers in the consumer group).
1) The stateful processing job starts with the same number of processors as the initial number of partitions of the input topics.
2) At any given time the number of partitions of the input topic >= the number of processors of the processing job. In other words, we always expand partitions of the input topic before expanding processors of the processing job.
3) The processing job remembers the initial number of processors in the job. This can be supported in core Kafka by remembering the initial number of consumers in the consumer group.
4) The partition -> processor mapping is determined using linear hashing. This is where we need the initial number of processors in the job.
It can be proven that the above requirements keep the same key -> processor mapping regardless of how we change the partition number of the input topics, as long as we don't change the number of processors of the processing job.
We skip the proof here. Readers can think about how the key -> processor mapping changes in the following example (a sketch of the mapping follows the list):
- The input topic initially has 10 partitions and the processing job initially has 10 processors.
- Expand the partitions of the input topic to 15.
- Expand the processors of the processing job to 15.
- Expand the partitions of the input topic to 18.
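The KIP does not spell out the lookup itself, so the following is a minimal sketch, in Java, of how a linear-hashing partition -> processor mapping could work. The method name linearHash is illustrative, not an existing Kafka API; the only inputs it assumes are the initial and current counts described in the requirements above.

    // Linear-hashing lookup: map a value (here, a partition index) onto
    // `currentCount` buckets (here, processors), given the initial count.
    static int linearHash(int value, int initialCount, int currentCount) {
        // base = initialCount * 2^level, the largest such value <= currentCount.
        int base = initialCount;
        while (base * 2 <= currentCount) {
            base *= 2;
        }
        int bucket = value % base;
        // Buckets below the split pointer (currentCount - base) have already
        // been split and are addressed with the next-level hash.
        if (bucket < currentCount - base) {
            bucket = value % (base * 2);
        }
        return bucket;
    }

Tracing the example above with this sketch (initial number of processors = 10): linearHash(12, 10, 10) == 2, so the partition 12 added in step 2 is served by processor 2, which already owns the keys that were split off from partition 2; linearHash(12, 10, 15) == 12, so after step 3 partition 12 moves to the new processor 12 and no state moves between existing processors; linearHash(15, 10, 15) == 5, so the partition 15 added in step 4 lands on processor 5, which already owns its parent partition 5.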
Here we provide the high-level idea of how this can be supported. Most of the logic is expected to be implemented in the stream processing layer.
Note: It is required that, at any given time, the number of partitions of the input topic >= the number of processors of the processing job. In other words, we should always expand partitions of the input topic before expanding processors of the processing job.
When a new processor is added to the processing job, the processor should be able to know the following information:
- The set S1 of partitions that the new processor should be consuming after its local state is caught up.
- The latest offsets T1 of partitions in the set S1 that have been consumed by the existing processors in the job. This information should be periodically retrieved over time.
- The set S2 of partitions that may contain messages with the same key as the messages in S1. S2 includes all partitions in S1. Set S2 can be determined using the logic added in KIP-253 (a sketch of this derivation follows the list).
- The latest offsets T2 of partitions in the set S2 that have been used to generate the local state store.
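As a hedged illustration of how S2 could be derived (the exact logic lives in KIP-253): under linear hashing, a partition p created when the partition count grew from b to 2b was split off from partition p - b, so S2 can be computed by collecting each partition in S1 together with its ancestors. The method name below is illustrative.

    // Hypothetical sketch: the partitions that may hold messages with the
    // same keys as `partition`, assuming linear-hashing splits.
    static java.util.Set<Integer> ancestorsAndSelf(int partition, int initialPartitions) {
        java.util.Set<Integer> result = new java.util.HashSet<>();
        int p = partition;
        result.add(p);
        while (p >= initialPartitions) {
            // Find the table size b at which partition p was created:
            // the largest b = initialPartitions * 2^k with b <= p.
            int b = initialPartitions;
            while (b * 2 <= p) {
                b *= 2;
            }
            p = p - b;        // the partition that p was split off from
            result.add(p);
        }
        return result;
    }
    // S2 is then the union of ancestorsAndSelf(p, initialPartitions) over all p in S1.
    // E.g. with 10 initial partitions, ancestorsAndSelf(12, 10) == {12, 2}.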
Initially, offsets T2 will be 0 for all partitions in the set S2 because the local state store is empty. Offsets T1 will keep growing as the existing processors keep consuming messages from partitions in the set S1. The new processor should keep reading messages from partitions in S2 to re-build local state until offsets T2 have "almost" caught up with offsets T1 of the existing processors for all partitions in S1. Note that this follows the design in KIP-253 such that, before any message can be read by the new processor from the newly-added partitions in S1, all prior messages with the same key (as any message in partitions S1) must have been consumed in order.
Also note that, while the new processor is catching up, the existing processors will still be consuming all new messages of all partitions of the input topic, and no rebalance has been triggered yet. The new processor uses the messages only to re-build local state, without generating any output. A sketch of this catch-up loop follows.
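To make the catch-up phase concrete, here is a minimal sketch of the loop the new processor might run. StateStore, maxLag and fetchLatestConsumedOffsets (which would periodically retrieve the offsets T1, as described above) are hypothetical names used for illustration, not an existing Kafka API; only KafkaConsumer and TopicPartition are real Kafka classes.

    import java.time.Duration;
    import java.util.Map;
    import java.util.Set;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    // T2 has "almost" caught up with T1 when, for every partition in S1, the
    // offset applied to the local state store is within maxLag of the offset
    // already consumed by the existing processors.
    static boolean almostCaughtUp(Map<TopicPartition, Long> t2,
                                  Map<TopicPartition, Long> t1,
                                  long maxLag) {
        return t1.entrySet().stream()
                 .allMatch(e -> e.getValue() - t2.getOrDefault(e.getKey(), 0L) <= maxLag);
    }

    // `consumer` is assigned all partitions in S2 and positioned at offsets T2.
    static void catchUp(KafkaConsumer<byte[], byte[]> consumer,
                        StateStore store,                 // hypothetical state store
                        Map<TopicPartition, Long> t2,
                        Set<TopicPartition> s1,
                        long maxLag) {
        while (!almostCaughtUp(t2, fetchLatestConsumedOffsets(s1), maxLag)) {
            for (ConsumerRecord<byte[], byte[]> r : consumer.poll(Duration.ofMillis(100))) {
                store.apply(r.key(), r.value());          // re-build local state only;
                                                          // no output during catch-up
                t2.put(new TopicPartition(r.topic(), r.partition()), r.offset() + 1);
            }
        }
    }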
After offsets T2 have "almost" caught up with offsets T1, we should trigger a rebalance to re-assign the partitions in S1 from the existing processor to the new processor. The existing processor should commit offsets, generate output and stop consuming messages from partitions in S1. The new processor should first consume messages from offset T2 to offset T1 to finish re-building the local state. Then it can consume messages starting from offsets T1 and generate output for these messages.
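Under the same hypothetical names as the sketch above, the hand-off after the rebalance could look like this: the new processor replays offsets [T2, T1) into its state store only, then starts emitting output at T1.

    // Runs on the new processor once the rebalance has assigned it the
    // partitions in S1. `emitOutput` is a hypothetical stand-in for the
    // user-specified processing logic that produces output.
    static void takeOver(KafkaConsumer<byte[], byte[]> consumer,
                         StateStore store,
                         Map<TopicPartition, Long> t1,
                         Map<TopicPartition, Long> t2) {
        for (TopicPartition tp : t1.keySet()) {           // partitions in S1
            consumer.seek(tp, t2.get(tp));                // resume from T2
        }
        while (true) {
            for (ConsumerRecord<byte[], byte[]> r : consumer.poll(Duration.ofMillis(100))) {
                TopicPartition tp = new TopicPartition(r.topic(), r.partition());
                store.apply(r.key(), r.value());          // state is always updated
                if (r.offset() >= t1.get(tp)) {
                    emitOutput(r);                        // output only from T1 onwards
                }
            }
        }
    }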
TODO
TODO