Spring Reactor Flux, как подписаться и затем заблокироваться до завершения всех операций.

Вопрос или проблема

Spring reactor предоставляет blocklast(), если я хочу быть синхронным и заблокироваться до завершения всех элементов

Но что, если я хочу немного продолжить, а затем заблокироваться до завершения всех элементов?

( Я не хочу выполнять активное ожидание, используя isDisposed )

Мне нужно сделать это самостоятельно с помощью моего собственного сигнала, который будет инициирован onComplete, или есть лучший встроенный API?

//reactor предоставляет blocklast, если я хочу быть синхронным и заблокироваться до завершения всех элементов
//Integer data = Flux.range(1,10).delayElements(Duration.ofSeconds(1)).doOnNext((Integer next) -> System.out.println(next + " на потоке " + Thread.currentThread().getName())).blockLast();
 
//но что, если я хочу немного продолжить, а затем заблокироваться
//должен ли я сделать это сам вот так? есть ли лучший способ?
Object signal = new Object(); 
Flux.range(1,10).delayElements(Duration.ofSeconds(1)).doOnNext((Integer next) -> System.out.println(next + " на потоке " + Thread.currentThread().getName())).doOnComplete(()->{synchronized(signal) { signal.notify();}}).subscribe();
// сделайте другую работу здесь, а затем ждите завершения
synchronized (signal) {
    signal.wait();
}
System.out.println("Все готово");

К сожалению, нет встроенного API для этой цели, но есть более удобные способы сделать это. Ваш Object.wait будет висеть бесконечно, если Flux завершится до того, как вы закончите свою другую работу. Вы можете решить эту проблему несколькими способами:

Контроль завершения с помощью второго Publisher

Здесь я создаю второй Publisher, используя Sinks.Empty, единственная цель которого – излучать сигнал onComplete, когда оригинальный Flux завершится. Мы помещаем триггер Sinks.Empty.emitEmpty() в хук Flux.doOnComplete(), а затем позднее блокируем с помощью .asMono().block(). Если вы подписались позже, Mono завершится сразу:

Sinks.Empty<Void> completionSink = Sinks.empty();

Flux.range(1,10)
        .delayElements(Duration.ofSeconds(1))
        .doOnNext((Integer next) -> System.out.println(next + " на потоке " + Thread.currentThread().getName()))
        .doOnComplete(() -> completionSink.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST))
        .subscribe();

// сделайте другую работу здесь, а затем ждите завершения
completionSink.asMono().block();
System.out.println("Все готово");

Поместите свою другую работу в другой Publisher

Кроме того, вы можете разместить свою другую работу во втором Publisher с помощью Mono.fromRunnable, а затем использовать Flux.merge, чтобы подписаться на оба Publisher одновременно. Flux.merge будет излучать все элементы из обоих Publisher и завершится только если оба будут завершены. Поскольку ваш Mono.fromRunnable не излучает никаких элементов, результирующий Flux будет содержать только элементы оригинального Flux.

Flux<Integer> flux = Flux.range(1, 10)
        .delayElements(Duration.ofSeconds(1))
        .doOnNext((Integer next) -> System.out.println(next + " на потоке " + Thread.currentThread().getName()));
Mono<Void> otherWork = Mono.fromRunnable(() -> {
    // сделайте другую работу здесь
});
Flux.merge(flux, otherWork).blockLast();
System.out.println("Все готово");

Обратите внимание, что Flux.merge помещает оба Publisher на один и тот же Scheduler, что может вызвать проблемы, если otherWork не асинхронен или если оригинальный Flux никогда не завершится. Вы можете решить эту проблему, присвоив разным Scheduler’ам каждый Publisher.

Я бы упростил ответ @Patrick Hooijer и использовал CountDownLatch, это идеальный элемент конкурентной библиотеки, который ждет, пока его значение не изменится на ноль.

Вы просто вызываете .await() и он просто ждет, пока Flux вызовет countDown в методе doOnComplete.

    CountDownLatch countDownLatch = new CountDownLatch(1);

    Flux.range(1,10)
            .delayElements(Duration.ofSeconds(1))
            .doOnNext((Integer next) -> System.out.println(next + " на потоке " + Thread.currentThread().getName()))
            .doOnComplete(countDownLatch::countDown)
            .subscribe();

    countDownLatch.await();

    System.out.println("Все готово");

К сожалению, нет встроенного API для этой цели

Это важное поведение для меня, поэтому нам не придется ждать никаких результатов, если главный поток собирается закрыться, так как Flux может занять гораздо больше времени, чем просто вывод чисел; мы, вероятно, не сможем закрыть главный поток в этом случае.

Но, конечно, это зависит от видения создателей этой библиотеки.

Как я вижу в вашем вопросе, вы хотите какой-то другой API, чтобы заменить blockLast, верно? И насколько я знаю, вам придется реализовать это самостоятельно. Встроенного API для этой цели нет.

Mono.zip(
                        Mono.fromCallable(() -> {
                            //что-то
                            System.out.println("Завершено");
                            return 1;
                        }).delaySubscription(Duration.ofSeconds(12)).subscribeOn(Schedulers.boundedElastic()),
                        Flux.range(1, 10)
                                .delayElements(Duration.ofSeconds(1))
                                .doOnNext((Integer next) -> System.out.println(next + " на потоке " + Thread.currentThread().getName()))
                                .last()
                )
                .subscribe(__ -> System.out.println("Все готово"));

Ответ или решение

Когда вы работаете с Spring Reactor и используете Flux для реактивного программирования, часто возникает необходимость выполнить некоторые действия асинхронно, а затем дождаться завершения работы других задач. В случае, если вы хотите выполнить дополнительные операции и затем заблокироваться до завершения всех элементов в Flux, вы можете использовать несколько подходов.

Подход с использованием CountDownLatch

Одним из самых простых и элегантных решений является применение CountDownLatch, который предоставляет механизм ожидания завершения фоновых операций.

import reactor.core.publisher.Flux;

import java.util.concurrent.CountDownLatch;

public class FluxCompletionExample {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);

        Flux.range(1, 10)
            .delayElements(Duration.ofSeconds(1))
            .doOnNext((Integer next) -> System.out.println(next + " on thread " + Thread.currentThread().getName()))
            .doOnComplete(countDownLatch::countDown)
            .subscribe();

        // Здесь можно выполнять другие задачи
        // ...

        countDownLatch.await();

        System.out.println("All done");
    }
}

Этот код создает CountDownLatch и ждет, пока оператор doOnComplete вызовет метод countDown, который уменьшит значение счетчика до нуля, позволяя продолжить выполнение программы.

Использование Sinks для мониторинга завершения

Другой подход заключается в использовании Sinks.Empty, который позволяет создать отдельный источник завершения. Вы можете сгенерировать сигнал завершения в doOnComplete, а затем блокировать поток с помощью block().

import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

public class FluxCompletionUsingSinksExample {
    public static void main(String[] args) {
        Sinks.Empty<Void> completionSink = Sinks.empty();

        Flux.range(1, 10)
            .delayElements(Duration.ofSeconds(1))
            .doOnNext((Integer next) -> System.out.println(next + " on thread " + Thread.currentThread().getName()))
            .doOnComplete(() -> completionSink.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST))
            .subscribe();

        // Здесь можете выполнять другие задачи
        // ...

        // Ожидание завершения
        completionSink.asMono().block();
        System.out.println("All done");
    }
}

Альтернативный способ: Mono.fromRunnable

Кроме того, вы можете использовать Mono.fromRunnable для выполнения других задач, а затем объединить два Publisher с помощью Flux.merge. Этот метод позволяет выполнять задачи параллельно и завершает выполнение только тогда, когда оба Publisher завершены.

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FluxMergeExample {
    public static void main(String[] args) {
        Flux<Integer> flux = Flux.range(1, 10)
            .delayElements(Duration.ofSeconds(1))
            .doOnNext((Integer next) -> System.out.println(next + " on thread " + Thread.currentThread().getName()));

        Mono<Void> otherWork = Mono.fromRunnable(() -> {
            // Здесь можно выполнять другую работу
        });

        Flux.merge(flux, otherWork).blockLast();
        System.out.println("All done");
    }
}

Заключение

Как видно, в Spring Reactor действительно нет встроенного механизма, который бы позволял блокировать поток до завершения элементов после некоторого асинхронного выполнения. Однако с помощью таких инструментов, как CountDownLatch, Sinks или Mono, можно достичь данной функциональности. Выбор метода будет зависеть от ваших требований и сложности задач, которые вам нужно решить.

Оцените материал
Добавить комментарий

Капча загружается...