Вопрос или проблема
В настоящее время я работаю над системой обмена сообщениями, в которой у меня есть служба MassTransit на C# и слушатель Spring Boot. Я столкнулся с проблемой заголовков сообщений, отправляемых из MassTransit, в частности с заголовками reply_to
и MT-Response-Address
.
Когда сообщение отправляется из MassTransit, оно включает следующую структуру заголовков:
Структура сообщения MassTransit:
Свойства
- message_id: 10340000-afb3-7824-10a6-08dce792c869
- expiration: 30000
- delivery_mode: 2
- headers:
- ConversationId: 10340000-afb3-7824-6a92-08dce792c869
- MT-Host-Info: { "MachineName":"DESKTOP-KS55GT8",
"ProcessName":"UsersService",
"ProcessId":13328,
"Assembly":"UsersService",
"AssemblyVersion":"1.0.0.0",
"FrameworkVersion":"8.0.4",
"MassTransitVersion":"8.2.5.0",
"OperatingSystemVersion":"Microsoft Windows NT 10.0.19045.0"
}
- MT-MessageType: urn:message:UsersService.Messages:RequestMessage
- MT-Response-Address: rabbitmq://localhost/DESKTOPKS55GT8_UsersService_bus_ny4yyyfxsphnjwh1bdqqxrmedz?temporary=true
- MT-Source-Address: rabbitmq://localhost/DESKTOPKS55GT8_UsersService_bus_ny4yyyfxsphnjwh1bdqqxrmedz?temporary=true
- MessageId: 10340000-afb3-7824-10a6-08dce792c869
- RequestId: 10340000-afb3-7824-9af9-08dce792c868
- publishId: 2
- content_type: application/json
- Payload: - 31 bytes
- Encoding: string
- { "payload": "Test Value" }
- Redelivered: false
Заголовок MT-Response-Address
устанавливается MassTransit и содержит адрес ответа, но мой слушатель Spring Boot ожидает, что эта информация будет в заголовке reply_to
:
Ожидаемая структура сообщения Spring Boot:
Свойства
- reply_to: amq.rabbitmq.reply-to.g1h2AA9yZXBseUAxMjM3MzY2MzIAAFNIAAAAAGb/7rs=.SzKpGjLslrBY1mMHuXMWhQ==
- correlation_id: 1
- priority: 0
- delivery_mode: 2
- headers:
- __TypeId__: com.api.stockservice.domain.event.RequestMessage
- content_encoding: UTF-8
- content_type: application/json
- Payload: - 49 bytes
- Encoding: string
- { "Payload": "etutrtjfjtf", "payload": "etutrtjfjtf" }
Несоответствие между MT-Response-Address
в MassTransit и ожидаемым reply_to
в Spring Boot вызывает ошибки при обработке сообщений.
Таким образом, Spring Boot, когда хочет ответить на сообщение, ожидает, что адрес ответа будет в reply-to
, но MassTransit, который является отправителем, вставляет адрес ответа в MT-Response-Address
.
using MassTransit;
using UsersService.Messages;
namespace UsersService.Producers
{
public class MyRequester
{
private readonly IRequestClient<RequestMessage> _requestClient;
public MyRequester(IRequestClient<RequestMessage> requestClient)
{
_requestClient = requestClient;
}
public async Task SendRequest(string value)
{
var request = new RequestMessage { Payload = value };
// Отправляем запрос и ожидаем ответа
var response = await _requestClient.GetResponse<ResponseMessage>(request);
Console.WriteLine($"Ответ: {response.Message.Result}");
}
}
}
Код запроса
builder.Services.AddMassTransit(x =>
{
//x.AddConsumer<RequestConsumer>();
x.AddConsumer<FirstEventConsumer>();
x.AddConsumer<SecondEventConsumer>();
x.AddRequestClient<RequestMessage>(new Uri("queue:my-request-queue"));
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("rabbitmq://localhost", h =>
{
h.Username("guest");
h.Password("guest");
});
cfg.UseNewtonsoftRawJsonSerializer();
cfg.UseNewtonsoftJsonDeserializer();
cfg.ConfigureEndpoints(context);
//cfg.ReceiveEndpoint("my-request-queue", e =>
//{
// e.ConfigureConsumer<RequestConsumer>(context);
//});
cfg.ReceiveEndpoint("first-publish-queue", e =>
{
e.Bind("publish_exchange");
e.ConfigureConsumer<FirstEventConsumer>(context);
});
cfg.ReceiveEndpoint("second-publish-queue", e =>
{
e.Bind("publish_exchange");
e.ConfigureConsumer<SecondEventConsumer>(context);
});
});
});
Слушатель в Spring Boot
package com.api.stockservice.infrastructure.messaging;
import com.api.stockservice.domain.event.RequestMessage;
import com.api.stockservice.domain.event.ResponseMessage;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
@Component
public class RequestListener {
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = "my-request-queue")
public ResponseMessage handleRequest(@Payload RequestMessage request) {
// Обработка запроса
String result = "Обработано: " + request.getPayload();
// Создание ответа
return new ResponseMessage(result);
}
}
Вопросы
Как я могу сделать так, чтобы заголовки и свойства сообщений RabbitMQ от MassTransit соответствовали схемам, используемым другими системами, которые не используют MassTransit?
Ответ или решение
Для решения проблемы, связанной с несовпадением заголовков сообщений между MassTransit и Spring Boot, необходимо внести некоторые изменения в конфигурацию MassTransit, чтобы обеспечить соответствие ожидаемым заголовкам, содержащим информацию о адресе ответа.
Проблема
Когда сообщение отправляется из MassTransit, оно содержит заголовок MT-Response-Address
, который, по сути, указывает, куда должен быть отправлен ответ. Однако ваш Spring Boot слушатель ожидает, что эта информация будет находиться в заголовке reply_to
, что вызывает ошибки в обработке сообщений.
Решение
Чтобы решить эту проблему, следует настроить MassTransit так, чтобы он использовал заголовок reply_to
, а не MT-Response-Address
. Это можно сделать с помощью использования объекта, который эксплицитно добавляет этот заголовок в сообщения, отправляемые через MassTransit.
Вот шаги, которые помогут достичь этой цели:
-
Настройка заголовков в MassTransit: В вашем коде, где используется
IRequestClient
, добавьте код для установки заголовкаreply_to
, чтобы при отправке сообщения он был правильно установлен. -
Пример кода для настройки заголовков: Изменим метод
SendRequest
в классеMyRequester
, чтобы включить установку заголовкаreply_to
.
public async Task SendRequest(string value)
{
var request = new RequestMessage { Payload = value };
// Отправка запроса с указанием заголовка reply_to
var response = await _requestClient.GetResponse<ResponseMessage>(request,
ctx => ctx.Headers.Set("reply_to", "amq.rabbitmq.reply-to.g1h2AA9yZXBseUAxMjM3MzY2MzIAAFNIAAAAAGb/7rs=.SzKpGjLslrBY1mMHuXMWhQ=="));
Console.WriteLine($"Response: {response.Message.Result}");
}
- Обработка ответов в слушателе Spring Boot: Убедитесь, что ваш слушатель правильно обрабатывает сообщения, получаемые из очереди. Когда он создает ответ, он может использовать
reply_to
, чтобы отправить ответ обратно в MassTransit.
В данном случае
- Убедитесь, что Spring Boot слушатель может правильно извлечь
reply_to
заголовок и использовать его для отправки ответа обратно в MassTransit. - Убедитесь, что идентификаторы сообщений и корреляции также правильно установлены и передаются в заголовках.
Дополнительные рекомендации
- Логирование: Включите детальное логирование в MassTransit и Spring Boot, чтобы отследить, какие заголовки фактически отправляются и получаются, что может помочь в дальнейшей отладке проблемы.
- Тестирование: Протестируйте конечный результат, чтобы убедиться, что сообщения обрабатываются корректно, как на стороне MassTransit, так и на стороне Spring Boot.
Заключение
Следуя выше приведенным рекомендациям, вы сможете решить проблему с заголовками сообщений между MassTransit и Spring Boot, обеспечив правильную настройку для передачи данных между двумя системами. Это должно помочь устранить ошибки, возникающие из-за несовпадения заголовков и упростить интеграцию между сервисами.