Подписчик ZMQ не получает все сообщения.

Вопрос или проблема

Я пытаюсь осуществить связь PUB SUB ZMQ с использованием Golang. Я пытался отправить несколько сообщений (например, 10000) от PUB к SUB и проверял время, необходимое для получения всех сообщений в SUB. Но я никогда не получаю все сообщения в SUB.

В чем может быть причина и как это исправить?

Код PUB –

package main

import (
    "fmt"
    "log"
    "os"
    "strconv"
    "time"

    zmq "github.com/pebbe/zmq4"
)

const letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"

func SampleStrOfSize(size int) string {
    b := make([]byte, size)
    idx := 0
    for i := 0; i < size; i++ {
        b[i] = letters[idx]
        idx = (idx + 1) % len(letters)
    }
    // fmt.Println("message: ", string(b))
    return string(b)
}

func main() {
    publisher, err := zmq.NewSocket(zmq.PUB)
    if err != nil {
        os.Exit()
    }
    defer publisher.Close()
    connectionStr := "tcp://127.0.0.1:5555"

    err = publisher.Bind(connectionStr)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println("Привязано к", connectionStr)

        // Создание случайной строки сообщения
    message := SampleStrOfSize(1024)
    msgCount := 0

    if len(os.Args) > 1 {
        if msgCount, err = strconv.Atoi(os.Args[1]); err != nil {
            return
        }
    } else {
        fmt.Println("Количество сообщений не указано")
        return
    }

    if msgCount <= 0 {
        fmt.Printf("Указано недопустимое количество сообщений (%v)", msgCount)
        return
    }

        // Даем немного времени для подключения SUB
    time.Sleep(time.Second * 5)
    fmt.Println("Начинаем отправку сообщений")
    start := time.Now()

    for i := 0; i < msgCount; i++ {
                // Каждый раз отправляем одно и то же сообщение
        _, err = publisher.Send(message, 0)
        if err != nil {
            fmt.Printf("Произошла ошибка при отправке сообщения. %v", err)
        }
    }

    elapsed := time.Since(start)
    fmt.Printf("Отправка %d сообщений заняла %s\n", msgCount, elapsed)
        // Не закрывать сразу
        time.Sleep(time.Second * 30)
}

Код SUB –

package main

import (
    "fmt"
    "os"
    "strconv"
    "time"

    zmq "github.com/pebbe/zmq4"
)

func main() {
    subscriber, err := zmq.NewSocket(zmq.SUB)
    if err != nil {
        fmt.Println("Не удалось открыть сокет")
        os.Exit(1)
    }
    defer subscriber.Close()

    err = subscriber.Connect("tcp://127.0.0.1:5555")
    if err != nil {
        fmt.Println("Не удалось подключиться")
        os.Exit(1)
    }

    msgCount := 0
    count := 0
    if len(os.Args) > 1 {
        if msgCount, err = strconv.Atoi(os.Args[1]); err != nil {
            return
        }
    } else {
        fmt.Println("Количество сообщений не указано")
        os.Exit(1)
    }
    fmt.Printf("Ожидание %d сообщений\n", msgCount)

    // Подписка на все сообщения
    err = subscriber.SetSubscribe("")
    if err != nil {
        fmt.Println("Не удалось подписаться на все сообщения")
        os.Exit(1)
    }

    var start time.Time
    for {
        _, err := subscriber.Recv(0)

        if count == 0 {
            start = time.Now()
        }
        if err != nil {
            fmt.Println("Ошибка получения")
        }

        count++

        if count == msgCount {
            break
        } else if 0 == count%1000 {
            // Печать времени каждые 1000 сообщений
            elapsed := time.Since(start)
            fmt.Printf("Получено %d сообщений за %s\n", count, elapsed)
        }
    }

    elapsed := time.Since(start)
    fmt.Printf("Получено %d сообщений за %s\n", msgCount, elapsed)
}

Когда я запускаю оба кода с, скажем, msgCount равным 10000, я не получаю 10000 сообщений в SUB. Например, один раз я получил следующий вывод в SUB –

Ожидание 10000 сообщений
Получено 1000 сообщений за 17.435321ms
Получено 2000 сообщений за 25.530057ms
Получено 3000 сообщений за 27.80558ms
Получено 4000 сообщений за 1m40.583143061s
Получено 5000 сообщений за 1m40.590513201s
Получено 6000 сообщений за 1m40.597145666s

В чем может быть необычная задержка после 3000 сообщений?

ZMQ имеет верхний предел

Верхний предел – это жесткое ограничение на максимальное количество
незавершенных сообщений, которые ZeroMQ хранит в памяти для любого одного пира,
с которым указанный сокет взаимодействует.

Если это ограничение достигнуто, сокет попадает в исключительное состояние,
и в зависимости от типа сокета ZeroMQ принимает соответствующие меры,
такие как блокировка или отбрасывание отправленных сообщений.

https://zeromq.org/socket-api/#high-water-mark

Значение по умолчанию для этого равно 1000, вы можете установить его, используя библиотеку pebble/zmq4:
https://pkg.go.dev/github.com/pebbe/zmq4#Socket.SetSndhwm

Верхний предел приводит к отбрасыванию сообщений

Когда сокет PUB попадает в состояние молчания из-за достижения верхнего
предела для подписчика, то любые сообщения, которые должны быть отправлены
данному подписчику, будут отброшены до тех пор, пока состояние молчания не закончится.

https://zeromq.org/socket-api/#pub-socket

Ваш подписчик будет получать сообщения, но не так быстро, как они производятся в паблишере. Это приводит к увеличению количества незавершенных. Похоже, что верхний предел достигается после получения около 3000 сообщений, затем происходит молчание до 1m40.

Я предполагаю, что вы не получаете все 10000 сообщений? Потому что средние 4000 отбрасываются паблишером.

Ответ или решение

Ваша проблема с получением всех сообщений в технологии PUB-SUB с помощью ZeroMQ на языке Golang может быть связана с несколькими аспектами реализации и конфигурации. Рассмотрим подробнее основные причины и возможные решения.

Высокое давление (High Water Mark)

Одна из наиболее вероятных причин, по которой ваш подписчик (SUB) не получает все сообщения, заключается в том, что вы достигаете предела высоких отметок (High Water Mark, HWM) на сокете PUB. По умолчанию HWM устанавливается в 1000 сообщений, и когда этот предел достигается, отправка сообщений приостанавливается до тех пор, пока подписчик не освободит место, что может привести к сбросу сообщений.

Решение:

Вы можете изменить значение HWM, используя метод SetSndhwm для сокета PUB. Увеличение этого значения позволит вашему PUB-сокету отправлять больше сообщений, прежде чем он перейдет в режим мьютирования. Например, установите HWM на 10,000:

publisher.SetSndhwm(10000)

Время ожидания для подписчиков

Также стоит отметить, что в вашей реализации перед отправкой сообщений у вас стоит задержка в 5 секунд для подключения подписчика. Убедитесь, что подписчик полностью подключен и готов к приему сообщений перед началом отправки.

Решение:

Необходимо обеспечить, чтобы подписчик был готов к приему сообщений и инициировали отправку сообщений только после подтверждения его подключения.

Настройка скорости отправки

Другая проблема может заключаться в том, что скорость отправки сообщений на стороне PUB превышает скорость обработки на стороне SUB. Из-за этого в очереди на сокете PUB накапливается большое количество сообщений, что также может привести к сбросу некоторых сообщений.

Решение:

Вы можете ввести небольшую задержку между отправкой сообщений. Например:

time.Sleep(time.Millisecond * 10) // Задержка 10 миллисекунд

Отладка и оценка производительности

Для адекватной оценки производительности вашей системы важно добавить детальный вывод отладки, который поможет понять, как именно происходят задержки. Можно вывести статистику о времени получения каждого сообщения и реакциях на ошибки.

if err != nil {
    fmt.Println("Receive failed", err)
}

Заключение

Существует несколько факторов, которые могут повлиять на то, что ваш подписчик не получает все сообщения. Наиболее распространенные из них связаны с высокой водой и скоростью обработки. Увеличение HWM, добавление задержек между отправкой сообщений и обеспечение готовности подписчика к приему сообщений могут существенно улучшить стабильность и производительность вашей системы PUB-SUB.

Если эти рекомендации не решат вашу проблему, может потребоваться более глубокий анализ кода и конфигурации сети. Убедитесь, что все компоненты вашей системы работают оптимально и могут обрабатывать ожидаемую нагрузку.

Оцените материал
Добавить комментарий

Капча загружается...