I am facing an ordering issue in a Spring Boot application where Update events are sometimes processed before their corresponding Create events, even though both messages are produced with the same partitioning key.
The Setup:
Topic: 10 partitions.
Listener: A batch listener (List<SMSEntity> payload).
Factory Config: Using ConcurrentKafkaListenerContainerFactory with a custom AsyncTaskExecutor (Core Pool Size: 2+).
Concurrency: No explicit setConcurrency is defined.
My Theory:
I suspected that the Kafka poller thread was fetching a batch containing the Create event, handing it off to the AsyncTaskExecutor, then immediately polling the next batch containing the Update event and handing that off to a different thread in the pool; the thread doing the Update work sometimes finishes before the thread doing the Create work.
The Debugging Attempt:
To simulate a slow "Create" process, I added Thread.sleep(15000) to the logic that handles the Create event. I expected the AsyncTaskExecutor to pick up the next batch (the Update) on its second available thread.
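Stripped of the Spring wiring, the debugging change was essentially the following (a sketch; handleEvent and the "CREATE" type check are illustrative names, not the real code):

```java
// Plain-Java sketch of the debugging change: the Create path sleeps for 15 s
// to simulate slow processing, while every other event type is handled fast.
public class SleepSketch {
    public static void handleEvent(String type) throws InterruptedException {
        if ("CREATE".equals(type)) {
            Thread.sleep(15_000); // simulate a slow Create (15 seconds)
        }
        // ... db calls for the event would go here
    }
}
```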
The Result:
Surprisingly, the application stopped consuming for those 15 seconds. The Update event was not picked up by the second thread; instead, the entire listener waited for the first thread to finish its sleep. Once the sleep was done, that same first thread consumed the Update event as well.
Questions:
Why did this happen? Why didn't the second thread process the Update record?
What is the fundamental difference between setConcurrency(10) and using an AsyncTaskExecutor?
What should my next steps be?
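For context on the second question, the setConcurrency variant I am comparing against would look roughly like this (a sketch only, reusing the same consumerFactory() shown below):

```java
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;

// Sketch: a factory using setConcurrency(10) instead of a custom executor.
// Spring creates 10 child containers, each with its own KafkaConsumer, and
// the topic's 10 partitions are distributed across them.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, SMSEntity> kafkaListenerContainerFactoryConcurrent() {
    ConcurrentKafkaListenerContainerFactory<String, SMSEntity> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);
    factory.setConcurrency(10); // one consumer per partition at most
    return factory;
}
```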
Spring Kafka consumer config:
@Bean
public ConsumerFactory<String, SMSEntity> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMsConfig);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
    props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, heartbeatIntervalMs);
    props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, auditConsumerFetchMaxWaitMs);
    props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, auditConsumerFetchMinBytes);
    return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
            new JsonDeserializer<>(SMSEntity.class, false));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, SMSEntity> kafkaListenerContainerFactoryAudit() {
    ConcurrentKafkaListenerContainerFactory<String, SMSEntity> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);
    factory.getContainerProperties().setListenerTaskExecutor(getTaskExecutor1());
    return factory;
}
@KafkaListener(clientIdPrefix = "${audit.topic}", id = "auditIdLobby", topics = "${topic.name}",
        groupId = "${kgroup.id}", containerFactory = "kafkaListenerContainerFactoryAudit")
public void auditEntryListener(@Payload List<SMSEntity> payload,
        @Header(KafkaHeaders.RECEIVED_PARTITION) String partition,
        @Header(KafkaHeaders.OFFSET) String offset,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.RECEIVED_KEY) String messageKey) {
    // processing logic
    // db calls
}
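The getTaskExecutor1() referenced in the factory is not shown above; it is roughly the following (a sketch, with the core pool size of 2 from the setup description; the max pool size and thread-name prefix are assumptions):

```java
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

// Sketch of the custom executor handed to the container. Core pool size 2
// matches the setup description; other settings are illustrative.
public AsyncTaskExecutor getTaskExecutor1() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(2);
    executor.setMaxPoolSize(4);              // assumption
    executor.setThreadNamePrefix("kafka-listener-"); // assumption
    executor.initialize();
    return executor;
}
```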