Различия в заголовках сообщений MassTransit и Spring Boot, вызывающие проблемы

Вопрос или проблема

В настоящее время я работаю над системой обмена сообщениями, в которой у меня есть служба MassTransit на C# и слушатель Spring Boot. Я столкнулся с проблемой заголовков сообщений, отправляемых из MassTransit, в частности с заголовками reply_to и MT-Response-Address.

Когда сообщение отправляется из MassTransit, оно включает следующую структуру заголовков:

Структура сообщения MassTransit:

Свойства
 - message_id: 10340000-afb3-7824-10a6-08dce792c869 
 - expiration: 30000
 - delivery_mode: 2
 - headers:
 - ConversationId: 10340000-afb3-7824-6a92-08dce792c869
 - MT-Host-Info: { "MachineName":"DESKTOP-KS55GT8",
 "ProcessName":"UsersService",
 "ProcessId":13328,
 "Assembly":"UsersService",
 "AssemblyVersion":"1.0.0.0",
 "FrameworkVersion":"8.0.4",
 "MassTransitVersion":"8.2.5.0",
 "OperatingSystemVersion":"Microsoft Windows NT 10.0.19045.0"
 }
 - MT-MessageType: urn:message:UsersService.Messages:RequestMessage
 - MT-Response-Address: rabbitmq://localhost/DESKTOPKS55GT8_UsersService_bus_ny4yyyfxsphnjwh1bdqqxrmedz?temporary=true
 - MT-Source-Address: rabbitmq://localhost/DESKTOPKS55GT8_UsersService_bus_ny4yyyfxsphnjwh1bdqqxrmedz?temporary=true
 - MessageId: 10340000-afb3-7824-10a6-08dce792c869
 - RequestId: 10340000-afb3-7824-9af9-08dce792c868
 - publishId: 2
 - content_type: application/json
 - Payload: - 31 bytes
 - Encoding: string
 - { "payload": "Test Value" }
 - Redelivered: false

Заголовок MT-Response-Address устанавливается MassTransit и содержит адрес ответа, но мой слушатель Spring Boot ожидает, что эта информация будет в заголовке reply_to:

Ожидаемая структура сообщения Spring Boot:

Свойства
 - reply_to: amq.rabbitmq.reply-to.g1h2AA9yZXBseUAxMjM3MzY2MzIAAFNIAAAAAGb/7rs=.SzKpGjLslrBY1mMHuXMWhQ==
 - correlation_id: 1
 - priority: 0
 - delivery_mode: 2
 - headers:
     - __TypeId__: com.api.stockservice.domain.event.RequestMessage
 - content_encoding: UTF-8
 - content_type: application/json
 - Payload: - 49 bytes
 - Encoding: string
 - { "Payload": "etutrtjfjtf", "payload": "etutrtjfjtf" }

Несоответствие между MT-Response-Address в MassTransit и ожидаемым reply_to в Spring Boot вызывает ошибки при обработке сообщений.
Таким образом, Spring Boot, когда хочет ответить на сообщение, ожидает, что адрес ответа будет в reply-to, но MassTransit, который является отправителем, вставляет адрес ответа в MT-Response-Address.

using MassTransit;
using UsersService.Messages;

namespace UsersService.Producers
{
    public class MyRequester
    {
        private readonly IRequestClient<RequestMessage> _requestClient;

        public MyRequester(IRequestClient<RequestMessage> requestClient)
        {
            _requestClient = requestClient;
        }

        public async Task SendRequest(string value)
        {
            var request = new RequestMessage { Payload = value };

            // Отправляем запрос и ожидаем ответа
            var response = await _requestClient.GetResponse<ResponseMessage>(request);

            Console.WriteLine($"Ответ: {response.Message.Result}");
        }
    }

}

Код запроса

builder.Services.AddMassTransit(x =>
{
    //x.AddConsumer<RequestConsumer>();
    x.AddConsumer<FirstEventConsumer>();

    x.AddConsumer<SecondEventConsumer>();
    x.AddRequestClient<RequestMessage>(new Uri("queue:my-request-queue"));
    x.UsingRabbitMq((context, cfg) =>
    {
     
        cfg.Host("rabbitmq://localhost", h =>
        {
            h.Username("guest");
            h.Password("guest");
        });

        cfg.UseNewtonsoftRawJsonSerializer();
        cfg.UseNewtonsoftJsonDeserializer();
        
        cfg.ConfigureEndpoints(context);
        //cfg.ReceiveEndpoint("my-request-queue", e =>
        //{
        //    e.ConfigureConsumer<RequestConsumer>(context);
        //});
        cfg.ReceiveEndpoint("first-publish-queue", e =>
        {
            e.Bind("publish_exchange");
            e.ConfigureConsumer<FirstEventConsumer>(context);
        });
        cfg.ReceiveEndpoint("second-publish-queue", e =>
        {
            e.Bind("publish_exchange");
            e.ConfigureConsumer<SecondEventConsumer>(context);
        });
    });
});

Слушатель в Spring Boot

package com.api.stockservice.infrastructure.messaging;

import com.api.stockservice.domain.event.RequestMessage;
import com.api.stockservice.domain.event.ResponseMessage;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
public class RequestListener {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = "my-request-queue")
    public ResponseMessage handleRequest(@Payload RequestMessage request) {
        // Обработка запроса
        String result = "Обработано: " + request.getPayload();

        // Создание ответа
        return new ResponseMessage(result);
    }
}

Вопросы
Как я могу сделать так, чтобы заголовки и свойства сообщений RabbitMQ от MassTransit соответствовали схемам, используемым другими системами, которые не используют MassTransit?

Ответ или решение

Для решения проблемы, связанной с несовпадением заголовков сообщений между MassTransit и Spring Boot, необходимо внести некоторые изменения в конфигурацию MassTransit, чтобы обеспечить соответствие ожидаемым заголовкам, содержащим информацию о адресе ответа.

Проблема

Когда сообщение отправляется из MassTransit, оно содержит заголовок MT-Response-Address, который, по сути, указывает, куда должен быть отправлен ответ. Однако ваш Spring Boot слушатель ожидает, что эта информация будет находиться в заголовке reply_to, что вызывает ошибки в обработке сообщений.

Решение

Чтобы решить эту проблему, следует настроить MassTransit так, чтобы он использовал заголовок reply_to, а не MT-Response-Address. Это можно сделать с помощью использования объекта, который эксплицитно добавляет этот заголовок в сообщения, отправляемые через MassTransit.

Вот шаги, которые помогут достичь этой цели:

  1. Настройка заголовков в MassTransit: В вашем коде, где используется IRequestClient, добавьте код для установки заголовка reply_to, чтобы при отправке сообщения он был правильно установлен.

  2. Пример кода для настройки заголовков: Изменим метод SendRequest в классе MyRequester, чтобы включить установку заголовка reply_to.

public async Task SendRequest(string value)
{
    var request = new RequestMessage { Payload = value };

    // Отправка запроса с указанием заголовка reply_to
    var response = await _requestClient.GetResponse<ResponseMessage>(request, 
        ctx => ctx.Headers.Set("reply_to", "amq.rabbitmq.reply-to.g1h2AA9yZXBseUAxMjM3MzY2MzIAAFNIAAAAAGb/7rs=.SzKpGjLslrBY1mMHuXMWhQ=="));

    Console.WriteLine($"Response: {response.Message.Result}");
}
  1. Обработка ответов в слушателе Spring Boot: Убедитесь, что ваш слушатель правильно обрабатывает сообщения, получаемые из очереди. Когда он создает ответ, он может использовать reply_to, чтобы отправить ответ обратно в MassTransit.

В данном случае

  • Убедитесь, что Spring Boot слушатель может правильно извлечь reply_to заголовок и использовать его для отправки ответа обратно в MassTransit.
  • Убедитесь, что идентификаторы сообщений и корреляции также правильно установлены и передаются в заголовках.

Дополнительные рекомендации

  • Логирование: Включите детальное логирование в MassTransit и Spring Boot, чтобы отследить, какие заголовки фактически отправляются и получаются, что может помочь в дальнейшей отладке проблемы.
  • Тестирование: Протестируйте конечный результат, чтобы убедиться, что сообщения обрабатываются корректно, как на стороне MassTransit, так и на стороне Spring Boot.

Заключение

Следуя выше приведенным рекомендациям, вы сможете решить проблему с заголовками сообщений между MassTransit и Spring Boot, обеспечив правильную настройку для передачи данных между двумя системами. Это должно помочь устранить ошибки, возникающие из-за несовпадения заголовков и упростить интеграцию между сервисами.

Оцените материал
Добавить комментарий

Капча загружается...