Вопрос или проблема
У меня есть требование, где мне нужно создать несколько записей с разными ключами, используя привязки Spring Cloud Kafka.
@Getter
@Setter
public class Transaction {
private String paymentRef;
private String List<AccountDetails> accounts;
}
@Getter
@Setter
public class AccountDetails {
private String accountId;
private String accountHolderName;
}
Ниже идет мой ввод
{
"paymentRef":"12345",
"accounts": [{
"accountId": "AB12345",
"accountHolderName": "Dietrich Fritzi"
}, {
"accountId": "CD98741",
"accountHolderName": "Wibke Elisabeth"
}]
}
Мой ожидаемый вывод будет:
Key :: 12345-AB12345, Value :: {
"paymentRef":"12345",
"accounts": [{
"accountId": "AB12345",
"accountHolderName": "Dietrich Fritzi"
}, {
"accountId": "CD98741",
"accountHolderName": "Wibke Elisabeth"
}]
}
Key :: 12345-CD98741, Value :: {
"paymentRef":"12345",
"accounts": [{
"accountId": "AB12345",
"accountHolderName": "Dietrich Fritzi"
}, {
"accountId": "CD98741",
"accountHolderName": "Wibke Elisabeth"
}]
}
Может кто-нибудь помочь мне написать функцию? На данный момент у меня есть
@Bean
public Function<KStream<String, Transaction>, List<KStream<String, Transaction>>> accountTransaction() {
return transaction -> transaction
.peek((key, value) -> log.info("Приходящая операция, ключ :: [{}] значение :: [{}]", key, value))
.map((key, value) -> new KeyValue<>(value.getPaymentReference() + "-" + value.getAccounts().stream().findFirst().map(AccountDetails::getAccountId).orElse("NO_VALUE"), value))
.peek((key, value) -> log.info("Исходящая операция, ключ :: {} значение :: {}", key, value));
}
Проблема с этой функцией в том, что она генерирует только одно выходное значение, я хочу сгенерировать выходные данные для всех AccountDetails в пределах транзакции.
Ответ или решение
Для решения поставленной задачи, связанной с использованием Spring Cloud Kafka для обработки потоков данных, нам необходимо модифицировать функцию, чтобы она генерировала множество записей, каждая из которых будет иметь уникальный ключ, сформированный на основе данных Transaction
и каждого элемента AccountDetails
.
Проблема
На данный момент ваш код генерирует только одну выходную запись для каждого входного Transaction
. Мы должны переработать этот функционал, чтобы он обрабатывал все объекты AccountDetails
в списке accounts
и создавал отдельные выходные записи для каждого из них.
Решение
Для достижения данной цели мы можем использовать метод flatMap
вместо map
, чтобы "развернуть" потоки и создать несколько записей на основе каждого элемента AccountDetails
.
Ниже представлена обновленная версия функции accountTransaction
, которая решает вашу задачу:
@Bean
public Function<KStream<String, Transaction>, KStream<String, Transaction>> accountTransaction() {
return transactionStream -> transactionStream
.peek((key, value) -> log.info("Incoming Transaction, Key :: [{}] Value :: [{}]", key, value))
.flatMap((key, transaction) -> {
List<KeyValue<String, Transaction>> results = new ArrayList<>();
for (AccountDetails account : transaction.getAccounts()) {
String newKey = transaction.getPaymentRef() + "-" + account.getAccountId();
results.add(new KeyValue<>(newKey, transaction));
}
return results.stream();
})
.peek((key, value) -> log.info("Outgoing Transaction, Key :: [{}] Value :: [{}]", key, value));
}
Объяснение кода
-
peek: Мы начинаем с журнала входящих транзакций, чтобы иметь возможность отслеживать входящие данные.
-
flatMap: Это ключевая функция, которая принимает каждую запись из входного потока и создает по несколько выходных записей. Мы используем список
results
для хранения всех новых записей. -
Цикл по
accounts
: Для каждогоAccountDetails
в спискеaccounts
создаем новый ключ, комбинируяpaymentRef
иaccountId
. Каждая пара ключ-значение добавляется в списокresults
. -
stream(): Метод
results.stream()
возвращает поток записей, который затем отправляется дальше в поток обработки. -
Второй peek: Мы снова ведем логирование исходящих записей для мониторинга.
Вывод
Теперь, когда вы исполнили вышеуказанную логику, ваша программа создаст выходной поток, содержащий отдельные записи для каждого AccountDetails
с соответствующими ключами.
Ожидаемый результат:
- Если входная транзакция содержит два аккаунта, вы получите два выхода с разными ключами:
Key :: 12345-AB12345, Value :: {...}
Key :: 12345-CD98741, Value :: {...}
Это краткое решение не только соответствует требованиям вашего задания, но и эффективно использует функциональные возможности Kafka Streams в рамках Spring Cloud. Удачи в реализации!