Вопрос или проблема
Я пытаюсь отправить сообщения в две разные темы Kafka, используя KafkaSender из Reactor Kafka.
Под двумя разными темами Kafka я имею в виду:
Есть тема под названием “first_topic” в кластере kafka-first-broker.com:9092.
Затем есть другая тема под названием “another_topic” в другом кластере, не таком, как выше, под названием kafka-another-broker-not-the-same-as-above.com:9093.
Однако, похоже, что метод send
Reactor Kafka поддерживает только одно направление.
Я пытался использовать метод send с запятой, разделяющей список (двух тем).
SenderRecord.create(new ProducerRecord<>("first_topic,another_topic", null, mymessage), mymessage)
Я ожидал, что это будет отправлено в две разные темы, но оно может быть отправлено только в одну.
Как я могу использовать API KafkaSender из Reactor Kafka, чтобы отправить сообщения в две разные темы, находящиеся в двух разных кластерах Kafka?
Ответ или решение
Чтобы отправить сообщения в два различных топика, расположенных на двух разных кластерах Kafka, используя API KafkaSender от Reactor Kafka, вам потребуется создать отдельные экземпляры KafkaSender для каждого кластера. Это связано с тем, что send-метод KafkaSender принимает только один объект ProducerRecord, что означает, что вам нужно будет управлять отправкой для каждого кластера Kafka отдельно.
Шаги для отправки сообщений в два различных топика
1. Настройка зависимостей
Убедитесь, что у вас установлены необходимые зависимости для использования Reactor Kafka в вашем проекте. Ваша pom.xml
(если вы используете Maven) должна содержать следующее:
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
<version>1.x.x</version> <!-- Укажите последнюю версию -->
</dependency>
2. Конфигурация KafkaSender для каждого кластера
Вам нужно будет настроить KafkaSender для каждого кластера. Пример кода ниже демонстрирует, как можно сделать это:
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
public class KafkaProducer {
public static void main(String[] args) {
// Конфигурация для первого кластера
Map<String, Object> configFirstCluster = new HashMap<>();
configFirstCluster.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-first-broker.com:9092");
configFirstCluster.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configFirstCluster.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
SenderOptions<String, String> senderOptionsFirst = SenderOptions.create(configFirstCluster);
KafkaSender<String, String> kafkaSenderFirst = KafkaSender.create(senderOptionsFirst);
// Конфигурация для второго кластера
Map<String, Object> configSecondCluster = new HashMap<>();
configSecondCluster.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-another-broker-not-the-same-as-above.com:9093");
configSecondCluster.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configSecondCluster.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
SenderOptions<String, String> senderOptionsSecond = SenderOptions.create(configSecondCluster);
KafkaSender<String, String> kafkaSenderSecond = KafkaSender.create(senderOptionsSecond);
// Пример отправки сообщения
sendMessages(kafkaSenderFirst, "first_topic", "Hello Kafka from the first cluster!");
sendMessages(kafkaSenderSecond, "another_topic", "Hello Kafka from the second cluster!");
}
private static void sendMessages(KafkaSender<String, String> kafkaSender, String topic, String message) {
kafkaSender.send(Mono.just(SenderRecord.create(new ProducerRecord<>(topic, null, message), message)))
.doOnError(Throwable::printStackTrace)
.subscribe();
}
}
3. Описание кода
-
Создание конфигураций: Для каждого кластера создаются отдельные конфигурационные карты, в которых указываются необходимые параметры, такие как адреса маршрутизаторов и классы сериализации.
-
Создание экземпляра KafkaSender: Каждый KafkaSender создается на основе своей конфигурации.
-
Отправка сообщений: Мы определяем метод sendMessages, который принимает экземпляр KafkaSender, название топика и сообщение для отправки. Мы используем метод send, чтобы инициировать отправку сообщения, оборачивая его в Mono.
4. Завершение работ
Не забудьте закрыть KafkaSender, когда он больше не нужен, чтобы освободить ресурсы:
kafkaSenderFirst.close();
kafkaSenderSecond.close();
Заключение
Используя подход, описанный выше, вы сможете эффективно отправлять сообщения в разные топики на различных кластерах Kafka с использованием Reactor Kafka. Выделение логики отправки на отдельные экземпляры KafkaSender позволяет избежать сложностей, связанных с отправкой в несколько топиков одновременно. Такой подход обеспечивает стабильность и управление ресурсами, что критически важно для производительных приложений.