Почему сообщения теряются после восстановления потребителя в настройке издатель-потребитель RabbitMQ/MassTransit?

Вопрос или проблема

В моей настройке MassTransit + RabbitMQ у меня есть издатель, который как отправляет, так и подписывается на события, и отдельное приложение-потребитель. Когда потребитель не работает, а издатель отправляет и обрабатывает событие, сообщение не доставляется потребителю после его восстановления. Похоже, RabbitMQ считает сообщение уже обработанным, так как издатель его потребил.

Важно: Когда оба приложения работают, доставка сообщений происходит корректно, и оба приложения потребляют сообщения, как и ожидалось. Проблема возникает только в сценарии восстановления, описанном выше.

Ожидаемое поведение: Сообщения должны доставляться всем подписанным потребителям, даже если один из потребителей (в данном случае издатель) уже его обработал. Когда приложение-потребитель восстанавливается, оно должно получать сообщения, которые были опубликованы во время его простоя.

Конфигурация одинаковая для обоих приложений:

using System.Reflection;
using MassTransit;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using SharedKernel.Communication.Events;
using SharedKernel.Utils;

namespace SharedKernel.Communication.Extensions;

public static class MassTransitServicesExtensions
{
    public static IServiceCollection AddMassTransit(
        this IServiceCollection services,
        IConfiguration config,
        Assembly consumersAssembly,
        string applicationName
    )
    {
        services.AddMassTransit(busConfigurator =>
        {
            busConfigurator.SetKebabCaseEndpointNameFormatter();

            busConfigurator.AddConsumers(consumersAssembly);

            busConfigurator.UsingRabbitMq(
                (context, configurator) =>
                {
                    string host = config["RabbitMq:Host"]!;
                    string username = config["RabbitMq:Username"]!;
                    string password = config["RabbitMq:Password"]!;
                    if (AppEnv.IsProduction)
                    {
                        host = Environment.GetEnvironmentVariable("RABBITMQ_HOST")!;
                        username = Environment.GetEnvironmentVariable("RABBITMQ_USER")!;
                        password = Environment.GetEnvironmentVariable("RABBITMQ_PASSWORD")!;
                    }

                    configurator.Host(
                        new Uri(host),
                        hostConfigurator =>
                        {
                            hostConfigurator.Username(username);
                            hostConfigurator.Password(password);
                        }
                    );

                    configurator.UseMessageRetry(r => r.Interval(5, TimeSpan.FromSeconds(10)));

                    configurator.Message<UserConfirmedEmailEvent>(e =>
                    {
                        e.SetEntityName("user-confirmed-email-event");
                    });
                    configurator.Publish<UserConfirmedEmailEvent>(e =>
                    {
                        e.ExchangeType = "fanout";
                        e.Durable = true;
                    });

                    configurator.ReceiveEndpoint(
                        $"user-confirmed-email-{applicationName}",
                        e =>
                        {
                            e.Durable = true;
                            e.AutoDelete = false;

                            e.ConfigureConsumers(context);

                            e.Bind(
                                "user-confirmed-email-event",
                                b =>
                                {
                                    b.ExchangeType = "fanout";
                                    b.Durable = true;
                                }
                            );
                        }
                    );
                }
            );
        });

        return services;
    }
}

Ответ или решение

Потеря сообщений после восстановления потребителя в настройке MassTransit и RabbitMQ: причина и решение

Когда мы говорим о системах обмена сообщениями, таких как RabbitMQ в сочетании с MassTransit, критически важно понять, как корректно обрабатывать ситуации, когда потребитель временно недоступен. В рассматриваемом вами случае мы имеем дело с настройкой, в которой один из приложений (в данном случае, публикация и потребление происходят в одном и том же приложении) не получает сообщения после того, как он восстановился после сбоя. В этой статье мы разберем причины этого поведения и предложим возможные решения.

Понимание поведения RabbitMQ и MassTransit

Когда ваш публицист отправляет сообщения в RabbitMQ, он фактически публикует их в определённые очереди, откуда потребители могут их забрать. В вашем случае вы используете fanout обменник для UserConfirmedEmailEvent, что означает, что каждое отправленное сообщение будет дублироваться на все связанные с ним очереди. Это идеальная конфигурация для создания события, когда несколько потребителей должны обрабатывать одно и то же сообщение. Однако, есть несколько моментов, на которые стоит обратить внимание:

  1. Проблема с обработкой в одном приложении: Когда ваше приложение функционирует как публицист и как потребитель одновременно, оно должно правильно обрабатывать свои состояния. Если публицист "потребляет" сообщение, то есть обрабатывает его, RabbitMQ считает это сообщение обработанным и, следовательно, больше не отправляет его другим потребителям. В вашем случае, когда конечный потребитель снова поднимается, он не получает сообщения, поскольку публикация и потребление происходят в одном приложении, и RabbitMQ это учитывает.

  2. Durable Exchange и Queues: Убедитесь, что вы используете durable обменники и очереди. Это гарантирует, что ваши сообщения не будут потеряны в случае сбоя RabbitMQ. Ваша текущая настройка указана правильно, однако, если сообщения были "съедены" во время сбоев, они могут быть не видны для вновь запущенного потребителя.

  3. Поведение в ситуации сбоя: Когда ваш публицист не работает, и он отправляет сообщения, они должны оставаться в очереди до тех пор, пока они не будут подтверждены всеми потребителями. Однако, в случае вашего утверждения, что публицист также является потребителем, RabbitMQ будет считать, что сообщение уже обработано.

Решения для обеспечения доставки сообщений после восстановления

Чтобы устранить эту проблему, рассмотрите следующие рекомендации:

  • Разделение приложений: Самый очевидный шаг — это выделить отдельные приложения для публикации и потребления сообщений. Это позволит избежать ситуации, когда сообщение считается обработанным только потому, что оно было обработано одним и тем же экземпляром приложения.

  • Использование механизма подтверждения сообщений: Убедитесь, что ваш консюмер правильно подтверждает (ack) сообщения только тогда, когда они успешно обработаны. Это поможет избежать ситуации, когда RabbitMQ считает сообщение обработанным, хотя еще не все подписчики его получили.

  • Проверка состояния потребителя: Реализуйте логику в вашем потребителе, которая будет проверять, есть ли сообщения в очереди перед тем, как начинать обработку. Если потребитель был выключен, то он должен вызывать логику, которая будет позволять повторно получать сообщения.

  • Настройка времени жизни сообщений: Убедитесь, что сообщения имеют заданный TTL (время жизни), чтобы они не "застревали" в очередях слишком долго, если потребитель не доступен.

Заключение

Проблема потери сообщений после восстановления потребителей в системе RabbitMQ и MassTransit может быть разрешена путём изменения архитектурного подхода к проектированию системы. В частности, стоит постараться разделить responsabilidad за публикацию и потребление. Также не забывайте о правильной конфигурации очередей и обменников, а также об эффективных механизмах обработки и подтверждения сообщений. Подобные изменения значительно повысят надежность вашей системы обмена сообщениями и обеспечат слаженное взаимодействие всех компонентов.

Оцените материал
Добавить комментарий

Капча загружается...