Весеннее облако Kafka. Производить несколько записей KStream с различными ключами после обогащения входного KStream.

Вопрос или проблема

У меня есть требование, где мне нужно создать несколько записей с разными ключами, используя привязки Spring Cloud Kafka.

@Getter 
@Setter 
public class Transaction {     
  private String paymentRef;     
  private String List<AccountDetails> accounts; 
}  
@Getter
@Setter
public class AccountDetails {     
   private String accountId;     
   private String accountHolderName; 
}

Ниже идет мой ввод

{
  "paymentRef":"12345", 
  "accounts": [{
    "accountId": "AB12345",
    "accountHolderName": "Dietrich Fritzi"
   }, {
    "accountId": "CD98741",
    "accountHolderName": "Wibke Elisabeth"
   }]
}

Мой ожидаемый вывод будет:

Key :: 12345-AB12345, Value :: {
  "paymentRef":"12345", 
  "accounts": [{
    "accountId": "AB12345",
    "accountHolderName": "Dietrich Fritzi"
   }, {
    "accountId": "CD98741",
    "accountHolderName": "Wibke Elisabeth"
   }]
}

Key :: 12345-CD98741, Value :: {
  "paymentRef":"12345", 
  "accounts": [{
    "accountId": "AB12345",
    "accountHolderName": "Dietrich Fritzi"
   }, {
    "accountId": "CD98741",
    "accountHolderName": "Wibke Elisabeth"
   }]
}

Может кто-нибудь помочь мне написать функцию? На данный момент у меня есть

@Bean
public Function<KStream<String, Transaction>, List<KStream<String, Transaction>>> accountTransaction() {
return transaction -> transaction
        .peek((key, value) -> log.info("Приходящая операция, ключ :: [{}] значение :: [{}]", key, value))
        .map((key, value) -> new KeyValue<>(value.getPaymentReference() + "-" + value.getAccounts().stream().findFirst().map(AccountDetails::getAccountId).orElse("NO_VALUE"), value))
        .peek((key, value) -> log.info("Исходящая операция, ключ :: {} значение :: {}", key, value));
}

Проблема с этой функцией в том, что она генерирует только одно выходное значение, я хочу сгенерировать выходные данные для всех AccountDetails в пределах транзакции.

Ответ или решение

Для решения поставленной задачи, связанной с использованием Spring Cloud Kafka для обработки потоков данных, нам необходимо модифицировать функцию, чтобы она генерировала множество записей, каждая из которых будет иметь уникальный ключ, сформированный на основе данных Transaction и каждого элемента AccountDetails.

Проблема

На данный момент ваш код генерирует только одну выходную запись для каждого входного Transaction. Мы должны переработать этот функционал, чтобы он обрабатывал все объекты AccountDetails в списке accounts и создавал отдельные выходные записи для каждого из них.

Решение

Для достижения данной цели мы можем использовать метод flatMap вместо map, чтобы "развернуть" потоки и создать несколько записей на основе каждого элемента AccountDetails.

Ниже представлена обновленная версия функции accountTransaction, которая решает вашу задачу:

@Bean
public Function<KStream<String, Transaction>, KStream<String, Transaction>> accountTransaction() {
    return transactionStream -> transactionStream
        .peek((key, value) -> log.info("Incoming Transaction, Key :: [{}] Value :: [{}]", key, value))
        .flatMap((key, transaction) -> {
            List<KeyValue<String, Transaction>> results = new ArrayList<>();
            for (AccountDetails account : transaction.getAccounts()) {
                String newKey = transaction.getPaymentRef() + "-" + account.getAccountId();
                results.add(new KeyValue<>(newKey, transaction));
            }
            return results.stream();
        })
        .peek((key, value) -> log.info("Outgoing Transaction, Key :: [{}] Value :: [{}]", key, value));
}

Объяснение кода

  1. peek: Мы начинаем с журнала входящих транзакций, чтобы иметь возможность отслеживать входящие данные.

  2. flatMap: Это ключевая функция, которая принимает каждую запись из входного потока и создает по несколько выходных записей. Мы используем список results для хранения всех новых записей.

  3. Цикл по accounts: Для каждого AccountDetails в списке accounts создаем новый ключ, комбинируя paymentRef и accountId. Каждая пара ключ-значение добавляется в список results.

  4. stream(): Метод results.stream() возвращает поток записей, который затем отправляется дальше в поток обработки.

  5. Второй peek: Мы снова ведем логирование исходящих записей для мониторинга.

Вывод

Теперь, когда вы исполнили вышеуказанную логику, ваша программа создаст выходной поток, содержащий отдельные записи для каждого AccountDetails с соответствующими ключами.

Ожидаемый результат:

  • Если входная транзакция содержит два аккаунта, вы получите два выхода с разными ключами:
    • Key :: 12345-AB12345, Value :: {...}
    • Key :: 12345-CD98741, Value :: {...}

Это краткое решение не только соответствует требованиям вашего задания, но и эффективно использует функциональные возможности Kafka Streams в рамках Spring Cloud. Удачи в реализации!

Оцените материал
Добавить комментарий

Капча загружается...