Вопрос или проблема
У меня есть приложение Spring Integration, и я хочу добавить функциональность для отправки сообщений в Kafka после успешной обработки. Вот что я написал:
@Bean
public Supplier<Message<?>> myFunction(QueueChannel kafkaMessageChannel) {
return kafkaMessageChannel::receive;
}
@Bean(name = KAFKA_CHANNEL)
MessageChannel kafkaMessageChannel() {
return MessageChannels.queue().get();
}
@Bean
public IntegrationFlow flow1() {
return IntegrationFlows.from(SOME_CHANNEL)
.log(LoggingHandler.Level.INFO)
.transform(// transformer)
.channel(KAFKA_CHANNEL)
.get();
}
spring:
cloud:
function:
definition: myFunction
stream:
default-binder: kafka
kafka:
binder:
brokers: ${BROKERS}
bindings:
myFunction-out-0:
producer:
configuration:
key.serializer: org.apache.kafka.common.serialization.StringSerializer
bindings:
myFunction-out-0:
destination: myDestination
binder: kafka
content-type: application/json
producer:
error-channel-enabled: true
use-native-encoding: true
Сообщение попадает в трансформер, где оно регистрируется, но никогда не появляется в теме Kafka.
Есть ли какие-то предложения, что может пойти не так и как это исправить? Спасибо.
Ответ или решение
Чтобы решить проблему с отправкой сообщений в Kafka из вашего Spring Integration приложения, необходимо учесть несколько аспектов, которые могут препятствовать успешной отправке сообщений, даже если они проходят через трансформатор.
Проверка конфигурации
-
Проверка канала отправки сообщений:
Вы указали, что используетеkafkaMessageChannel
для отправки сообщений. Убедитесь, что это удобно связано с контактом, который отвечает за отправку сообщений в Kafka. Вам нужно удостовериться, что сообщения проходят черезMessagingGateway
или правильно определенныйMessageChannel
для отправки в Kafka.Проверьте, что ваш поток интеграции выглядит следующим образом:
@Bean public IntegrationFlow flow1() { return IntegrationFlows.from(SOME_CHANNEL) .log(LoggingHandler.Level.INFO) .transform(// ваш трансформатор) .channel(KAFKA_CHANNEL) .handle(Kafka.outboundChannelAdapter(kafkaTemplate())) // Добавлено обработчика для отправки .get(); } @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } @Bean public ProducerFactory<String, String> producerFactory() { Map<String, Object> configProps = new HashMap<>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${BROKERS}"); configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return new DefaultKafkaProducerFactory<>(configProps); }
-
Проверка конфигурации Spring Cloud Stream:
Убедитесь, что ваша конфигурация дляspring.cloud.function.definition
приведена в правильный вид, и что сам канал будет правильно соотнесен с Kafka. Возможно, вам стоит изменить способ определения потоков.Проверьте также правильность настройки свойств в
application.yml
. Важно удостовериться, что вы указали существующий и доступный топик вdestination
:spring: cloud: stream: bindings: myFunction-out-0: destination: myDestination binder: kafka content-type: application/json
-
Обработка ошибок:
Возможные ошибки при отправке сообщений могут быть неявны. Вы можете добавитьerror-channel
для обработки ошибок отправки..channel(KAFKA_CHANNEL) .handle(Kafka.outboundChannelAdapter(kafkaTemplate()) .errorChannel("kafkaErrorChannel")) // Добавлено
Затем добавьте обработчик ошибок:
@Bean public IntegrationFlow kafkaErrorFlow() { return IntegrationFlows.from("kafkaErrorChannel") .log(LoggingHandler.Level.ERROR) .get(); }
Логи и отладка
Загляните в логи вашей системы, чтобы выяснить, не возникает ли каких-либо исключений при отправке в Kafka. Часто информация о проблемах с подключением или сериализацией может быть полезной.
Итог
После всех этих проверок и изменений разобрать возможность отправки сообщений из Spring Integration приложения в Kafka не составит труда. Не забывайте тщательно проверять журналы и конфигурации, чтобы выявлять возможные проблемы. Если у вас остались вопросы или появляются новые проблемы, не стесняйтесь задавать их.