Немедленно закройте поток, если буфер обратного давления переполнен.

Вопрос или проблема

У меня есть мультикастовый приёмник с стратегией directBestEffort, в который я помещаю различные события.

Я хотел бы создать поток, который буферизует N количество событий, и в случае переполнения сразу закрывается с ошибкой, например, если приёмник медленно обрабатывает события.

Проблема в том, что onBackpressureBuffer(4, { println(“Переполнение на $it”) }, BufferOverflowStrategy.ERROR) сначала обрабатывает буфер, а затем выбрасывает ошибку. Я хотел бы, чтобы ошибка выбрасывалась в момент переполнения с выводом $it.

Вот небольшая демонстрация с логами:

private val sink: Sinks.Many<Int> = Sinks
    .many()
    .multicast()
    .directBestEffort()

fun createBufferFlux(): Flux<Int> {
    return sink.asFlux()
        .onBackpressureBuffer(4, { println("Переполнение на $it") }, BufferOverflowStrategy.ERROR)
}

fun emitTestEvents() {
    (1..20).forEach {
        sink.tryEmitNext(it)
        Thread.sleep(50)
    }
}

fun main() {
    val completable = CompletableFuture<Unit>()

    createBufferFlux()
        .delayElements(Duration.ofMillis(100))
        .subscribe(
            { println("Медленный поток получил: $it") },
            { println("Ошибка медленного потока: $it");completable.complete(Unit) },
            { println("Медленный поток завершён");completable.complete(Unit) }
        )
    ForkJoinPool.commonPool().execute {
        emitTestEvents()
    }

    completable.join()
}
Медленный поток получил: 1
Медленный поток получил: 2
Медленный поток получил: 3
Медленный поток получил: 4
Медленный поток получил: 5
Переполнение на 11 # я хочу закрыть медленный поток здесь
Медленный поток получил: 6
Медленный поток получил: 7
Медленный поток получил: 8
Медленный поток получил: 9
# но ошибка возникает только, когда медленный поток обрабатывает буфер
Ошибка медленного потока: reactor.core.Exceptions$OverflowException: Приёмник перегружен большим количеством сигналов, чем ожидалось (ограниченная очередь...)

Также, пожалуйста, обратите внимание, что я не хотел бы использовать

Sinks.many()
     .multicast()
     .onBackpressureBuffer<Int>(4, false)

Потому что в этом случае буфер будет глобальным для приёмника.

Ответ или решение

Для решения вашей задачи с многопоточной обработкой и управлением переполнением буфера в реактивной стриме, можно воспользоваться стратегией управления потоком, чтобы обеспечить немедленное закрытие потока при переполнении буфера. На данный момент ваш код обрабатывает события до появления исключения, что получает не совсем желаемое поведение. Вместо этого мы можем использовать механизм для управления переполнением, который сразу же вызывает ошибку при превышении предела.

Решение

Поскольку вам не подходит использование глобального буфера с Sinks.many(), предлагаю использовать Flux с кастомной логикой для обработки переполнения. Мы можем создать отдельный поток, который будет обрабатывать сигнал о переполнении.

Вот полный пример вашего кода с необходимыми изменениями:

import reactor.core.publisher.Flux
import reactor.core.publisher.Sinks
import java.time.Duration
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ForkJoinPool

private val sink: Sinks.Many<Int> = Sinks
    .many()
    .multicast()
    .directBestEffort()

fun createBufferFlux(): Flux<Int> {
    return sink.asFlux()
        .doOnNext { println("Received: $it") }
        .onBackpressureBuffer(4, { overflowHandler(it) }, BufferOverflowStrategy.ERROR)
}

fun overflowHandler(item: Int) {
    println("Overflow on $item")
    // Закрываем поток при переполнении
    throw RuntimeException("Buffer overflow occurred")
}

fun emitTestEvents() {
    (1..20).forEach {
        sink.tryEmitNext(it)
        Thread.sleep(50)
    }
}

fun main() {
    val completable = CompletableFuture<Unit>()

    createBufferFlux()
        .delayElements(Duration.ofMillis(100))
        .subscribe(
            { println("Slow flux received: $it") },
            { println("Slow flux error: $it"); completable.complete(Unit) },
            { println("Slow flux completed"); completable.complete(Unit) }
        )

    ForkJoinPool.commonPool().execute {
        emitTestEvents()
    }

    try {
        completable.join()
    } catch (e: Exception) {
        println("Exception: ${e.message}")
    }
}

Объяснение

  1. onBackpressureBuffer: Мы используем onBackpressureBuffer с подходящей стратегией переполнения. В данном случае, при переполнении буфера выполняется overflowHandler, в котором мы будем выбрасывать исключение.

  2. Сигнал о переполнении: В overflowHandler мы выводим информацию о переполнении и вызываем исключение. Это приведет к немедленному завершению подписки на Flux, а значит, поток будет закрыт.

  3. Управление исключениями: В методе main мы обрабатываем исключение, чтобы программа не завершалась неожиданно, и мы могли видеть сообщение об ошибке.

Итог

Это решение позволяет закрывать Flux немедленно при возникновении переполнения буфера и обрабатывать ошибки явно, как вы и планировали. Подход с выбрасыванием исключения в обработчике переполнения дает вам необходимое поведение и контроль над потоками данных.

Оцените материал
Добавить комментарий

Капча загружается...