Вопрос или проблема
Мне хотелось бы понять, как реализован такой функционал, как ReactiveTransactional в Quarkus. Поскольку SQL базы данных (например, MySQL, PostgreSQL, SQL Server) имеют блокирующие транзакции, каким образом реализован неблокирующий/реактивный транзакционный код.
Представьте этот транзакционный код, симулирующий покупку в корзине
начать транзакцию
уменьшить запас продукта A на 5 (#1)
уменьшить запас продукта B на 2 (#2)
уменьшить баланс кошелька на $100 (#3)
вставить в журнал информацию о том, что пользователь купил 5 продуктов A (#4)
вставить в журнал информацию о том, что пользователь купил 2 продукта B (#5)
если (баланс кошелька меньше $10) вставить в таблицу needTopUp (#6)
если (баланс кошелька меньше $50) вставить в таблицу lowBalance (#7)
зафиксировать транзакцию
В обычном императивном (блокирующем) стиле вы ждете завершения каждого из этих шагов, любые проблемы приводят к откату. В мире Java это происходит в одном потоке.
Потенциальные проблемы
- Запас продукта A составляет всего 2, и вы не можете быть с отрицательным запасом
- Баланс кошелька составляет всего $5 и не может быть отрицательным
- Сервер падает на шаге #4 или #5 или #6 или #7 (нет отката, но не будет зафиксировано)
Как работает реактивный транзакционный стиль (например, ReactiveTransactional в Quarkus) в этом случае.
-
Потоки все еще должны уважать порядок, не так ли? Т.е. нельзя вставить запись в таблицу lowBalance до уменьшения баланса кошелька. Но шаги #1 и #2 могут выполняться параллельно. Как это указать в коде?
-
Является ли каждая задача отдельной транзакцией? Будет ли поток, выполняющий “уменьшение запаса”, состоять из “начать транзакцию, уменьшить запас, зафиксировать”? Я бы так не думал, ведь тогда как это можно откатить?
Или Quarkus просто подразумевает, что потоки в мире Java являются реактивными/неблокирующими, однако в базе данных все еще происходит блокировка (1 поток базы данных), если это так, тогда это имеет смысл. Но каковы преимущества этого, если на стороне базы данных все еще происходит блокировка.
Ответ или решение
Реактивная архитектура и реализация транзакций в реактивных фреймворках, таких как Quarkus с использованием аннотации @ReactiveTransactional
, представляют интересные особенности, особенно в контексте работы с реляционными базами данных, которые, как правило, являются блокирующими. Давайте разберем, как это работает на практике.
Проблема и транзакционное поведение
В классическом синхронном исполнении кода, как вы упомянули, каждая операция внутри транзакции выполняется последовательно. Это означает, что мы дожидаемся завершения каждой команды перед выполнением следующей, и если что-то пойдет не так, мы можем откатить транзакцию. Однако такое поведение не слишком эффективно и может привести к блокировкам.
Реактивная транзакционная обработка
Когда мы говорим о реактивной обработке, мы подразумеваем, что операции не блокируют текущую нить исполнения, а вместо этого работают на основе "обещаний" (Promises) или "реактивных потоков" (Reactive Streams), таких как Mono
и Flux
в проекте Reactor.
1. Параллелизм и порядок выполнения
Для вашего примера транзакции, где мы уменьшаем запасы продуктов и баланс кошелька, можно использовать реактивную обработку. Мы можем начать с создания асинхронных операций, которые будут выполняться параллельно, если это возможно. Для обеспечения порядка выполнения операции можно использовать оператор flatMap
, который гарантирует выполнение операций последовательно.
Вот пример, как это можно сделать в Quarkus с использованием реактивного подхода:
@ReactiveTransactional
public Uni<Void> checkout(Cart cart, User user) {
return Uni.combine()
.all()
.unis(
decreaseStock("productA", 5)
.onItem().transformToUni(stockDecreased -> {
if (stockDecreased) {
return logPurchase(user, "productA", 5);
} else {
return Uni.createFrom().failure(new RuntimeException("Недостаточно товара A на складе"));
}
}),
decreaseStock("productB", 2)
.onItem().transformToUni(stockDecreased -> {
if (stockDecreased) {
return logPurchase(user, "productB", 2);
} else {
return Uni.createFrom().failure(new RuntimeException("Недостаточно товара B на складе"));
}
}),
decreaseWalletBalance(user.getId(), 100)
.onItem().transformToUni(walletUpdated -> {
if (walletUpdated) {
return checkWalletBalance(user.getId());
} else {
return Uni.createFrom().failure(new RuntimeException("Недостаточно средств"));
}
})
)
.flatMap(allResults -> {
// Лог указывающий на все покупки, включая условия для нужды в пополнении
return Uni.combine()
.all()
.unis(
insertNeedTopUpIfNeeded(user.getId()),
insertLowBalanceIfNeeded(user.getId())
).discarding();
});
}
В этом коде мы параллельно уменьшаем запасы и баланс, а затем обрабатываем результаты:
- У каждого из этих вызовов есть свой обработчик ошибок, что позволяет нам убедиться, что, если произошла какая-то ошибка, она будет правильно обработана.
flatMap
гарантирует, что на последнем этапе проверки баланса мы не выполним его, пока не будет завершено уменьшение запасов и баланса.
2. Отмена и откат
По сути, в предложенной реализации мы управляем откатами через обработку ошибок. Если одна из операций завершается неудачей, мы генерируем исключение, и код выше этого вызова может быть сконструирован так, чтобы обрабатывать это исключение и выполнять откат.
Что касается транзакционной модели баз данных, многие реактивные драйверы (например, для PostgreSQL) предлагают поддержку транзакционных операций в реактивном стиле. Таким образом, можно контролировать транзакции через асинхронный API, анициализируя begin transaction
и commit transaction
в рамках реактивной цепочки.
Вывод
Реактивные транзакции требуют другой подход к проектированию по сравнению с традиционными, но они предоставляют мощные инструменты для обработки запросов с высокой производительностью и меньшими блокировками. Реактивные архитектуры позволяют использовать возможности асинхронного программирования, тем самым улучшая отзывчивость приложений. В конечном счете, даже если база данных остается блокирующей в своем исполнении, использование реактивного подхода при обработке операций позволяет значительно повысить масштабируемость и эффективность приложений.