Kafka ProducerFencedException при использовании транзакций с Spring Kafka и синхронизацией базы данных

Вопрос или проблема

Я сталкиваюсь с исключением ProducerFencedException при попытке синхронизировать события между Kafka и базой данных PostgreSQL, используя Spring Kafka. События отправляются в Kafka, и база данных обновляется успешно, и я могу видеть события в теме Kafka, когда использую уровень изоляции read_uncommitted. Однако потребитель, пох parece, не потребляет сообщения с уровнем изоляции read_committed.

Конфигурация продюсера:

@Configuration
public class KafkaProducerConfig {
    @Value("${bootstrap.servers}")
    private String boostrapServers;

    public Map<String, Object> producerConfig() {
        Map<String, Object> producerConfig = new HashMap<>();
        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, boostrapServers);
        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction-event-id");
        return producerConfig;
    }

    @Bean
    public <T> ProducerFactory<String, T> producerFactory() {
        DefaultKafkaProducerFactory<String, T> defaultKafkaProducerFactory =
                new DefaultKafkaProducerFactory<>(producerConfig());
        defaultKafkaProducerFactory.setTransactionIdPrefix("transaction-event-id");
        return defaultKafkaProducerFactory;
    }

    @Bean
    public <T> KafkaTemplate<String, T> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public KafkaTransactionManager<String, Object> transactionEventProducerKafkaTransactionManager() {
        return new KafkaTransactionManager<>(producerFactory());
    }
}

Конфигурация потребителя:

isolation.level=read_committed

Логика отправки события продюсером и обновления базы данных:

@Transactional
public void sendMessageToTopics(Map<Long, String> duplicateCampaignId,
                                List<CampaignSMS> campaignSMSList, SmsType smsType) {

    if (smsType == SmsType.BULK) {
        campaignSMSList.forEach(campaignSMS -> {
            sendEvent(smsBulkTopicName, campaignSMS);
        });
    }
    if (smsType == SmsType.PROFILE) {
        campaignSMSList.forEach(campaignSMS -> sendEvent(smsProfileTopicName, campaignSMS));
    }

    duplicateCampaignId.forEach((campaignId, errorMessage) -> customerCampaignRepository.
            updateCampaignSmsStatusById(campaignId, CustomerCampaignStatus.ERROR, errorMessage));
    customerCampaignRepository.batchUpdateCampaignSms(campaignSMSList);
}

private void sendEvent(String topicName, CampaignSMS campaignSMS) {
    try {
        kafkaTemplate.send(topicName, campaignSMS).get(3L, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        log.error(e.getMessage());
        Thread.currentThread().interrupt();
        throw new InternalException(e.getMessage());
    } catch (ExecutionException | TimeoutException e) {
        log.error(e.getMessage());
        throw new InternalException(e.getMessage());
    }
}

Ошибка, которую я получаю в логах:

ERROR o.s.k.c.DefaultKafkaProducerFactory - commitTransaction failed: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@39ec10a3]
org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one. ERROR o.s.k.c.DefaultKafkaProducerFactory - commitTransaction failed: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@39ec10a3]
org.apache.kafka.common.errors.ProducerFencedException: Producer with transactionalId 'transaction-event-id0' and ProducerIdAndEpoch(producerId=120332, epoch=0) has been fenced by another producer with the same transactionalIdTransactionSynchronization.afterCompletion threw exception
org.apache.kafka.common.errors.ProducerFencedException: Producer with transactionalId 'transaction-event-id0' and ProducerIdAndEpoch(producerId=120332, epoch=0) has been fenced by another producer with the same transactionalId
org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one.

Эта ошибка возникает при попытке зафиксировать транзакцию в моей запланированной задаче. Похоже, что несколько продюсеров используют один и тот же transactionalId, что приводит к этой проблеме с ограждением.Я также пытался динамически генерировать transactional.id с помощью UUID.randomUUID() для каждого продюсера, но этот подход также не сработал, и ошибка появляется та же.(ProducerConfig.TRANSACTIONAL_ID_CONFIG, “transaction-event-id”+UUID.randomUUID() и defaultKafkaProducerFactory.setTransactionIdPrefix(“transaction-event-id”+UUID.randomUUID());)

Вот мои вопросы:

  1. Как я могу гарантировать, что каждый экземпляр продюсера имеет уникальный
    transactionalId?
  2. Каковы лучшие практики управления транзакциями Kafka при
    синхронизации с базой данных?
  3. Существует ли способ обработать ProducerFencedException плавно в таком
    сценарии?

Любая помощь будет очень признательна! Спасибо!

Ответ или решение

Ошибка ProducerFencedException при использовании транзакций с Spring Kafka и синхронизацией базы данных

Введение

При работе с Apache Kafka и Spring Kafka, особенно в контексте транзакций и синхронизации с базой данных PostgreSQL, возможно столкновение с ошибкой ProducerFencedException. Эта ошибка возникает, когда несколько продюсеров используют один и тот же идентификатор транзакции (transactional.id), что приводит к "ограждению" одного продюсера другим. В данном ответе мы разберем возможные причины возникновения этой ошибки, а также предложим решения и лучшие практики для работы с транзакциями Kafka.

Проблема

Вы указали, что события успешно отправляются в Kafka и обновления в базе данных также выполняются корректно. Однако, когда вы используете уровень изоляции read_committed, ваши консьюмеры не видят сообщений. Ошибка ProducerFencedException, с которой вы сталкиваетесь, указывает на то, что существует конфликт транзакционных идентификаторов между несколькими продюсерами.

Возможные причины проблемы

  1. Несоответствие идентификаторов транзакций: Убедитесь, что все экземпляры продюсера используют уникальный идентификатор транзакций. Ваша попытка динамически создавать transactionalId с помощью UUID.randomUUID() может не сработать, если используется общий ProducerFactory.

  2. Параллельные транзакции: Если вы создаете несколько экземпляров продюсера, у которых одинаковый transactionalId, это также вызывает конфликт.

  3. Производительность и временные задержки: Если операции отправки сообщений слишком быстрые, и транзакции не успевают завершаться, это может привести к тому, что новый продюсер "фендует" предыдущий.

Рекомендации по решению

1. Уникальный transactionalId для каждого экземпляра

Для обеспечения уникальности идентификатора транзакции можно использовать условие, при котором каждый экземпляр настроен с уникальным идентификатором. Например, вы можете использовать UUID или комбинировать с уникальным идентификатором сервиса:

producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction-event-id-" + UUID.randomUUID().toString());

2. Управление транзакциями

Лучшие практики для управления транзакциями в Kafka:

  • Разделите логику отправки сообщений и обновления базы данных. Это позволит вам более точно контролировать состояние транзакций и минимизировать количество конфликтов.

  • Используйте аннотацию @Transactional только на высокоуровневых методах, чтобы избежать нежелательного поведения.

  • Настройте подходящую стратегию повторного выполнения, если происходит сбой транзакции, чтобы избежать неполных или испорченных состояний.

3. Управление ProducerFencedException

Для обработки ProducerFencedException:

  • Реализуйте механизм повторной попытки: При возникновении исключения, вы можете повторить операцию, используя новый producer, настроенный с уникальным transactionalId.

  • Логируйте ошибки и временные метки, чтобы вы могли диагностировать и понимать, как и когда происходят столкновения.

Заключение

Ошибка ProducerFencedException в контексте использования Kafka транзакций и синхронизации с базой данных может быть сложной для диагностики, однако с правильными подходами к конфигурации и управлению транзакциями, вы сможете избежать этой проблемы. Обеспечение уникальности transactionalId, соблюдение лучших практик и реализация адекватной обработки ошибок помогут вам успешно интегрировать Kafka и PostgreSQL.

Вопросы для дальнейшего изучения

  1. Как контролировать и отслеживать показатели производительности Kafka в масштабируемых системах?
  2. Как лучше всего организовать взаимодействие между микро-сервисами, использующими Kafka в качестве системы обмена сообщениями?
  3. Какие инструменты существуют для мониторинга и диагностики проблем с Kafka в реальном времени?
Оцените материал
Добавить комментарий

Капча загружается...