Вопрос или проблема
В моей настройке MassTransit + RabbitMQ у меня есть издатель, который как отправляет, так и подписывается на события, и отдельное приложение-потребитель. Когда потребитель не работает, а издатель отправляет и обрабатывает событие, сообщение не доставляется потребителю после его восстановления. Похоже, RabbitMQ считает сообщение уже обработанным, так как издатель его потребил.
Важно: Когда оба приложения работают, доставка сообщений происходит корректно, и оба приложения потребляют сообщения, как и ожидалось. Проблема возникает только в сценарии восстановления, описанном выше.
Ожидаемое поведение: Сообщения должны доставляться всем подписанным потребителям, даже если один из потребителей (в данном случае издатель) уже его обработал. Когда приложение-потребитель восстанавливается, оно должно получать сообщения, которые были опубликованы во время его простоя.
Конфигурация одинаковая для обоих приложений:
using System.Reflection;
using MassTransit;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using SharedKernel.Communication.Events;
using SharedKernel.Utils;
namespace SharedKernel.Communication.Extensions;
public static class MassTransitServicesExtensions
{
public static IServiceCollection AddMassTransit(
this IServiceCollection services,
IConfiguration config,
Assembly consumersAssembly,
string applicationName
)
{
services.AddMassTransit(busConfigurator =>
{
busConfigurator.SetKebabCaseEndpointNameFormatter();
busConfigurator.AddConsumers(consumersAssembly);
busConfigurator.UsingRabbitMq(
(context, configurator) =>
{
string host = config["RabbitMq:Host"]!;
string username = config["RabbitMq:Username"]!;
string password = config["RabbitMq:Password"]!;
if (AppEnv.IsProduction)
{
host = Environment.GetEnvironmentVariable("RABBITMQ_HOST")!;
username = Environment.GetEnvironmentVariable("RABBITMQ_USER")!;
password = Environment.GetEnvironmentVariable("RABBITMQ_PASSWORD")!;
}
configurator.Host(
new Uri(host),
hostConfigurator =>
{
hostConfigurator.Username(username);
hostConfigurator.Password(password);
}
);
configurator.UseMessageRetry(r => r.Interval(5, TimeSpan.FromSeconds(10)));
configurator.Message<UserConfirmedEmailEvent>(e =>
{
e.SetEntityName("user-confirmed-email-event");
});
configurator.Publish<UserConfirmedEmailEvent>(e =>
{
e.ExchangeType = "fanout";
e.Durable = true;
});
configurator.ReceiveEndpoint(
$"user-confirmed-email-{applicationName}",
e =>
{
e.Durable = true;
e.AutoDelete = false;
e.ConfigureConsumers(context);
e.Bind(
"user-confirmed-email-event",
b =>
{
b.ExchangeType = "fanout";
b.Durable = true;
}
);
}
);
}
);
});
return services;
}
}
Ответ или решение
Потеря сообщений после восстановления потребителя в настройке MassTransit и RabbitMQ: причина и решение
Когда мы говорим о системах обмена сообщениями, таких как RabbitMQ в сочетании с MassTransit, критически важно понять, как корректно обрабатывать ситуации, когда потребитель временно недоступен. В рассматриваемом вами случае мы имеем дело с настройкой, в которой один из приложений (в данном случае, публикация и потребление происходят в одном и том же приложении) не получает сообщения после того, как он восстановился после сбоя. В этой статье мы разберем причины этого поведения и предложим возможные решения.
Понимание поведения RabbitMQ и MassTransit
Когда ваш публицист отправляет сообщения в RabbitMQ, он фактически публикует их в определённые очереди, откуда потребители могут их забрать. В вашем случае вы используете fanout
обменник для UserConfirmedEmailEvent
, что означает, что каждое отправленное сообщение будет дублироваться на все связанные с ним очереди. Это идеальная конфигурация для создания события, когда несколько потребителей должны обрабатывать одно и то же сообщение. Однако, есть несколько моментов, на которые стоит обратить внимание:
-
Проблема с обработкой в одном приложении: Когда ваше приложение функционирует как публицист и как потребитель одновременно, оно должно правильно обрабатывать свои состояния. Если публицист "потребляет" сообщение, то есть обрабатывает его, RabbitMQ считает это сообщение обработанным и, следовательно, больше не отправляет его другим потребителям. В вашем случае, когда конечный потребитель снова поднимается, он не получает сообщения, поскольку публикация и потребление происходят в одном приложении, и RabbitMQ это учитывает.
-
Durable Exchange и Queues: Убедитесь, что вы используете
durable
обменники и очереди. Это гарантирует, что ваши сообщения не будут потеряны в случае сбоя RabbitMQ. Ваша текущая настройка указана правильно, однако, если сообщения были "съедены" во время сбоев, они могут быть не видны для вновь запущенного потребителя. -
Поведение в ситуации сбоя: Когда ваш публицист не работает, и он отправляет сообщения, они должны оставаться в очереди до тех пор, пока они не будут подтверждены всеми потребителями. Однако, в случае вашего утверждения, что публицист также является потребителем, RabbitMQ будет считать, что сообщение уже обработано.
Решения для обеспечения доставки сообщений после восстановления
Чтобы устранить эту проблему, рассмотрите следующие рекомендации:
-
Разделение приложений: Самый очевидный шаг — это выделить отдельные приложения для публикации и потребления сообщений. Это позволит избежать ситуации, когда сообщение считается обработанным только потому, что оно было обработано одним и тем же экземпляром приложения.
-
Использование механизма подтверждения сообщений: Убедитесь, что ваш консюмер правильно подтверждает (ack) сообщения только тогда, когда они успешно обработаны. Это поможет избежать ситуации, когда RabbitMQ считает сообщение обработанным, хотя еще не все подписчики его получили.
-
Проверка состояния потребителя: Реализуйте логику в вашем потребителе, которая будет проверять, есть ли сообщения в очереди перед тем, как начинать обработку. Если потребитель был выключен, то он должен вызывать логику, которая будет позволять повторно получать сообщения.
-
Настройка времени жизни сообщений: Убедитесь, что сообщения имеют заданный TTL (время жизни), чтобы они не "застревали" в очередях слишком долго, если потребитель не доступен.
Заключение
Проблема потери сообщений после восстановления потребителей в системе RabbitMQ и MassTransit может быть разрешена путём изменения архитектурного подхода к проектированию системы. В частности, стоит постараться разделить responsabilidad за публикацию и потребление. Также не забывайте о правильной конфигурации очередей и обменников, а также об эффективных механизмах обработки и подтверждения сообщений. Подобные изменения значительно повысят надежность вашей системы обмена сообщениями и обеспечат слаженное взаимодействие всех компонентов.