.NET에서 Docker와 RabbitMQ 헤더 교환

https://eduardstefanescu.dev/2020/03/28/rabbitmq-headers-exchange-with-docker-in-dotnet/에 원래 게시됨


이 글은 Headers Exchange 에 관한 글입니다. 다른 세 가지 유형(예: 팬아웃, 다이렉트, 토픽)에 비해 조금 더 주의를 기울여야 하기 때문에 이 유형Exchange에 대해서만 글을 작성했습니다.\
이 문서에서는 첫 번째 문서의 Docker를 사용하여 설정 환경을 사용하므로 좋은 시작점을 원하는 경우 첫 번째 문서(https://stefanescueduard.github.io/2020/02/29/rabbitmq-producer-with-docker-in-dotnet/)부터 시작할 수 있습니다.

소개

Headers Exchange  is routing the messages based on the bindings that are applied to the  Producer  and  Queue .\
In order to produce a message, that message should be published with defined properties that are also bound to the  Queue  - this is why it's called  Headers Exchange  - and can also be seen as the  routing key .\
Another mandatory property that must be bound to the  Queue  is  x-match . This property specifies the matching criteria, as follows:

  • x-match=all  means that all the header pairs must match;
  • x-match=any  means that at least one header pairs must match;

A quick example, that will be also implemented using .NET Core:\
Let's say that a Windows Application produces two logging types, information and error, and the server process these log messages based on its types.\
So for each log type, will be a  Queue  that will have a property  log-level  equals to the type of logging that will receive. To send log information, the  Producer  must set to that message a property  log-level=information  so that the  Exchange  will forward the message to the correct  Queue .

If this is still unclear, then a brief explanation for  Headers Exchange  is that the messages contain a header in order to be bound correctly.\
I hope now the whole typology is a little bit clearer, and why this type of  Exchange  is called  Headers .

Now we can start implementing these concepts, the following part will contain code chunks that are required in order to use  Headers Exchange , these chunks were applied to existing code from the previous four articles about RabbitMQ.

생산자

To bind the logging type to the  Producer  properties, a  Dictionary  of type  string, object  is used as the properties headers. In this case, the  key  will be  log-level  and the  value  will be the actual log level (i.e. information or error). For this scenario, the log level is entered by the user, but in a real-life scenario, there will be a logging system that will serve this scope.

Console.Write("Log level: ");
string logLevel = Console.ReadLine();

Console.Write("Message: ");
string message = Console.ReadLine();

var propertiesHeaders = new Dictionary<string, object> { { "log-level", logLevel } };

using (IConnection connection = factory.CreateConnection())
{
    using (IModel channel = connection.CreateModel())
    {
        IBasicProperties properties = channel.CreateBasicProperties();
        properties.Headers = propertiesHeaders;

        byte[] messageBytes = Encoding.UTF8.GetBytes(message);
        const string exchangeName = "test-exchange";

        channel.BasicPublish(exchangeName, string.Empty, properties, messageBytes);
        Console.WriteLine($"Published message: {message}");
    }
}

The  propertiesHeaders  are bound to the published message by firstly creating plain properties with an empty content header using the  CreateBasicProperties  method. The difference between this  Producer  and the one from the first article is on line 14, which sets the  Headers  property to the dictionary created earlier.

교환

This type of  Exchange  is very similar to the one created in the second article. The only thing that changes is the type, which will be set to  headers  when the  Exchange  is declared on line 7.

using (IConnection connection = factory.CreateConnection())
{
    using (IModel channel = connection.CreateModel())
    {
        const string exchangeName = "test-exchange";
        channel.ExchangeDelete(exchange: exchangeName);
        channel.ExchangeDeclare(exchange: exchangeName, type: "headers");
    }
}

We can see this result also on the RabbitMQ Management page:



대기줄

To bound the  Queue , the properties of the header must be set using a  Dictionary  of type  string, string  that will be passed as arguments to the  QueueBind  method.\
For this article, I chose to use the  all  value for the first  x-match  property, but this value can be also set to  any  because only the  log-level  header needs to match.\
And the second property is the  log-level , the value of this property is given by the user, but in a real-life scenario, this property will be set by the listener service.

Console.Write($"Log level for Queue[{queueIndex}]: ");
var logLevel = Console.ReadLine();

using (IConnection connection = factory.CreateConnection())
{
    using (IModel channel = connection.CreateModel())
    {
        channel.QueueDelete(queue: queueName);

        channel.QueueDeclare(queue: queueName,
            durable: false,
            exclusive: false,
            autoDelete: false,
            arguments: null);

        channel.QueueBind(queue: queueName,
            exchange: "test-exchange",
            routingKey: string.Empty,
            arguments: new Dictionary<string, object>
                {{"x-match", "all"}, {"log-level", logLevel}});
    }
}

When a  Queue  is created the result can be also seen on the RabbitMQ Management page. In the picture below, there is an  information-queue  bound with  log-level  set to  information  using the code above.



결과

With all of this in place, we can start using this topology.

Firstly, we have to create a  Headers Exchange :


informationerror 대기열을 만들고 log-level 속성을 여기에 바인딩해야 합니다.



지난 기사의 Consumer 가 사용되며 이벤트가 없는 것도 사용되지만 이벤트가 있는 것도 사용할 수 있습니다. 다른 Consumer을 생성할 필요가 없습니다. 그 책임은 구독 중인 Queues를 듣는 것이기 때문입니다.

이제 메시지를 생성할 수 있습니다. informationerror 로그 수준에 대한 메시지 두 개와 헤더에 바인딩되지 않은 메시지가 있습니다.



위의 그림에서 알 수 있듯이 헤더가 올바르게 바인딩된 처음 두 메시지는 소비자가 수신했지만 헤더가 없는 마지막 메시지는 수신되지 않았습니다.

이 토폴로지 및 RabbitMQ 시리즈의 전체 코드는 내 GitHub 계정https://github.com/StefanescuEduard/RabbitMQ_POC에서 찾을 수 있습니다.

이 기사를 읽어 주셔서 감사합니다. 흥미로웠다면 동료 및 친구들과 공유하십시오. 또는 개선할 수 있는 부분이 있으면 알려주세요.

좋은 웹페이지 즐겨찾기