Fineract Events
Apache Fineract Events are triggered on every action which implements the Events Producer, also Apache Fineract can implement Listeners for consuming the messages that arrive to the topics, i.e. the Community Application consumes the messages for presenting the events to the backoffice user, also the message has persistence in the Database.
Apache Fineract Events implements the CQRS pattern for running the read and write Events operations.
The following Events are produced in Kafka in the current implementation:
- ACTIVATE_CLIENT New client created
- APPROVE_LOAN New loan created
- DISBURSE_LOAN New loan approved
- READ_LOAN Loan closed
- READ_Rescheduled Loan has been rescheduled
- READ_LOAN Repayment made
Apache Kafka
Apache Kafka is a messaging service which can be implemented with the following approach:
- Synchronous
- Asynchronous
In Apache Fineract the asynchronous messaging has been implemented for producing the messages in order to avoid any kind of blocking while running under huge workloads.
Persisting the Kafka Messages
Apache Fineract Events are stored in the DB for persisting the events.
The tables are used by the UI (Community Application) to list and retrieve the most recent actions executed by the Apache Fineract.
Architecture - High Level - Online Transactions
Architecture - High Level - Batch Transactions
Events - Transactions - Producer
For enabling the Transactions while producing Kafka Events it is required to implement transaction-aware Producer:
Producer properties:
enable.idempotence: true
transactional.id: prod-1
Because we've enabled idempotence, Kafka will use this transaction id as part of its algorithm to deduplicate any message this producer sends, ensuring idempotency.
Simply put, if the producer accidentally sends the same message to Kafka more than once, these settings enable it to notice.
Events - Transactions - Consumer
For enabling the Transactions while consuming Kafka Events it is required to implement transaction-aware Consumer:
Consumer properties:
enable.auto.commit: false
isolation.level: read_committed
Using a value of read_committed ensures that we don't read any transactional messages before the transaction completes.
Events - Transactions - Producer/Consumer
The proposed properties allows to have the producer and consumer both configured to write and read transactionally:
The Transactional Java Implementation will cover the:
- Sender/Consumer API
- Committing Offsets
- Committing or Aborting the Transaction
Events - Error Handling - Producer
If anything goes wrong while we are processing we catch the exception and can call abortTransaction:
try {
producer.commitTransaction();
} catch ( Exception e )
{
producer.abortTransaction();
}
And drop any buffered messages and remove the transaction from the broker.
If Fineract neither commit nor abort before the broker-configured max.transaction.timeout.ms, the Kafka broker will abort the transaction itself.
Events - Error Handling
The implementation of the Error Handling can catch the error or rollback and sending the messages to a Dead Letter Queue.
@Bean
public ErrorHandler errorHandler(MyDeadLetterQueueHandler deadLetterQueueHandler) {
//set with retry policy higher than KafkaListenerErrorHandler
return new SeekToCurrentErrorHandler((data, thrownException) -> {
deadLetterQueueHandler.send(data, thrownException);
}, new FixedBackOff(15000, 20));
}
Events - Rollback Processor
@Bean
public AfterRollbackProcessor<?, ?> afterRollbackProcessor(MyDeadLetterQueueHandler deadLetterQueueHandler) {
//set with retry policy higher than KafkaListenerErrorHandler
final var afterRollbackProcessor = new DefaultAfterRollbackProcessor<Object, Object>(((data, thrownException) -> {
deadLetterQueueHandler.send(data, thrownException);
}, new FixedBackOff(15000, 20));
afterRollbackProcessor.setCommitRecovered(true);
return afterRollbackProcessor;
}
Events - Error Handling
@Primary
KafkaListenerErrorHandler kafkaListenerErrorHandler(MyDeadLetterQueueHandler deadLetterQueueHandler,
MyExceptionHandler exceptionHandler) {
return (message, exception) -> {
final var cause = (Exception) exception.getCause();
final var consumerRecord = message.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class);
if (shouldGoToDLT(cause)) {
sendToDeadLetterTopic(deadLetterQueueHandler, consumerRecord, cause);
return new CustomResponse(cause.getMessage());
// should end transaction rollback and go to next transaction
} else{//retry logic}
References
Apache Fineract GitHub Repository: https://github.com/apache/fineract
Apache Fineract Release License: https://github.com/apache/fineract/blob/develop/LICENSE_RELEASE
Apache Fineract Source License: https://github.com/apache/fineract/blob/develop/LICENSE_SOURCE
Apache Kafka: https://kafka.apache.org/