MassTransit с RabbitMQ и Spring Boot: Как согласовать обработку метаданных сообщений для паттерна запрос-ответ?

Вопрос или проблема

Я работаю над интеграцией MassTransit с RabbitMQ в приложении .NET и мне нужно реализовать паттерн запрос-ответ с слушателем Spring Boot. Однако, похоже, что существует разница в том, как MassTransit и Spring Boot обрабатывают метаданные сообщений.

Структура сообщения MassTransit:

Свойства
 - message_id: 10340000-afb3-7824-10a6-08dce792c869 
 - expiration: 30000
 - delivery_mode: 2
 - headers:
 - ConversationId: 10340000-afb3-7824-6a92-08dce792c869
 - MT-Host-Info: { "MachineName":"DESKTOP-KS55GT8",
 "ProcessName":"UsersService",
 "ProcessId":13328,
 "Assembly":"UsersService",
 "AssemblyVersion":"1.0.0.0",
 "FrameworkVersion":"8.0.4",
 "MassTransitVersion":"8.2.5.0",
 "OperatingSystemVersion":"Microsoft Windows NT 10.0.19045.0"
 }
 - MT-MessageType: urn:message:UsersService.Messages:RequestMessage
 - MT-Response-Address: rabbitmq://localhost/DESKTOPKS55GT8_UsersService_bus_ny4yyyfxsphnjwh1bdqqxrmedz?temporary=true
 - MT-Source-Address: rabbitmq://localhost/DESKTOPKS55GT8_UsersService_bus_ny4yyyfxsphnjwh1bdqqxrmedz?temporary=true
 - MessageId: 10340000-afb3-7824-10a6-08dce792c869
 - RequestId: 10340000-afb3-7824-9af9-08dce792c868
 - publishId: 2
 - content_type: application/json
 - Payload: - 31 bytes
 - Encoding: string
 - { "payload": "Test Value" }
 - Redelivered: false

Заголовок MT-Response-Address устанавливается MassTransit и содержит адрес ответа, но мой слушатель Spring Boot ожидает, что эта информация будет в заголовке reply_to:

Ожидаемая структура сообщения Spring Boot:

Свойства
 - reply_to: amq.rabbitmq.reply-to.g1h2AA9yZXBseUAxMjM3MzY2MzIAAFNIAAAAAGb/7rs=.SzKpGjLslrBY1mMHuXMWhQ==
 - correlation_id: 1
 - priority: 0
 - delivery_mode: 2
 - headers:
     - __TypeId__: com.api.stockservice.domain.event.RequestMessage
 - content_encoding: UTF-8
 - content_type: application/json
 - Payload: - 49 bytes
 - Encoding: string
 - { "Payload": "etutrtjfjtf", "payload": "etutrtjfjtf" }

это слушатель springboot

package com.api.stockservice.infrastructure.messaging;

import com.api.stockservice.domain.event.RequestMessage;
import com.api.stockservice.domain.event.ResponseMessage;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

@Component
public class RequestListener {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = "my-request-queue")
    @SendTo("request.messageProperties.header['MT-Response-Address']")
    public ResponseMessage handleRequest(@Payload RequestMessage request) {
        // Обработка запроса
        String result = "Processed: " + request.getPayload();
        System.out.println("Получен результат: " + result);
        // Создание ответа
        return new ResponseMessage(result);
    }
}

это запросчик masstransit

using MassTransit;
using UsersService.Messages;

namespace UsersService.Producers
{
    public class MyRequester
    {
        private readonly IRequestClient<RequestMessage> _requestClient;

        public MyRequester(IRequestClient<RequestMessage> requestClient)
        {
            _requestClient = requestClient;
        }

        public async Task SendRequest(string value)
        {
            var request = new RequestMessage { Payload = value };

            // Отправка запроса и ожидание ответа
            var response = await _requestClient.GetResponse<ResponseMessage>(request);

            Console.WriteLine($"Ответ: {response.Message.Result}");
        }
    }
}

это конфигурация masstransit в program.cs

builder.Services.AddMassTransit(x =>
{
    //x.AddConsumer<RequestConsumer>();
    x.AddConsumer<FirstEventConsumer>();

    x.AddConsumer<SecondEventConsumer>();
    x.AddRequestClient<RequestMessage>(new Uri("queue:my-request-queue"));
    x.UsingRabbitMq((context, cfg) =>
    {
     
        cfg.Host("rabbitmq://localhost", h =>
        {
            h.Username("guest");
            h.Password("guest");
        });

        cfg.UseNewtonsoftRawJsonSerializer();
        cfg.UseNewtonsoftJsonDeserializer();
        
        cfg.ConfigureEndpoints(context);
        //cfg.ReceiveEndpoint("my-request-queue", e =>
        //{
        //    e.ConfigureConsumer<RequestConsumer>(context);
        //});
        cfg.ReceiveEndpoint("first-publish-queue", e =>
        {
            e.Bind("publish_exchange");
            e.ConfigureConsumer<FirstEventConsumer>(context);
        });
        cfg.ReceiveEndpoint("second-publish-queue", e =>
        {
            e.Bind("publish_exchange");
            e.ConfigureConsumer<SecondEventConsumer>(context);
        });
    });
});

builder.Services.AddMassTransitHostedService();

Вопросы

Есть ли способ настроить MassTransit так, чтобы использовать свойства вместо заголовков при отправке сообщений в RabbitMQ, чтобы формат сообщения соответствовал тому, как его обрабатывает Spring Boot? По сути, мне нужно, чтобы MassTransit следовал более стандартному механизму передачи сообщений в терминах обработки метаданных.
Таким образом, MassTransit может бесшовно отправлять и отвечать другим системам, которые используют тот же подход к обработке метаданных, что и springboot.

Ответ или решение

Для того чтобы обеспечить согласование обработки метаданных сообщений между MassTransit и Spring Boot при использовании шаблона "запрос-ответ", необходимо адаптировать подходы к передаче метаданных. MassTransit по умолчанию использует заголовки (headers) для хранения метаданных сообщений, тогда как Spring Boot ожидает, что аналогичная информация будет доступна через свойства (properties) сообщений, такие как reply_to и correlation_id.

1. Конфигурация MassTransit

MassTransit настраивается для использования ваших собственных заголовков, что можно сделать методом UseMessageRetry, а также через перехватчики (middleware). Однако для достижения желаемого результата вам необходимо явно управлять заголовками, которые будут использоваться в RabbitMQ.

Пример настройки MassTransit

Вот пример того, как вы можете модифицировать вашу конфигурацию MassTransit для передачи необходимых метаданных в свойстве reply_to:

using MassTransit;
using UsersService.Messages;

namespace UsersService.Producers
{
    public class MyRequester
    {
        private readonly IRequestClient<RequestMessage> _requestClient;

        public MyRequester(IRequestClient<RequestMessage> requestClient)
        {
            _requestClient = requestClient;
        }

        public async Task SendRequest(string value)
        {
            var request = new RequestMessage { Payload = value };
            var response = await _requestClient.GetResponse<ResponseMessage>(request, x =>
            {
                x.Headers.Set("reply_to", "amq.rabbitmq.reply-to"); // Настройка заголовка reply_to
                x.Headers.Set("correlation_id", Guid.NewGuid().ToString()); // Установка correlation_id
            });

            Console.WriteLine($"Response: {response.Message.Result}");
        }
    }
}

2. Настройка Spring Boot

В Spring Boot ваш слушатель должен быть настроен для обработки приходит запросов и отправки ответов обратно на заданный адрес. Ваша реализация должна учитывать, что reply_to будет отправляться обратно в MassTransit.

Пример реализации слушателя Spring Boot

Пример слушателя с использованием RabbitTemplate может выглядеть так:

@Component
public class RequestListener {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = "my-request-queue")
    public ResponseMessage handleRequest(@Payload RequestMessage request, @Header("reply_to") String replyTo, @Header("correlation_id") String correlationId) {
        // Обработка запроса
        String result = "Processed: " + request.getPayload();
        System.out.println("Received result: " + result);

        // Отправка ответа
        ResponseMessage response = new ResponseMessage(result);
        rabbitTemplate.convertAndSend(replyTo, response, message -> {
            message.getMessageProperties().setHeader("correlation_id", correlationId); // установка correlation_id в заголовок
            return message;
        });

        return response;
    }
}

3. Устранение разногласий

Теперь у вас есть настроенные обработчики в MassTransit и Spring Boot, которые учитывают необходимые заголовки и свойства. Это позволит MassTransit отправлять сообщения с учетом ожиданий Spring Boot, обеспечивая безболезненный обмен сообщениями.

Заключение

Хотя MassTransit и Spring Boot используют разные подходы для работы с метаданными, настройка заголовков и обработка свойств может быть адаптирована, чтобы устранить возможные проблемы. Уметь управлять заголовками в MassTransit и их соответствием с ожидаемыми в Spring Boot – ключевой момент в интеграции двух технологий. При правильной настройке вы сможете обеспечить надежное взаимодействие между ними в рамках шаблона "запрос-ответ".

Оцените материал
Добавить комментарий

Капча загружается...