Ошибка Kafka Streams: Неверная топология: Топик уже зарегистрирован другим источником

Вопрос или проблема

В настоящее время я работаю над приложением Kafka Streams, где пытаюсь настроить GlobalKTable и KStream, используя одну и ту же тему (sampleTopic). Однако при попытке запустить приложение я сталкиваюсь со следующей ошибкой:

Не удалось создать [org.apache.kafka.streams.KafkaStreams]: Метод-фабрика ‘kafkaStreams’ вызвал исключение с сообщением: Неверная топология: Тема sampleTopic уже зарегистрирована другим источником.

Как я могу решить эту проблему, продолжая использовать и GlobalKTable, и KStream для одной и той же темы?

@Bean
public KafkaStreams kafkaStreams(@Qualifier("streamsBuilder") StreamsBuilder streamsBuilder) {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericRecord.class.getName());

    Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url", schemaRegistryUrl);
    final Serde<GenericRecord> valueGenericAvroSerde = new GenericAvroSerde();
    valueGenericAvroSerde.configure(serdeConfig, false);

    // GlobalKTable
    GlobalKTable<String, GenericRecord> globalKTable = streamsBuilder.globalTable(sampleTopic,
        Materialized.<String, GenericRecord, KeyValueStore<Bytes, byte[]>>as(storeName + "-global")
            .withKeySerde(Serdes.String())
            .withValueSerde(valueGenericAvroSerde)
    );

    // KStream
    KStream<String, GenericRecord> kStream = streamsBuilder.stream(sampleTopic,
        Consumed.with(Serdes.String(), valueGenericAvroSerde));

    KStream<String, GenericRecord> joinedStream = kStream.leftJoin(
        globalKTable,
        (key, valueFromStream) -> key,
        (valueFromStream, valueFromTable) -> {
            if (valueFromTable == null) {
                return valueFromStream;
            } else {
                return null;
            }
        }
    );

    processJoinedStream(joinedStream);

    KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), kafkaStreamsConfiguration().asProperties());
    streams.start();

    return streams;
}

private void processJoinedStream(KStream<String, GenericRecord> joinedStream) {
    joinedStream.foreach((key, value) -> {
        List<ConsumerRecord<String, GenericRecord>> consumerRecords = Collections.singletonList(
            new ConsumerRecord<>(key, 0, 0L, key, value));
        dataProvider.consumeRecords(consumerRecords);
    });
}

Ответ или решение

Ошибка, с которой вы столкнулись, возникает из-за того, что вы пытаетесь зарегистрировать одну и ту же тему sampleTopic одновременно для глобальной таблицы (GlobalKTable) и потока (KStream). Apache Kafka Streams не позволяет одновременно регистрировать одну и ту же тему для разных источников, что и является причиной сообщения об ошибке.

Чтобы решить эту проблему, вам необходимо изменить архитектуру вашего приложения так, чтобы не было конфликтов в регистрации темы. Вот несколько подходов, которые можно использовать для устранения этой проблемы:

Подход 1: Использование только одной конструкции (GlobalKTable или KStream)

Если это возможно, выберите одну из структур, которая лучше всего подходит для вашей логики обработки данных. Например, если вам нужно постоянно получать данные и обновления, то использование KStream может быть предпочтительным. В противном случае, если вам необходимо сделать присоединение к статическим данным (например, значения из базы данных), используйте GlobalKTable.

Подход 2: Разделение обработки

Если вы хотите использовать и GlobalKTable, и KStream, то создайте два отдельных приложения Kafka Streams для каждой структуры. Например, одно приложение может обрабатывать GlobalKTable, а другое — KStream. Это может потребовать более сложной архитектуры, но позволит избежать конфликтов:

  1. Создайте два разных класса конфигурации, каждый из которых будет настраивать и запускать свое собственное приложение Kafka Streams.
  2. Убедитесь, что оба приложения используют отдельные APPLICATION_ID_CONFIG, чтобы избежать путаницы и конфликтов.

Пример кода для разделения приложений:

// Приложение для GlobalKTable
@Bean
public KafkaStreams globalKTableKafkaStreams(@Qualifier("streamsBuilder") StreamsBuilder streamsBuilder) {
    // Конфигурация и код для GlobalKTable
}

// Приложение для KStream
@Bean
public KafkaStreams kStreamKafkaStreams(@Qualifier("streamsBuilder") StreamsBuilder streamsBuilder) {
    // Конфигурация и код для KStream
}

Подход 3: Использование разных тем

Если использование одной и той же темы абсолютно необходимо, подумайте о том, чтобы создать отдельные темы для ваших GlobalKTable и KStream. Например, вы можете создать sampleTopic для KStream и sampleTopic-global для GlobalKTable. Это избавит от конфликтов.

GlobalKTable<String, GenericRecord> globalKTable = streamsBuilder.globalTable("sampleTopic-global", ...);

KStream<String, GenericRecord> kStream = streamsBuilder.stream("sampleTopic", ...);

Заключение

Таким образом, для решения проблемы с ошибкой "Invalid topology: Topic sampleTopic has already been registered by another source", вам необходимо либо использовать только одну структуру данных в зависимости от ваших требований, либо разделить их на разные приложения или просто использовать разные темы. Убедитесь, что ваша архитектура и логика обработки данных соответствуют требованиям вашего приложения, чтобы избежать подобных ошибок в будущем.

Оцените материал
Добавить комментарий

Капча загружается...