Вопрос или проблема
Я пытаюсь настроить обработку ошибок для слушателя батчей таким образом, чтобы, если одна запись не удается обработать, весь батч отправлялся в топик мёртвых писем (с некоторым дополнительным логированием). Я использую транзакции.
Вот конфигурация моего слушателя:
@Transactional("datasourceTransactionManager")
@KafkaListener(
id = "myId",
idIsGroup = false,
topics = "commandTopic",
containerFactory = "containerFactoryDlqErrorHandling",
batch = "true"
)
@SendTo("replyTopic")
public List<Message<V>> listen(List<ConsumerRecord<String, String>> records) {
...
}
А конфигурация containerFactoryDlqErrorHandling
выглядит следующим образом:
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> containerFactoryDlqErrorHandling(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
KafkaProperties kafkaProperties,
DeadletterRecoverer defaultRecoverer,
KafkaTemplate<?, ?> kafkaTemplate,
BackOff backOff) {
var customContainerFactory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(customContainerFactory,
new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties()));
var defaultErrorHandler = new DefaultAfterRollbackProcessor<>(
defaultRecoverer,
backOff,
kafkaTemplate,
true);
defaultErrorHandler.defaultFalse();
defaultErrorHandler.addRetryableExceptions(DataAccessResourceFailureException.class);
defaultErrorHandler.addRetryableExceptions(TransactionException.class);
customContainerFactory.setAfterRollbackProcessor(defaultErrorHandler);
customContainerFactory.setContainerCustomizer(container -> {
container.getContainerProperties().setBatchRecoverAfterRollback(true);
});
return customContainerFactory;
}
Это ведет себя почти так, как и ожидалось, за исключением того, что перед перемещением сообщений в топик мёртвых писем DefaultAfterRollbackProcessor
всегда повторно пытается обработать их с настроенной задержкой, даже когда выбрасываемое исключение не установлено как повторяемое (например, ListenerExecutionFailedException
).
Как мне избежать нежелательных попыток повторной обработки? Нужно ли мне писать свой собственный AfterRollbackProcessor
?
Ответ или решение
Для настройки обработчика ошибок в вашем сценарии с использованием Kafka и Spring, чтобы избежать нежелательных повторных попыток обработки сообщений, необходимо немного изменить конфигурацию вашего DefaultAfterRollbackProcessor
. Давайте рассмотрим, как это сделать.
Во-первых, важно отметить, что DefaultAfterRollbackProcessor
будет пытаться переотправить сообщения, если они не отмечены как окончательно неудачные из-за обрабатываемого исключения. Если вы хотите предотвращать попытки повторной обработки для определённых исключений, таких как ListenerExecutionFailedException
, вы можете создать свой собственный AfterRollbackProcessor
. Ниже приведен сгруппированный пример, который поможет вам в этом.
1. Создание кастомного AfterRollbackProcessor
Вместо использования DefaultAfterRollbackProcessor
, вы можете определить свой собственный класс, реализующий интерфейс AfterRollbackProcessor
. Этот класс должен проверять тип ошибки и принимать решение о том, следует ли повторно пытаться обработку сообщения или отправить его в мертвую-letter тему.
import org.springframework.kafka.listener.DefaultAfterRollbackProcessor;
import org.springframework.kafka.listener.ConsumerAwareErrorHandler;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public class CustomAfterRollbackProcessor<K, V> extends DefaultAfterRollbackProcessor<K, V> {
public CustomAfterRollbackProcessor(DeadletterRecoverer recoverer, KafkaTemplate<K, V> kafkaTemplate) {
super(recoverer, kafkaTemplate);
}
@Override
public void process(List<ConsumerRecord<K, V>> records, Exception thrownException) {
// Здесь вы можете проверять тип исключения
if (thrownException instanceof ListenerExecutionFailedException) {
// Перенаправляем сообщения в мертвую-letter тему
this.getRecoverer().accept(records);
return; // Выходим из метода, чтобы избежать повторных попыток
}
// В противном случае продолжаем с обычной логикой
super.process(records, thrownException);
}
}
2. Обновление конфигурации контейнера
Теперь, когда у вас есть собственный AfterRollbackProcessor
, вам нужно обновить вашу конфигурацию контейнера для использования этого нового класса.
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> containerFactoryDlqErrorHandling(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
KafkaProperties kafkaProperties,
DeadletterRecoverer defaultRecoverer,
KafkaTemplate<?, ?> kafkaTemplate) {
ConcurrentKafkaListenerContainerFactory<Object, Object> customContainerFactory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(customContainerFactory,
new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties()));
// Используем кастомный процессор
CustomAfterRollbackProcessor<?, ?> customAfterRollbackProcessor =
new CustomAfterRollbackProcessor<>(defaultRecoverer, kafkaTemplate);
customContainerFactory.setAfterRollbackProcessor(customAfterRollbackProcessor);
customContainerFactory.setContainerCustomizer(container -> {
container.getContainerProperties().setBatchRecoverAfterRollback(true);
});
return customContainerFactory;
}
3. Ключевые моменты
- Убедитесь, что ваш
CustomAfterRollbackProcessor
правильно обрабатывает различные исключения и отправляет записи в мертвую-letter тему. - Вы можете добавить дополнительные условия в метод
process
, чтобы настроить логику обработки согласно вашим требованиям. - Не забудьте протестировать вашу конфигурацию, чтобы убедиться, что сообщения обрабатываются и перемещаются в мертвую-letter тему согласно вашим ожиданиям.
С помощью этих шагов вы сможете контролировать, когда и как обрабатываются сообщения, и избежать нежелательных повторных попыток обработки в случае возникновения определенных исключений.