Как использовать API KafkaSender из Reactor Kafka для отправки сообщений в две разные темы (расположенные в двух разных кластерах Kafka)?

Вопрос или проблема

Я пытаюсь отправить сообщения в две разные темы Kafka, используя KafkaSender из Reactor Kafka.

Под двумя разными темами Kafka я имею в виду:
Есть тема под названием “first_topic” в кластере kafka-first-broker.com:9092.
Затем есть другая тема под названием “another_topic” в другом кластере, не таком, как выше, под названием kafka-another-broker-not-the-same-as-above.com:9093.

Однако, похоже, что метод send Reactor Kafka поддерживает только одно направление.

Я пытался использовать метод send с запятой, разделяющей список (двух тем).

SenderRecord.create(new ProducerRecord<>("first_topic,another_topic", null, mymessage), mymessage)

Я ожидал, что это будет отправлено в две разные темы, но оно может быть отправлено только в одну.

Как я могу использовать API KafkaSender из Reactor Kafka, чтобы отправить сообщения в две разные темы, находящиеся в двух разных кластерах Kafka?

Ответ или решение

Чтобы отправить сообщения в два различных топика, расположенных на двух разных кластерах Kafka, используя API KafkaSender от Reactor Kafka, вам потребуется создать отдельные экземпляры KafkaSender для каждого кластера. Это связано с тем, что send-метод KafkaSender принимает только один объект ProducerRecord, что означает, что вам нужно будет управлять отправкой для каждого кластера Kafka отдельно.

Шаги для отправки сообщений в два различных топика

1. Настройка зависимостей

Убедитесь, что у вас установлены необходимые зависимости для использования Reactor Kafka в вашем проекте. Ваша pom.xml (если вы используете Maven) должна содержать следующее:

<dependency>
    <groupId>io.projectreactor.kafka</groupId>
    <artifactId>reactor-kafka</artifactId>
    <version>1.x.x</version> <!-- Укажите последнюю версию -->
</dependency>

2. Конфигурация KafkaSender для каждого кластера

Вам нужно будет настроить KafkaSender для каждого кластера. Пример кода ниже демонстрирует, как можно сделать это:

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class KafkaProducer {

    public static void main(String[] args) {
        // Конфигурация для первого кластера
        Map<String, Object> configFirstCluster = new HashMap<>();
        configFirstCluster.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-first-broker.com:9092");
        configFirstCluster.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configFirstCluster.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        SenderOptions<String, String> senderOptionsFirst = SenderOptions.create(configFirstCluster);
        KafkaSender<String, String> kafkaSenderFirst = KafkaSender.create(senderOptionsFirst);

        // Конфигурация для второго кластера
        Map<String, Object> configSecondCluster = new HashMap<>();
        configSecondCluster.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-another-broker-not-the-same-as-above.com:9093");
        configSecondCluster.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configSecondCluster.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        SenderOptions<String, String> senderOptionsSecond = SenderOptions.create(configSecondCluster);
        KafkaSender<String, String> kafkaSenderSecond = KafkaSender.create(senderOptionsSecond);

        // Пример отправки сообщения
        sendMessages(kafkaSenderFirst, "first_topic", "Hello Kafka from the first cluster!");
        sendMessages(kafkaSenderSecond, "another_topic", "Hello Kafka from the second cluster!");
    }

    private static void sendMessages(KafkaSender<String, String> kafkaSender, String topic, String message) {
        kafkaSender.send(Mono.just(SenderRecord.create(new ProducerRecord<>(topic, null, message), message)))
                .doOnError(Throwable::printStackTrace)
                .subscribe();
    }
}

3. Описание кода

  • Создание конфигураций: Для каждого кластера создаются отдельные конфигурационные карты, в которых указываются необходимые параметры, такие как адреса маршрутизаторов и классы сериализации.

  • Создание экземпляра KafkaSender: Каждый KafkaSender создается на основе своей конфигурации.

  • Отправка сообщений: Мы определяем метод sendMessages, который принимает экземпляр KafkaSender, название топика и сообщение для отправки. Мы используем метод send, чтобы инициировать отправку сообщения, оборачивая его в Mono.

4. Завершение работ

Не забудьте закрыть KafkaSender, когда он больше не нужен, чтобы освободить ресурсы:

kafkaSenderFirst.close();
kafkaSenderSecond.close();

Заключение

Используя подход, описанный выше, вы сможете эффективно отправлять сообщения в разные топики на различных кластерах Kafka с использованием Reactor Kafka. Выделение логики отправки на отдельные экземпляры KafkaSender позволяет избежать сложностей, связанных с отправкой в несколько топиков одновременно. Такой подход обеспечивает стабильность и управление ресурсами, что критически важно для производительных приложений.

Оцените материал
Добавить комментарий

Капча загружается...