Вопрос или проблема
В настоящее время я работаю над приложением Kafka Streams, где пытаюсь настроить GlobalKTable и KStream, используя одну и ту же тему (sampleTopic). Однако при попытке запустить приложение я сталкиваюсь со следующей ошибкой:
Не удалось создать [org.apache.kafka.streams.KafkaStreams]: Метод-фабрика ‘kafkaStreams’ вызвал исключение с сообщением: Неверная топология: Тема sampleTopic уже зарегистрирована другим источником.
Как я могу решить эту проблему, продолжая использовать и GlobalKTable, и KStream для одной и той же темы?
@Bean
public KafkaStreams kafkaStreams(@Qualifier("streamsBuilder") StreamsBuilder streamsBuilder) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericRecord.class.getName());
Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url", schemaRegistryUrl);
final Serde<GenericRecord> valueGenericAvroSerde = new GenericAvroSerde();
valueGenericAvroSerde.configure(serdeConfig, false);
// GlobalKTable
GlobalKTable<String, GenericRecord> globalKTable = streamsBuilder.globalTable(sampleTopic,
Materialized.<String, GenericRecord, KeyValueStore<Bytes, byte[]>>as(storeName + "-global")
.withKeySerde(Serdes.String())
.withValueSerde(valueGenericAvroSerde)
);
// KStream
KStream<String, GenericRecord> kStream = streamsBuilder.stream(sampleTopic,
Consumed.with(Serdes.String(), valueGenericAvroSerde));
KStream<String, GenericRecord> joinedStream = kStream.leftJoin(
globalKTable,
(key, valueFromStream) -> key,
(valueFromStream, valueFromTable) -> {
if (valueFromTable == null) {
return valueFromStream;
} else {
return null;
}
}
);
processJoinedStream(joinedStream);
KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), kafkaStreamsConfiguration().asProperties());
streams.start();
return streams;
}
private void processJoinedStream(KStream<String, GenericRecord> joinedStream) {
joinedStream.foreach((key, value) -> {
List<ConsumerRecord<String, GenericRecord>> consumerRecords = Collections.singletonList(
new ConsumerRecord<>(key, 0, 0L, key, value));
dataProvider.consumeRecords(consumerRecords);
});
}
Ответ или решение
Ошибка, с которой вы столкнулись, возникает из-за того, что вы пытаетесь зарегистрировать одну и ту же тему sampleTopic
одновременно для глобальной таблицы (GlobalKTable
) и потока (KStream
). Apache Kafka Streams не позволяет одновременно регистрировать одну и ту же тему для разных источников, что и является причиной сообщения об ошибке.
Чтобы решить эту проблему, вам необходимо изменить архитектуру вашего приложения так, чтобы не было конфликтов в регистрации темы. Вот несколько подходов, которые можно использовать для устранения этой проблемы:
Подход 1: Использование только одной конструкции (GlobalKTable или KStream)
Если это возможно, выберите одну из структур, которая лучше всего подходит для вашей логики обработки данных. Например, если вам нужно постоянно получать данные и обновления, то использование KStream
может быть предпочтительным. В противном случае, если вам необходимо сделать присоединение к статическим данным (например, значения из базы данных), используйте GlobalKTable
.
Подход 2: Разделение обработки
Если вы хотите использовать и GlobalKTable
, и KStream
, то создайте два отдельных приложения Kafka Streams для каждой структуры. Например, одно приложение может обрабатывать GlobalKTable
, а другое — KStream
. Это может потребовать более сложной архитектуры, но позволит избежать конфликтов:
- Создайте два разных класса конфигурации, каждый из которых будет настраивать и запускать свое собственное приложение Kafka Streams.
- Убедитесь, что оба приложения используют отдельные
APPLICATION_ID_CONFIG
, чтобы избежать путаницы и конфликтов.
Пример кода для разделения приложений:
// Приложение для GlobalKTable
@Bean
public KafkaStreams globalKTableKafkaStreams(@Qualifier("streamsBuilder") StreamsBuilder streamsBuilder) {
// Конфигурация и код для GlobalKTable
}
// Приложение для KStream
@Bean
public KafkaStreams kStreamKafkaStreams(@Qualifier("streamsBuilder") StreamsBuilder streamsBuilder) {
// Конфигурация и код для KStream
}
Подход 3: Использование разных тем
Если использование одной и той же темы абсолютно необходимо, подумайте о том, чтобы создать отдельные темы для ваших GlobalKTable
и KStream
. Например, вы можете создать sampleTopic
для KStream
и sampleTopic-global
для GlobalKTable
. Это избавит от конфликтов.
GlobalKTable<String, GenericRecord> globalKTable = streamsBuilder.globalTable("sampleTopic-global", ...);
KStream<String, GenericRecord> kStream = streamsBuilder.stream("sampleTopic", ...);
Заключение
Таким образом, для решения проблемы с ошибкой "Invalid topology: Topic sampleTopic has already been registered by another source", вам необходимо либо использовать только одну структуру данных в зависимости от ваших требований, либо разделить их на разные приложения или просто использовать разные темы. Убедитесь, что ваша архитектура и логика обработки данных соответствуют требованиям вашего приложения, чтобы избежать подобных ошибок в будущем.