new StreamTask(Producer)
- initStateStores()
- if EoS: Producer.initTransactions()
- initTopology()
task.resume()
- if EoS: Producer.beginTransaction()
- initTopology()
task.close()
- suspend()
- closeStateManager()
- if (clean) write checkpoint file
(release state locks)
- if EoS: Producer.close()
ConsumerRebalanceListener.onPartitionsAssign()
- revoked all non-assigned suspended tasks
- close task
- resume reassigned tasks
- create newly assigned tasks
if EoS: create new Producer per task
else: share single Producer over all tasks
ConsumerRebalanceListener.onPartitionsRevoke()
suspend all tasks
- suspend()
task.suspend()
- closeTopology()
- for each node: node.close()
- commit()
Create StreamThread
while(running)
poll()
// close() or on error
shutdown()
for each task
for each task
for each task
StreamThred.shutdown()
close all tasks
- close()
if (!running)
or
on error
for each task
task.commit()
- StateStoreManager.flush()
- RecordCollector.flush() -> Producer.flush()
- if (!eos) write checkpoint file
- commitOffsets:
- if EoS: Producer.sendOffsetsToTransaction()
Producer.commit()
[Producer.beginTransaction(): only if called from loop]
else Consumer.commit()
new StandbyTask()
- initStateStores()
task.resume()
- updateOffsetLimits()
initTopology()
- for each node: node.init()
initStateStores()
- updateOffsetLimits()
- get state locks
- maybe state recovery
task.commit()
- StateStoreManager.flush()
- if (!eos) write checkpoint file
- updateOffsetsLimits()
task.close()
- suspend()
- closeStateManager()
- if (clean) write checkpoint file
(release state locks)
task.suspend()
- task.commit()