Дефолтный процессор после отката без ожидания для пакетного слушателя

Вопрос или проблема

Я пытаюсь настроить обработку ошибок для слушателя батчей таким образом, чтобы, если одна запись не удается обработать, весь батч отправлялся в топик мёртвых писем (с некоторым дополнительным логированием). Я использую транзакции.

Вот конфигурация моего слушателя:

    @Transactional("datasourceTransactionManager")
    @KafkaListener(
            id = "myId",
            idIsGroup = false,
            topics = "commandTopic",
            containerFactory = "containerFactoryDlqErrorHandling",
            batch = "true"
    )
    @SendTo("replyTopic")
    public List<Message<V>> listen(List<ConsumerRecord<String, String>> records) {

        ...
    }

А конфигурация containerFactoryDlqErrorHandling выглядит следующим образом:

    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> containerFactoryDlqErrorHandling(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            KafkaProperties kafkaProperties,
            DeadletterRecoverer defaultRecoverer,
            KafkaTemplate<?, ?> kafkaTemplate,
            BackOff backOff) {

        var customContainerFactory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(customContainerFactory,
                             new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties()));
        var defaultErrorHandler = new DefaultAfterRollbackProcessor<>(
                defaultRecoverer,
                backOff,
                kafkaTemplate,
                true);
        defaultErrorHandler.defaultFalse();
        defaultErrorHandler.addRetryableExceptions(DataAccessResourceFailureException.class);
        defaultErrorHandler.addRetryableExceptions(TransactionException.class);
        customContainerFactory.setAfterRollbackProcessor(defaultErrorHandler);
        customContainerFactory.setContainerCustomizer(container -> {
            container.getContainerProperties().setBatchRecoverAfterRollback(true);
        });
        return customContainerFactory;
    }

Это ведет себя почти так, как и ожидалось, за исключением того, что перед перемещением сообщений в топик мёртвых писем DefaultAfterRollbackProcessor всегда повторно пытается обработать их с настроенной задержкой, даже когда выбрасываемое исключение не установлено как повторяемое (например, ListenerExecutionFailedException).

Как мне избежать нежелательных попыток повторной обработки? Нужно ли мне писать свой собственный AfterRollbackProcessor?

Ответ или решение

Для настройки обработчика ошибок в вашем сценарии с использованием Kafka и Spring, чтобы избежать нежелательных повторных попыток обработки сообщений, необходимо немного изменить конфигурацию вашего DefaultAfterRollbackProcessor. Давайте рассмотрим, как это сделать.

Во-первых, важно отметить, что DefaultAfterRollbackProcessor будет пытаться переотправить сообщения, если они не отмечены как окончательно неудачные из-за обрабатываемого исключения. Если вы хотите предотвращать попытки повторной обработки для определённых исключений, таких как ListenerExecutionFailedException, вы можете создать свой собственный AfterRollbackProcessor. Ниже приведен сгруппированный пример, который поможет вам в этом.

1. Создание кастомного AfterRollbackProcessor

Вместо использования DefaultAfterRollbackProcessor, вы можете определить свой собственный класс, реализующий интерфейс AfterRollbackProcessor. Этот класс должен проверять тип ошибки и принимать решение о том, следует ли повторно пытаться обработку сообщения или отправить его в мертвую-letter тему.

import org.springframework.kafka.listener.DefaultAfterRollbackProcessor;
import org.springframework.kafka.listener.ConsumerAwareErrorHandler;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class CustomAfterRollbackProcessor<K, V> extends DefaultAfterRollbackProcessor<K, V> {

    public CustomAfterRollbackProcessor(DeadletterRecoverer recoverer, KafkaTemplate<K, V> kafkaTemplate) {
        super(recoverer, kafkaTemplate);
    }

    @Override
    public void process(List<ConsumerRecord<K, V>> records, Exception thrownException) {
        // Здесь вы можете проверять тип исключения
        if (thrownException instanceof ListenerExecutionFailedException) {
            // Перенаправляем сообщения в мертвую-letter тему
            this.getRecoverer().accept(records);
            return; // Выходим из метода, чтобы избежать повторных попыток
        }
        // В противном случае продолжаем с обычной логикой
        super.process(records, thrownException);
    }
}

2. Обновление конфигурации контейнера

Теперь, когда у вас есть собственный AfterRollbackProcessor, вам нужно обновить вашу конфигурацию контейнера для использования этого нового класса.

@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> containerFactoryDlqErrorHandling(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        KafkaProperties kafkaProperties,
        DeadletterRecoverer defaultRecoverer,
        KafkaTemplate<?, ?> kafkaTemplate) {

    ConcurrentKafkaListenerContainerFactory<Object, Object> customContainerFactory = new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(customContainerFactory,
                         new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties()));

    // Используем кастомный процессор
    CustomAfterRollbackProcessor<?, ?> customAfterRollbackProcessor = 
            new CustomAfterRollbackProcessor<>(defaultRecoverer, kafkaTemplate);

    customContainerFactory.setAfterRollbackProcessor(customAfterRollbackProcessor);
    customContainerFactory.setContainerCustomizer(container -> {
        container.getContainerProperties().setBatchRecoverAfterRollback(true);
    });

    return customContainerFactory;
}

3. Ключевые моменты

  • Убедитесь, что ваш CustomAfterRollbackProcessor правильно обрабатывает различные исключения и отправляет записи в мертвую-letter тему.
  • Вы можете добавить дополнительные условия в метод process, чтобы настроить логику обработки согласно вашим требованиям.
  • Не забудьте протестировать вашу конфигурацию, чтобы убедиться, что сообщения обрабатываются и перемещаются в мертвую-letter тему согласно вашим ожиданиям.

С помощью этих шагов вы сможете контролировать, когда и как обрабатываются сообщения, и избежать нежелательных повторных попыток обработки в случае возникновения определенных исключений.

Оцените материал
Добавить комментарий

Капча загружается...