Доставка сообщений Kafka в приложении Spring Integration с использованием Spring Cloud Stream

Вопрос или проблема

У меня есть приложение Spring Integration, и я хочу добавить функциональность для отправки сообщений в Kafka после успешной обработки. Вот что я написал:

@Bean
  public Supplier<Message<?>> myFunction(QueueChannel kafkaMessageChannel) {
    return kafkaMessageChannel::receive;
  }

  @Bean(name = KAFKA_CHANNEL)
  MessageChannel kafkaMessageChannel() {
    return MessageChannels.queue().get();
  }

  @Bean
  public IntegrationFlow flow1() {
    return IntegrationFlows.from(SOME_CHANNEL)
        .log(LoggingHandler.Level.INFO)
        .transform(// transformer)
        .channel(KAFKA_CHANNEL)
        .get();
  }

  spring:
  cloud:
    function:
      definition: myFunction
    stream:
      default-binder: kafka
      kafka:
        binder:
          brokers: ${BROKERS}
        bindings:
          myFunction-out-0:
            producer:
              configuration:
                key.serializer: org.apache.kafka.common.serialization.StringSerializer
      bindings:
        myFunction-out-0:
          destination: myDestination
          binder: kafka
          content-type: application/json
          producer:
            error-channel-enabled: true
            use-native-encoding: true

Сообщение попадает в трансформер, где оно регистрируется, но никогда не появляется в теме Kafka.

Есть ли какие-то предложения, что может пойти не так и как это исправить? Спасибо.

Ответ или решение

Чтобы решить проблему с отправкой сообщений в Kafka из вашего Spring Integration приложения, необходимо учесть несколько аспектов, которые могут препятствовать успешной отправке сообщений, даже если они проходят через трансформатор.

Проверка конфигурации

  1. Проверка канала отправки сообщений:
    Вы указали, что используете kafkaMessageChannel для отправки сообщений. Убедитесь, что это удобно связано с контактом, который отвечает за отправку сообщений в Kafka. Вам нужно удостовериться, что сообщения проходят через MessagingGateway или правильно определенный MessageChannel для отправки в Kafka.

    Проверьте, что ваш поток интеграции выглядит следующим образом:

    @Bean
    public IntegrationFlow flow1() {
       return IntegrationFlows.from(SOME_CHANNEL)
           .log(LoggingHandler.Level.INFO)
           .transform(// ваш трансформатор)
           .channel(KAFKA_CHANNEL)
           .handle(Kafka.outboundChannelAdapter(kafkaTemplate())) // Добавлено обработчика для отправки
           .get();
    }
    
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
       return new KafkaTemplate<>(producerFactory());
    }
    
    @Bean
    public ProducerFactory<String, String> producerFactory() {
       Map<String, Object> configProps = new HashMap<>();
       configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${BROKERS}");
       configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
       configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
       return new DefaultKafkaProducerFactory<>(configProps);
    }
  2. Проверка конфигурации Spring Cloud Stream:
    Убедитесь, что ваша конфигурация для spring.cloud.function.definition приведена в правильный вид, и что сам канал будет правильно соотнесен с Kafka. Возможно, вам стоит изменить способ определения потоков.

    Проверьте также правильность настройки свойств в application.yml. Важно удостовериться, что вы указали существующий и доступный топик в destination:

    spring:
     cloud:
       stream:
         bindings:
           myFunction-out-0:
             destination: myDestination
             binder: kafka
             content-type: application/json
  3. Обработка ошибок:
    Возможные ошибки при отправке сообщений могут быть неявны. Вы можете добавить error-channel для обработки ошибок отправки.

    .channel(KAFKA_CHANNEL)
    .handle(Kafka.outboundChannelAdapter(kafkaTemplate())
       .errorChannel("kafkaErrorChannel")) // Добавлено

    Затем добавьте обработчик ошибок:

    @Bean
    public IntegrationFlow kafkaErrorFlow() {
       return IntegrationFlows.from("kafkaErrorChannel")
           .log(LoggingHandler.Level.ERROR)
           .get();
    }

Логи и отладка

Загляните в логи вашей системы, чтобы выяснить, не возникает ли каких-либо исключений при отправке в Kafka. Часто информация о проблемах с подключением или сериализацией может быть полезной.

Итог

После всех этих проверок и изменений разобрать возможность отправки сообщений из Spring Integration приложения в Kafka не составит труда. Не забывайте тщательно проверять журналы и конфигурации, чтобы выявлять возможные проблемы. Если у вас остались вопросы или появляются новые проблемы, не стесняйтесь задавать их.

Оцените материал
Добавить комментарий

Капча загружается...