- Вопрос или проблема
- Ответ или решение
- Ошибка ProducerFencedException при использовании транзакций с Spring Kafka и синхронизацией базы данных
- Введение
- Проблема
- Возможные причины проблемы
- Рекомендации по решению
- 1. Уникальный transactionalId для каждого экземпляра
- 2. Управление транзакциями
- 3. Управление ProducerFencedException
- Заключение
- Вопросы для дальнейшего изучения
Вопрос или проблема
Я сталкиваюсь с исключением ProducerFencedException при попытке синхронизировать события между Kafka и базой данных PostgreSQL, используя Spring Kafka. События отправляются в Kafka, и база данных обновляется успешно, и я могу видеть события в теме Kafka, когда использую уровень изоляции read_uncommitted. Однако потребитель, пох parece, не потребляет сообщения с уровнем изоляции read_committed.
Конфигурация продюсера:
@Configuration
public class KafkaProducerConfig {
@Value("${bootstrap.servers}")
private String boostrapServers;
public Map<String, Object> producerConfig() {
Map<String, Object> producerConfig = new HashMap<>();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, boostrapServers);
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction-event-id");
return producerConfig;
}
@Bean
public <T> ProducerFactory<String, T> producerFactory() {
DefaultKafkaProducerFactory<String, T> defaultKafkaProducerFactory =
new DefaultKafkaProducerFactory<>(producerConfig());
defaultKafkaProducerFactory.setTransactionIdPrefix("transaction-event-id");
return defaultKafkaProducerFactory;
}
@Bean
public <T> KafkaTemplate<String, T> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public KafkaTransactionManager<String, Object> transactionEventProducerKafkaTransactionManager() {
return new KafkaTransactionManager<>(producerFactory());
}
}
Конфигурация потребителя:
isolation.level=read_committed
Логика отправки события продюсером и обновления базы данных:
@Transactional
public void sendMessageToTopics(Map<Long, String> duplicateCampaignId,
List<CampaignSMS> campaignSMSList, SmsType smsType) {
if (smsType == SmsType.BULK) {
campaignSMSList.forEach(campaignSMS -> {
sendEvent(smsBulkTopicName, campaignSMS);
});
}
if (smsType == SmsType.PROFILE) {
campaignSMSList.forEach(campaignSMS -> sendEvent(smsProfileTopicName, campaignSMS));
}
duplicateCampaignId.forEach((campaignId, errorMessage) -> customerCampaignRepository.
updateCampaignSmsStatusById(campaignId, CustomerCampaignStatus.ERROR, errorMessage));
customerCampaignRepository.batchUpdateCampaignSms(campaignSMSList);
}
private void sendEvent(String topicName, CampaignSMS campaignSMS) {
try {
kafkaTemplate.send(topicName, campaignSMS).get(3L, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.error(e.getMessage());
Thread.currentThread().interrupt();
throw new InternalException(e.getMessage());
} catch (ExecutionException | TimeoutException e) {
log.error(e.getMessage());
throw new InternalException(e.getMessage());
}
}
Ошибка, которую я получаю в логах:
ERROR o.s.k.c.DefaultKafkaProducerFactory - commitTransaction failed: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@39ec10a3]
org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one. ERROR o.s.k.c.DefaultKafkaProducerFactory - commitTransaction failed: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@39ec10a3]
org.apache.kafka.common.errors.ProducerFencedException: Producer with transactionalId 'transaction-event-id0' and ProducerIdAndEpoch(producerId=120332, epoch=0) has been fenced by another producer with the same transactionalIdTransactionSynchronization.afterCompletion threw exception
org.apache.kafka.common.errors.ProducerFencedException: Producer with transactionalId 'transaction-event-id0' and ProducerIdAndEpoch(producerId=120332, epoch=0) has been fenced by another producer with the same transactionalId
org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one.
Эта ошибка возникает при попытке зафиксировать транзакцию в моей запланированной задаче. Похоже, что несколько продюсеров используют один и тот же transactionalId, что приводит к этой проблеме с ограждением.Я также пытался динамически генерировать transactional.id с помощью UUID.randomUUID() для каждого продюсера, но этот подход также не сработал, и ошибка появляется та же.(ProducerConfig.TRANSACTIONAL_ID_CONFIG, “transaction-event-id”+UUID.randomUUID() и defaultKafkaProducerFactory.setTransactionIdPrefix(“transaction-event-id”+UUID.randomUUID());)
Вот мои вопросы:
- Как я могу гарантировать, что каждый экземпляр продюсера имеет уникальный
transactionalId? - Каковы лучшие практики управления транзакциями Kafka при
синхронизации с базой данных? - Существует ли способ обработать ProducerFencedException плавно в таком
сценарии?
Любая помощь будет очень признательна! Спасибо!
Ответ или решение
Ошибка ProducerFencedException при использовании транзакций с Spring Kafka и синхронизацией базы данных
Введение
При работе с Apache Kafka и Spring Kafka, особенно в контексте транзакций и синхронизации с базой данных PostgreSQL, возможно столкновение с ошибкой ProducerFencedException
. Эта ошибка возникает, когда несколько продюсеров используют один и тот же идентификатор транзакции (transactional.id
), что приводит к "ограждению" одного продюсера другим. В данном ответе мы разберем возможные причины возникновения этой ошибки, а также предложим решения и лучшие практики для работы с транзакциями Kafka.
Проблема
Вы указали, что события успешно отправляются в Kafka и обновления в базе данных также выполняются корректно. Однако, когда вы используете уровень изоляции read_committed
, ваши консьюмеры не видят сообщений. Ошибка ProducerFencedException
, с которой вы сталкиваетесь, указывает на то, что существует конфликт транзакционных идентификаторов между несколькими продюсерами.
Возможные причины проблемы
-
Несоответствие идентификаторов транзакций: Убедитесь, что все экземпляры продюсера используют уникальный идентификатор транзакций. Ваша попытка динамически создавать
transactionalId
с помощьюUUID.randomUUID()
может не сработать, если используется общийProducerFactory
. -
Параллельные транзакции: Если вы создаете несколько экземпляров продюсера, у которых одинаковый
transactionalId
, это также вызывает конфликт. -
Производительность и временные задержки: Если операции отправки сообщений слишком быстрые, и транзакции не успевают завершаться, это может привести к тому, что новый продюсер "фендует" предыдущий.
Рекомендации по решению
1. Уникальный transactionalId
для каждого экземпляра
Для обеспечения уникальности идентификатора транзакции можно использовать условие, при котором каждый экземпляр настроен с уникальным идентификатором. Например, вы можете использовать UUID
или комбинировать с уникальным идентификатором сервиса:
producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction-event-id-" + UUID.randomUUID().toString());
2. Управление транзакциями
Лучшие практики для управления транзакциями в Kafka:
-
Разделите логику отправки сообщений и обновления базы данных. Это позволит вам более точно контролировать состояние транзакций и минимизировать количество конфликтов.
-
Используйте аннотацию
@Transactional
только на высокоуровневых методах, чтобы избежать нежелательного поведения. -
Настройте подходящую стратегию повторного выполнения, если происходит сбой транзакции, чтобы избежать неполных или испорченных состояний.
3. Управление ProducerFencedException
Для обработки ProducerFencedException
:
-
Реализуйте механизм повторной попытки: При возникновении исключения, вы можете повторить операцию, используя новый
producer
, настроенный с уникальнымtransactionalId
. -
Логируйте ошибки и временные метки, чтобы вы могли диагностировать и понимать, как и когда происходят столкновения.
Заключение
Ошибка ProducerFencedException
в контексте использования Kafka транзакций и синхронизации с базой данных может быть сложной для диагностики, однако с правильными подходами к конфигурации и управлению транзакциями, вы сможете избежать этой проблемы. Обеспечение уникальности transactionalId
, соблюдение лучших практик и реализация адекватной обработки ошибок помогут вам успешно интегрировать Kafka и PostgreSQL.
Вопросы для дальнейшего изучения
- Как контролировать и отслеживать показатели производительности Kafka в масштабируемых системах?
- Как лучше всего организовать взаимодействие между микро-сервисами, использующими Kafka в качестве системы обмена сообщениями?
- Какие инструменты существуют для мониторинга и диагностики проблем с Kafka в реальном времени?