Эта статья была перенесена на github :https://github.com/whuanle/learnrabbitmq Если макет статьи неудобен для чтения, вы можете скачать оригинальную версию со склада. markdown Чтение файлов.
RabbitMQ — это очередь сообщений, реализующая протокол AMQP, который определен как открытый стандартный протокол прикладного уровня в качестве промежуточного программного обеспечения для обмена сообщениями. Он означает расширенный протокол очереди сообщений и имеет такие функции, как позиционирование сообщений, маршрутизация, организация очередей, безопасность и надежность.
В настоящее время наиболее популярные очереди сообщений в сообществе включают kafka, ActiveMQ, Pulsar, RabbitMQ, RocketMQ и т. д.
Автор также написал серия Kafka Учебник,Добро пожаловать для чтения:https://kafka.whuanle.cn/
Преимуществами и возможностями использования RabbitMQ, вероятно, являются высокая надежность, гибкая настройка правил маршрутизации, поддержка распределенного развертывания, соответствие протоколу AMQP и т. д. Его можно использовать для асинхронной связи, сбора журналов (Kafka лучше подходит для сбора журналов), системы архитектуры, управляемой событиями, развязки связи приложений и т. д.
Особенности версии сообщества RabbitMQ следующие:
Базовые объекты RabbitMQ имеют следующие моменты, но читателям сейчас не нужно их запоминать. В следующих главах автор представит их один за другим.
Технические знания RabbitMQ грубо делятся на:
Из-за ограниченности навыков автора и ограниченности места в этой статье объясняются только технические детали, связанные с программированием на C#, чтобы понять навыки кодирования и механизм работы RabbitMQ.
Читатели могут RabbitMQ Найдите его в официальной документации.приезжатьвесьиз Установить Учебник:https://www.rabbitmq.com/download.html
В этой статье для развертывания используется Docker.
RabbitMQ Список зеркал сообщества:https://hub.docker.com/_/rabbitmq
Создайте каталог для сопоставления томов хранения:
mkdir -p /opt/lib/rabbitmq
Разверните контейнер:
docker run -itd --name rabbitmq -p 5672:5672 -p 15672:15672 \
-v /opt/lib/rabbitmq:/var/lib/rabbitmq \
rabbitmq:3.12.8-management
Во время развертывания два порта заняты. 5672 — это порт связи MQ, а 15672 — это порт инструмента пользовательского интерфейса управления.
Откройте порт 15672, и вы попадете на страницу входа в Интернет. Учетная запись и пароль по умолчанию — гостевые.
Что касается использования пользовательского интерфейса управления RabbitMQ, мы представим его позже.
Открыв интерфейс управления, вы увидите Exchanges
В меню вы можете увидеть таблицу, как показано ниже. Это обменники по умолчанию. Вам не нужно знать эти вещи сейчас, они будут представлены позже.
Virtual host | Name | Type | Features |
---|---|---|---|
/ | (AMQP default) | direct | D |
/ | amq.direct | direct | D |
/ | amq.fanout | fanout | D |
/ | amq.headers | headers | D |
/ | amq.match | headers | D |
/ | amq.rabbitmq.trace | topic | D I |
/ | amq.topic | topic | D |
использовать C# развивать RabbitMQ,нуждатьсяиспользовать nuget представлять RabbitMQ.Client,Адрес документа официального сайта:.NET/C# RabbitMQ Client Library — RabbitMQ
Прежде чем продолжить работу со статьей, создайте консольную программу.
Чтобы облегчить понимание, в этой статье приведены десятки изображений, позволяющих прийти к согласию относительно значения некоторых графических изображений:
В соответствии с производителем, использование представляется следующим образом:
дляпотребитель,использовать Как показано на рисунке ниже:
дляинформацияочередь,использовать Как показано на рисунке ниже:
длявыключатель,использовать Как показано на рисунке ниже:
существовать RabbitMQ , сообщения, опубликованные производителем, не попадут в очередь напрямую, а пройдут через обмен (Exchange) Распределено по различным очередям. Как упоминалось ранее, развертывание RabbitMQ После этого по умолчанию есть Семь переключателей, напр. (AMQP default)
、amq.direct
ждать.
конечно,длясейчассуществовать Давайте поговорим,Нам не нужно знать выключатель,так,существоватьВ этом разделе из Учебник,Будет использоваться выключатель по умолчанию, чтобы завершить эксперимент.
существоватьпренебрегатьвыключательжитьсуществоватьизслучай,Мы можем упростить процесс производства и потребления, как показано ниже:
Пожалуйста, обязательно обратите внимание,Не показано на картинкевыключательизжитьсуществовать,потому чтодляиспользоватьиздапо умолчаниюизвыключатель.Дапродюсертолкатьинформациядолжендатолкатьприезжатьвыключатель,и Нетдаочередь,Эту фразу необходимо прояснить.
дляпотребитель Давайте поговорим,хотетьиспользоватьочередь,долженубеждатьсяочередьужечерезжитьсуществовать。
использовать C# Код и параметры объявления (создания) очереди следующие:
// объявить поворот
channel.QueueDeclare(
// очередьимя
queue: "myqueue",
// Конфигурация персистентности, может ли включение существовать broker Выжить после перезагрузки
durable: false,
// Поворот удаляется при закрытии соединения
exclusive: false,
// Когда последний потребитель(еслииметьизязык) отписывается, следует автоматически удалить эту очередь.
autoDelete: false,
// Дополнительная настройка параметров
arguments: null
);
Полный пример кода:
ConnectionFactory factory = new ConnectionFactory
{
HostName = "localhost"
};
// соединять
using IConnection connection = factory.CreateConnection();
// ряд
using IModel channel = connection.CreateModel();
channel.QueueDeclare(
// очередьимя
queue: "myqueue",
// Конфигурация персистентности, может ли включение существовать broker Выжить после перезагрузки
durable: false,
// Поворот удаляется при закрытии соединения
exclusive: false,
// Когда последний потребитель(еслииметьизязык) отписывается, следует автоматически удалить эту очередь.
autoDelete: false,
// Дополнительная настройка параметров
arguments: null
);
queue
:очередьизимя。
durable
:настраиватьда, стоит ли упорствовать。Выносливостьизочередьвстречажитьтарелка,существующие могут гарантировать, что соответствующая информация не будет потеряна при перезапуске сервера.exclusive
настраиватьда Никаких эксклюзивов。еслиодиночередьодеялозаявлениедляэксклюзивныйочередь,Поворот виден только в первый раз.,исуществоватьсоединять Автоматически при отключенииудалить.
autoDelete
:настраиватьда Нет автоматическогоудалить.автоматическийудалитьизпомещениеда По меньшей мереиметьодинпотребительсоединятьприезжатьэтоточередь,Изназад Местоиметьиэтоточередьсоединятьизпотребитель Когда оба отключены,Только тогда оно будет автоматически удалено.argurnents
: Установите некоторые другие параметры очереди, такие как очередьинформации время ждать.
ifturn был сохранен и не требует повторного выполнения QueueDeclare()
。Повторные звонки QueueDeclare()
,если параметры одинаковы,Никаких побочных эффектов,Не будет никаких проблем, если информационная информация уже подана.
Однако, если QueueDeclare()
Если параметр, если отличается от существующей конфигурации, может быть сообщено об ошибке.
В общем, для разумной архитектуры и надежности, будет определяться архитектором и т.д. Заранее создайте выключатель, по очереди, и тогда клиент сможет его напрямую использовать. Как правило, его нельзя устанавливать при запуске программы, что приведет к большой неопределенности и побочным эффектам.
Произведено отредактированием информациичасиз Код также очень прост,обозначениехотетьотправлятьприезжать Которыйвыключательилимаршрутизациясередина Прямо сейчас Может。
Обязательно обратите внимание, что когда производитель RabbitMQ отправляет сообщение, оно отправляется на обмен, а не непосредственно в очередь!
channel.BasicPublish(
// использоватьпо умолчаниювыключатель
exchange: string.Empty,
// К какому повороту нажать?
routingKey: "myqueue",// очередьсвойство
basicProperties: null,
// Для отправки изинформации вам нужно сначала конвертировать byte[]
body: Кодировка.UTF8.GetBytes("Тест")
);
BasicPublish
Есть три перегрузки:
BasicPublish(
PublicationAddress addr,
IBasicProperties basicProperties,
ReadOnlyMemory<byte> body)
BasicPublish(string exchange,
string routingKey,
IBasicProperties basicProperties,
ReadOnlyMemory<byte> body)
BasicPublish(string exchange,
string routingKey,
bool mandatory = false,
IBasicProperties basicProperties = null,
ReadOnlyMemory<byte> body = default)
exchange
: выключательизимя,если Оставьте пустымвстречатолкатьприезжатьпо умолчаниювыключатель.routingKey
: маршрутизацияключ,выключательв соответствии смаршрутизацияключ Воляинформацияжитьмагазинприезжатьсоответствующийизочередьсреди。basicProperties
:информациясвойство,Если истекло времяждать.mandatory
:ценитьдля false час,есливыключательбезиметьобязательностьподходящийизочередь,ноСообщение будет потеряно。ценитьдля true час,есливыключательбезиметьобязательностьподходящийизочередь,новызоветIModel.BasicReturn
событие.IBasicProperties basicProperties
Параметр интерфейса, который мы можем использовать IModel.CreateBasicProperties()
Создайте объект интерфейса.
IBasicProperties
Многие атрибуты инкапсулированы в интерфейс, поэтому нам не нужно отображать конфигурацию передачи в строках.
IBasicProperties
Его полные свойства следующие:
// Нанесение логотипа из ID
public String AppId { set; get; }
// Кластер идентичности из ID
public String ClusterId { set; get; }
// Укажите содержимое информации из метода кодирования, например "utf-8"
public String ContentEncoding { set; get; }
// обозначениеинформациясодержаниеиз MIME тип, например "application/json"
public String ContentType { set; get; }
// Используется для связи информации с отношениями, обычно используется Сценарий RPC (удаленный вызов процедуры)
public String CorrelationId { set; get; }
// Укажите метод сохранения, значение 1: Не постоянно, значение 2: Устойчивое развитие
public Byte DeliveryMode { set; get; }
// Указывает срок действия в миллисекундах
public String Expiration { set; get; }
// Настроить информацию из заголовка
public IDictionary`2 Headers { set; get; }
// Укажите уникальный идентификатор
public String MessageId { set; get; }
// да, стоит ли упорствовать
public Boolean Persistent { set; get; }
// Указывает уровень приоритета в диапазоне от 0 приезжать 9
public Byte Priority { set; get; }
// Предназначен для ответа на информациюизочередное имя
public String ReplyTo { set; get; }
// Предназначен для ответа на информацию об адресе
public PublicationAddress ReplyToAddress { set; get; }
// обозначениеинформацияизвремяштамп
public AmqpTimestamp Timestamp { set; get; }
// тип информации
общественный String Type { set; get; }
// Идентифицировать пользователя из ID
public String UserId { set; get; }
При отправке сообщений IBasicProperties можно настроить для отдельных сообщений:
использование соединения IConnection = Factory.CreateConnection();
используя канал IModel = Connection.CreateModel();
// создаём Два поворота
канал.QueueDeclare(очередь: "q1", durable: false, exclusive: false, autoDelete: false);
var properties = channel.CreateBasicProperties();
// Пример 1:
properties.Persistent = true;
properties.ContentType = "application/json";
properties.ContentEncoding = "UTF-8";
// Пример 2:
//properties.Persistent = true;
//properties.ContentEncoding = "gzip";
//properties.Headers = new Dictionary<string, object>();
channel.BasicPublish(
exchange: string.Empty,
routingKey: "q1",
basicProperties: properties,
body: Encoding.UTF8.GetBytes($"Test{i}")
);
для IBasicProperties изиспользовать, более подробное описание будет приведено далее в статье.
Сейчас существуют, мы толкнули это 10 полоскаинформацияприезжатьочередьсередина,Затемсуществовать Management UI Наблюдать.
int i = 0;
while (i < 10)
{
channel.BasicPublish(
exchange: string.Empty,
routingKey: "myqueue",
basicProperties: null,
body: Encoding.UTF8.GetBytes($"Test{i}")
);
i++;
}
Мы можем существовать UI из Queues and Streams серединасмотретьприезжатькогдавперед Местоиметьизочередь。
Можно смотретьприезжатькогдавпередочередьсерединаиз
Ready
состояниеUnacked
состояниеизинформациячисло,соответственнопереписыватьсявышесерединаиз Ожидание доставки впотребительизинформациячислоибыл доставлен впотребитель Даеще нетполучатьприезжатьсигнал подтвержденияизинформациячисло
После нажатия на кнопку «Открыть» появится интерфейс «Открытие», как показано ниже.
Сначала посмотрите Обзор.
«Готово» означает количество, которое еще не было израсходовано.
Unacked относится к потреблению, но не ack изинформациячислоколичество。
другой Message rates диаграмма,Относится к изданию публикации, информации о скорости потребления.,Потому что для не важно,Поэтому здесь это не объясняется.
существовать Bindings середина,Можно смотретьприезжать Долженочередьобязательность Понятнопо умолчаниюизвыключатель.
Затем напишите потребитель,Потребляйте очередьсерединаизинформация,Что Полный код выглядит следующим образом:
использование RabbitMQ.Client;
использование RabbitMQ.Client.Events;
использование System.Text;
Фабрика ConnectionFactory = новая Фабрика ConnectionFactory
{
Имя хоста = «локальный хост»
};
использование соединения IConnection = Factory.CreateConnection();
используя канал IModel = Connection.CreateModel();
канал.QueueDeclare(
// поворот имя
очередь: "myqueue",
// Конфигурация персистентности, может ли включение существовать broker Выжить после перезагрузки
durable: false,
// Поворот удаляется при закрытии соединения
exclusive: false,
// Когда последний потребитель(еслииметьизязык) отписывается, следует автоматически удалить эту очередь.
autoDelete: false,
// Дополнительная настройка параметров
arguments: null
);
// определениепотребитель
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.Span);
Console.WriteLine($" [x] Received {message}");
};
// Начните потреблять
channel.BasicConsume(queue: "myqueue",
autoAck: true,
consumer: consumer);
Console.ReadLine();
Обратите внимание: если вводится несуществующая изочередь, программа сообщит об исключении.
существоватьпотребитель до выхода из программы, то есть IConnection был Dispose()
Извперед,Можетсуществовать Consumers Информация о потребительской клиентской программе представлена в формате .
А что, если мы будем только потреблять и не настраивать автоматическое подтверждение?
Измените потребительский код на:
channel.BasicConsume(queue: "myqueue",
autoAck: false,
consumer: consumer);
Полный код выглядит следующим образом:
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
ConnectionFactory factory = new ConnectionFactory
{
HostName = "localhost"
};
using IConnection connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();
channel.QueueDeclare(
queue: "myqueue",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null
);
int i = 0;
while (i < 10)
{
channel.BasicPublish(
exchange: string.Empty,
routingKey: "myqueue",
basicProperties: null,
body: Encoding.UTF8.GetBytes($"Test{i}")
);
i++;
}
// определениепотребитель
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.Span);
Console.WriteLine($" [x] Received {message}");
};
// Начните потреблять
channel.BasicConsume(queue: "myqueue",
autoAck: false,
consumer: consumer);
Console.ReadLine();
В этот момент вы найдете,Вся информация прочитана,Да Unacked для 10。
Как показано ниже,autoAck: false
После этого, если вы перезапустите программу (только потреблять, не отправлять сообщения), то программа продолжит потреблять снова.
дляеще нет ack изинформация,потребительсновасоединятьназад,RabbitMQ Нажму еще раз.
и Kafka Разное изда, Кафка если не ack когда информация, но сервер автоматически повторно отправит информацию о статье потребителю, если эта информация о статье не заполнена, но всегда будет блокировать существование здесь. идля RabbitMQ,была подтверждена информация будет временно проигнорирована,автоматический Потребление下одинполоска。так На основании этого,По умолчанию RabbitMQ также не может гарантировать порядок информации.。
конечно, RabbitMQ очень гибкийиз,Мы можем выборочно потреблять детали.,Избегайте блокировки перед когдаинформация, из-за чего программа не сможет работать дальше:
// определениепотребитель
int i = 0;
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.Span);
Console.WriteLine($" [x] Received {message}");
i++;
// Убедитесь, что информация используется правильно
if (i % 2 == 0)
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
// Начните потреблять
channel.BasicConsume(queue: "myqueue",
autoAck: false,
consumer: consumer);
существовать в некоторых ситуациях,Эта функция очень полезна,Мы можем воля множественные неудачи при выполнении изинформация сначала отложим это в сторону,Переход к следующей статье,Избегайте накопления информации.
если одна и та же очередьиз Разные клиенты привязать квыключательсередина,Несколько потребителей работают вместе и говорят,Так что же происходит?
для Первый тип ситуации,RabbitMQ Сообщения будут распределяться поровну каждому клиенту.
Условие выполняется избазиса да, два потребителя различны изпотребителя, если они существуют. ть Одна и та же программа участвует в разных экземплярах потребления, но да, потому что для признается одним и тем же потребителем, и правило но недействительно.
Однако RabbitMQ и Нетвстречасмотретьеще нетподтверждатьизинформациячислоколичество,это Толькодаслепо Воля Нет. n сообщение отправлено на n индивидуальныйпотребитель。
кроме тогосуществоватьобозначениевыключательимяизслучай,Мы можем воля маршрутизировать Набор ключей для пустых,Размещая таким образом изинформацию будет пересылать выключательприезжать соответствующий изв. очереди.
channel.BasicPublish(
exchange: "logs",
routingKey: string.Empty,
basicProperties: null,
body: Encoding.UTF8.GetBytes($"Test{i}")
);
и очередность соответствует одному выключателю. Ситуация сложнее.,назадлапшаиз Главы будут упомянутыприезжать。
продюсерипотребитель Все могутиспользовать QueueDeclare()
Приходитьобъявить поворот。так называемыйиззаявление,Собственно запрос на создание очереди на RabbitMQ Broker,Так что кто бы ни пришел к создателю, он тот же самый.
изаявлениеочередь Связанныйиз,Есть еще две функции:
// Независимо от того, потерпит неудачу создатель или нет, игнорируйте это.
channel.QueueDeclareNoWait();
// Определите, содержит ли очередь существование. Если нет, появится исключение. Если существует, ничего не произойдет.
channel.QueueDeclarePassive();
Дополнительно мы можем удалить очередь:
// ifUnused: очередьбезиметьодеялоиспользоватьчас
// ifEmpty: Когда нет накопления изинформации в свою очередь
channel.QueueDelete(queue: "aaa", ifUnused: true, ifEmpty: true);
Производители могут отправлять сообщения только на биржи, а не в очереди.
При отправке сообщения вы можете указать имя обмена и ключ маршрутизации.
Как показано в следующем коде:
channel.BasicPublish(
exchange: string.Empty,
routingKey: "myqueue",
basicProperties: null,
body: Encoding.UTF8.GetBytes($"Test{i}")
);
ExchangeType Несколько типов типов определены в от переключателя.
public static class ExchangeType
{
public const string Direct = "direct";
public const string Fanout = "fanout";
public const string Headers = "headers";
public const string Topic = "topic";
private static readonly string[] s_all = {Fanout, Direct, Topic, Headers};
}
существоватьиспользоватьодинвыключатель Извперед,нуждаться Первыйзаявлениеодинвыключатель:
channel.ExchangeDeclare("logs", ExchangeType.Fanout);
есливыключательужежитьсуществовать,Повторное выполнение кода запроса,Толькохотеть Конфигурацияисейчасжитьизвыключатель Конфигурация Распределение,но RabbitMQ Ничего не делайте и побочных эффектов не будет.
Да,Не может быть разных конфигураций,Напримеружежитьсуществоватьизвыключательда Fanout
Введите, но повторно выполните оператор кода, в свою очередьдля Direct
тип.
Функция ExchangeDeclare определяется следующим образом:
ExchangeDeclare(string exchange,
string type,
bool durable = false,
bool autoDelete = false,
IDictionary<string, object> arguments = null)
exchange
: выключи.type
переключиться с тип, Такие как разветвление, директ, тема.durable
: Установите, является ли да постоянным durab ,еслиценитьдля правда, он не будет потерян даже после перезапуска сервера.autoDelete
:настраиватьда Нет автоматическогоудалить.argument
:Что Он несколько структурированпараметр.конечно,выключательтакже Можетодеялоудалить.
// ifUnused Он будет удален только в случае существования очередностибылиспользоватьиз.
channel.ExchangeDelete(exchange: "log", ifUnused: true);
Есть еще один NotWait
метод.
channel.ExchangeDeclareNoWait("logs", ExchangeType.Direct);
//channel.ExchangeDeclareNoWait(...);
Даже если при переобъявлении и удалении обменника возникла проблема, поскольку он возвращает void, в случае неудачной операции не будет сообщено об исключении.
Существует также способ определить, существует ли переключатель.
channel.ExchangeDeclarePassive("logs")
После создания нескольких очередей также необходимо привязать очереди к обменнику.
Как показано в следующем коде,Его выключатель связывает два поворота,продюсертолкатьинформацияприезжатьвыключательчас,Два поворота Всевстречаполучатьприезжатьтакой жеизинформация。
ConnectionFactory factory = new ConnectionFactory
{
HostName = "localhost"
};
using IConnection connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();
// создаватьвыключатель
channel.ExchangeDeclare("logs",ExchangeType.Fanout);
// создавать Два поворота
channel.QueueDeclare(
queue: "myqueue1",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null
);
channel.QueueDeclare(
queue: "myqueue2",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null
);
channel.QueueBind(queue: "myqueue1", exchange: "logs", routingKey: string.Empty);
channel.QueueBind(queue: "myqueue2", exchange: "logs", routingKey: string.Empty);
int i = 0;
while (i < 10)
{
channel.BasicPublish(
exchange: "logs",
routingKey: string.Empty,
basicProperties: null,
body: Encoding.UTF8.GetBytes($"Test{i}")
);
i++;
}
После отправки сообщения каждая связанная logs выключательизочередь Всевстречаполучатьприезжатьтакой жеизинформация。
Обратите внимание: поскольку биржа не хранит сообщения, создайте еще одну myqueue3 изинформацияочередьпривязка logs переключатель, myqueue3 Тольковстречаловитьполучатьприезжатьобязательность Изназадтолкатьизинформация,Нет Могу получитьприезжать更早Извпередизинформация。
Выключатели бывают следующих типов:
direct основан на routingKey Воляинформациятолкатьприезжать Неттакой жеизв очереди.
Сначала создайте несколько очередей.
// создавать Два поворота
channel.QueueDeclare(queue: "direct1");
channel.QueueDeclare(queue: "direct2");
Затем при привязке очереди к коммутатору для отношения привязки необходимо установить RouteKey.
// использовать routingKey обязательностьвыключатель
channel.QueueBind(exchange: "logs", queue: "direct1", routingKey: "debug");
channel.QueueBind(exchange: "logs", queue: "direct2", routingKey: "info");
Наконец, при отправке сообщения вам необходимо указать имя обмена и ключ маршрутизации.
// При отправке информации необходимо указать routingKey
channel.BasicPublish(
exchange: "logs",
routingKey: "debug",
basicProperties: null,
body: Кодировка.UTF8.GetBytes($"Тест")
);
Когда сообщение отправляется на logs переключатель, переключатель будет основан на routingKey Воляинформация Впередприезжатьпереписыватьсяизв очереди.
весьиз Пример кода выглядит следующим образом:
// создаватьвыключатель
channel.ExchangeDeclare("logs",ExchangeType.Direct);
// создавать Два поворота
channel.QueueDeclare(queue: "direct1");
channel.QueueDeclare(queue: "direct2");
// использовать routingKey обязательностьвыключатель
channel.QueueBind(exchange: "logs", queue: "direct1", routingKey: "debug");
channel.QueueBind(exchange: "logs", queue: "direct2", routingKey: "info");
// При отправке информации необходимо указать routingKey
channel.BasicPublish(
exchange: "logs",
routingKey: "debug",
basicProperties: null,
body: Кодировка.UTF8.GetBytes($"Тест")
);
После запуска выяснилось, что только direct1 очередь Можетполучатьприезжатьинформация,потому чтодляэтотоснован использовать при связывании routingKey=debug
Решите из.
Толькохотетьочередьобязательность Понятновыключатель,нокаждыйвыключатель Всевстречаполучатьприезжать Такой жеизинформация,Fanout проигнорирую routingKey。
Как показано в следующем коде:
// создаватьвыключатель
channel.ExchangeDeclare("logs1",ExchangeType.Fanout);
// создавать Два поворота
channel.QueueDeclare(queue: "fanout1");
channel.QueueDeclare(queue: "fanout2");
// использовать routingKey обязательностьвыключатель
channel.QueueBind(exchange: "logs1", queue: "fanout1", routingKey: "debug");
channel.QueueBind(exchange: "logs1", queue: "fanout2", routingKey: "info");
// При отправке информации необходимо указать routingKey
channel.BasicPublish(
exchange: "logs1",
routingKey: "debug",
basicProperties: null,
body: Кодировка.UTF8.GetBytes($"Тест")
);
Topic будет основано на routingKey Найдите совпадения, соответствующие критериям изочередь, в свою очередь можно использовать .
、#
、*
Для выделения используются три вида символов. Тема Правила зонирования относительно гибкие.
существоватьсоздаватьочередь Изназад,обязательностьвыключательчас,routingKey
использоватьвыражение。
// использовать routingKey обязательностьвыключатель
channel.QueueBind(exchange: "logs3", queue: "topic1", routingKey: "red.#");
channel.QueueBind(exchange: "logs3", queue: "topic2", routingKey: "red.yellow.#");
При отправке сообщения RouteKey необходимо указать полное имя.
// отправка информации
channel.BasicPublish(
exchange: "logs3",
routingKey: "red.green",
basicProperties: null,
body: Кодировка.UTF8.GetBytes($"Тест")
);
Во-первых, ключ маршрутизации будет основано на .
Символы разделены.
например red.yellow.green
будет разбит на [red,yellow,green]
Три части.
Если вы хотите смутно отличить часть, вы можете использовать *
。например red.*.green
, может быть присвоен red.aaa.green
、red.666.green
。
*
Можетсуществоватьлюбая частьиспользовать,например *.yellow.*
、*.*.green
。
#
Можно выделить несколько частей, например red.#
Может быть назначен на red.a
、red.a.a
、red.a.a.a
。
весьиз Пример кода выглядит следующим образом:
// создаватьвыключатель
channel.ExchangeDeclare("logs3",ExchangeType.Topic);
// создавать Два поворота
channel.QueueDeclare(queue: "topic1");
channel.QueueDeclare(queue: "topic2");
// использовать routingKey обязательностьвыключатель
channel.QueueBind(exchange: "logs3", queue: "topic1", routingKey: "red.#");
channel.QueueBind(exchange: "logs3", queue: "topic2", routingKey: "red.yellow.#");
// отправка информации
channel.BasicPublish(
exchange: "logs3",
routingKey: "red.green",
basicProperties: null,
body: Кодировка.UTF8.GetBytes($"Тест")
);
channel.BasicPublish(
exchange: "logs3",
routingKey: "red.yellow.green",
basicProperties: null,
body: Кодировка.UTF8.GetBytes($"Тест")
);
Два сообщения были отправлены на logs
переключатель, где routingKey=red.green
изинформация, будь red.#
зонированы и, следовательно, будут перенаправлены на topic1 в очереди.
и routingKey=red.yellow.green
изинформацию можно разделить на два направления, так topic1 и topic 2 можно получить.
Помимо привязки очередей, переключатели также можно привязывать к переключателям.
Пример:
Воля b2 привязать к b1 средний, b2 можно получить b1 изинформация。
channel.ExchangeBind(destination: "b2", source: "b1", routingKey: string.Empty);
После привязки нажмите b1 выключательизинформация,встречаодеяло Впередприезжать b2 выключатель.
Полный пример кода выглядит следующим образом:
ConnectionFactory factory = new ConnectionFactory
{
HostName = "localhost"
};
using IConnection connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();
channel.ExchangeDeclare(exchange: "b1", ExchangeType.Fanout);
channel.ExchangeDeclare(exchange: "b2", ExchangeType.Fanout);
// Потому что для обоих да ExchangeType.Fanout,
// так routingKey использовать string.Empty
channel.ExchangeBind(destination: "b2", source: "b1", routingKey: string.Empty);
// создаватьочередь
channel.QueueDeclare(queue: "q1", durable: false, exclusive: false, autoDelete: false);
channel.QueueBind(queue: "q1", exchange: "b2", routingKey: string.Empty);
int i = 0;
while (i < 10)
{
channel.BasicPublish(
exchange: "b1",
routingKey: string.Empty,
basicProperties: null,
body: Encoding.UTF8.GetBytes($"Test{i}")
);
i++;
}
конечно,Может Волявыключатель、очередьтакой жечаспривязать к b1 в переключателе.
Кроме того, два типа выключателей могут быть разными. Однако это немного усложнит правила распределения.
channel.ExchangeDeclare(exchange: "b1", ExchangeType.Direct);
channel.ExchangeDeclare(exchange: "b2", ExchangeType.Fanout);
нас Можетпонимается каксуществоватьвыключательобязательностьчас,b2 Взаимнодляодиночередь。когда b1 установлен на Direct При использовании переключателя также необходимо указать при привязке переключателя routingKey。
channel.ExchangeBind(destination: "b2", source: "b1", routingKey: "demo");
и b2 выключательи q2 Очередь, все еще Fanout отношения не затронуты.
Это значит, b1, b2 даа отношения,этоихиз Отображение отношений Нетвстреча Влияниеприезжатьдругие люди,также Нетвстреча Влияниеприезжатьследующий уровень。
Полный пример кода выглядит следующим образом:
using RabbitMQ.Client;
using System.Text;
ConnectionFactory factory = new ConnectionFactory
{
HostName = "localhost"
};
using IConnection connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();
channel.ExchangeDeclare(exchange: "b1", ExchangeType.Direct);
channel.ExchangeDeclare(exchange: "b2", ExchangeType.Fanout);
// Потому что для обоих да ExchangeType.Fanout,
// так routingKey использовать string.Empty
channel.ExchangeBind(destination: "b2", source: "b1",routingKey: "demo");
// создавать Два поворота
channel.QueueDeclare(queue: "q1", durable: false, exclusive: false, autoDelete: false);
channel.QueueBind(queue: "q1", exchange: "b2", routingKey: string.Empty);
int i = 0;
while (i < 10)
{
channel.BasicPublish(
exchange: "b1",
routingKey: "demo",
basicProperties: null,
body: Encoding.UTF8.GetBytes($"Test{i}")
);
i++;
}
потребитель BasicConsume
Функция определяется следующим образом:
BasicConsume(string queue,
bool autoAck,
string consumerTag,
IDictionary<string, object> arguments,
IBasicConsumer consumer)
В разных подписках на потребление используются разные теги потребления. (consumerTag) отличить одно от другого ,существоватьтакой жеодинряд(IModel)серединаизпотребитель Его необходимо отличать тегом потребителя, который не обязательно устанавливать по умолчанию.
queue
:очередьизимя。autoAck
:настраиватьда Нет автоматическогоподтверждать。consumerTag
: потребитель Этикетка,Используется для различения нескольких товаров.arguments
:настраиватьпотребительиз Чтоонпараметр.Впереди у нас есть EventingBasicConsumer создавать IBasicConsumer Интерфейс-потребитель программы, среди которых EventingBasicConsumer Содержит следующие события:
public event EventHandler<BasicDeliverEventArgs> Received;
public event EventHandler<ConsumerEventArgs> Registered;
public event EventHandler<ShutdownEventArgs> Shutdown;
public event EventHandler<ConsumerEventArgs> Unregistered;
этотнекоторыйсобытиевстречасуществоватьинформацияиметь дело сиз Неттакой же阶段одеялокурок。
Потребительская программа имеет два режима потребления: выталкивающий и вытягивающий.,Передний Упомянулприезжатьизкод Вседарежим нажатия,При появлении новой информации,RabbitMQ встречаавтоматическийтолкатьприезжатьпотребительпрограммасередина。
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.Span);
Console.WriteLine($" [x] Received {message}");
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
// Начните потреблять
channel.BasicConsume(queue: "myqueue5",
autoAck: false,
consumer: consumer,
consumerTag: "demo");
еслииспользоватьрежим вытягивания(BasicGet()
письмочисло),Таксуществовать RabbitMQ Broker Если в изочереди нет информации, она будет возвращена null。
// Начните потреблять
while (true)
{
var result = channel.BasicGet(queue: "q1", autoAck: false);
// если нетянутьприезжатьинформациячас
if (result == null)
{
// Избегайте бесконечных запросов, когда нет информации
Thread.Sleep(100);
continue;
}
Console.WriteLine(Encoding.UTF8.GetString(result.Body.Span));
channel.BasicAck(deliveryTag: result.DeliveryTag, multiple: false);
}
когдаиспользовать BasicGet()
При ручном извлечении информации программа не сохраняет программу для потребителя, что означает RabbitMQ из Consumer Невозможно увидеть в .
Как в режиме push, так и в режиме pull при подтверждении сообщения используется несколько параметров.
multiple
настраиватьдля false
,но Толькоподтверждатьобозначение deliveryTag
изодинполоскаинформация。multiple
настраиватьдля true
,новстречаподтверждать Местоиметь Сравниватьобозначение deliveryTag
Небольшая из и была подтверждена информация.информацияиз deliveryTag Свойства ulong Тип, представляющий смещение информации, от
1....
Начни считать.
существуют При массовом получении информации и ее обработке используйте multiple
Приходитьподтверждатьодин组информация,Нет необходимости подтверждать по пунктам,Это повышает эффективность.
Потребительская программа может установить Qos.
channel.BasicQos(prefetchSize: 10, prefetchCount: 10, global: false);
prefetchSize
:этотженьшеньчисловыражатьпотребитель Местоспособныйловитьполучатьеще нетподтверждатьинформацияизобщий размеризверхний предел,настраиватьдля 0 Это означает, что верхнего предела нет.
prefetchCount
: изметод Приходитьнастраиватьпотребитель客户端最大способныйловитьполучатьизеще нетподтверждатьизинформациячисло。этот Конфигурацияираздвижное окночислоколичество意思差Нетмного。
глобальный немного особенный.
Когда global for имеет значение false, только новый потребитель должен соответствовать правилам.
в случае global для true время, то же самое IConnection серединаизпотребительвсевстречаодеяло Исправлять Конфигурация。
// не затронуто
// var result = channel.BasicConsume(queue: "q1", autoAck: false,... ...);
channel.BasicQos(prefetchSize: 0, prefetchCount: 10, global: false);
// Затронут новый потребитель
// var result = channel.BasicConsume(queue: "q1", autoAck: false,... ...);
когдаполучатьприезжатьинформациячас,если необходимо явно отклонить информацию,Можетиспользовать BasicReject
,RabbitMQ Будет ли воля информация удалена из очереди.
BasicReject()
Вызовет мертвую почту сообщения.
while (true)
{
var result = channel.BasicGet(queue: "q1", autoAck: false);
if (result == null) continue;
Console.WriteLine(Encoding.UTF8.GetString(result.Body.Span));
channel.BasicReject(deliveryTag: result.DeliveryTag, requeue: true);
}
если requeue Настройки параметровдля true ,но RabbitMQ встречаснова Воляэтотполоскаинформацияжитьвходитьочередь,Чтобы его можно было отправить на следующую подписку изпотребителю.,Другими словами, программу можно будет получить снова после перезапуска.
если requeue Настройки параметровдля false ,но RabbitMQ немедленно удалит информацию из очереди и не отправит ее новому потребителю.
если хотите отклонить информацию оптом.
channel.BasicNack(deliveryTag: result.DeliveryTag, multiple: true, requeue: true);
multiple для true Когда, но выражает отказ deliveryTag Номер до того, как бывший потребитель подтверждает изинформацию.
BasicRecover()
метод, используемый для начала с RabbitMQ Повторное получение также подтвердило информацию
когда requeue=true
час,былподтверждатьизинформациявстречаодеялоснова加входитьприезжатьочередьсередина,для того же, что и информация,Он будет закреплен за другим потребителем.
когда requeue=false
,Та же информация будет присвоена тому же потребителю, что и раньше.
channel.BasicRecover(requeue: true);
// асинхронный
channel.BasicRecoverAsync(requeue: true);
Переднийнестиприезжать,когда autoAck=false
В то время, хотя новостей не было ок, но RabbitMQ Он все равно перейдет к следующему сообщению.
для гарантии порядка информацияиз,существоватьеще нет Волякогдавперединформация Расход завершенизслучай,Автоматическое потребление следующей информации не допускается.
Тольконуждатьсяиспользовать BasicQos
Просто настройте:
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
ConnectionFactory factory = new ConnectionFactory
{
HostName = "localhost"
};
using IConnection connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();
// создаватьвыключатель
channel.ExchangeDeclare("acktest",ExchangeType.Fanout);
// создавать Два поворота
channel.QueueDeclare(queue: "myqueue5");
// использовать routingKey обязательностьвыключатель
channel.QueueBind(exchange: "acktest", queue: "myqueue5", routingKey: string.Empty);
int i = 0;
while (i < 10)
{
// отправка информации
channel.BasicPublish(
exchange: "acktest",
routingKey: string.Empty,
basicProperties: null,
body: Кодировка.UTF8.GetBytes($"Тест")
);
i++;
}
// еще нет ack Раньше, не могу съесть следующий
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.Span);
Console.WriteLine($" [x] Received {message}");
// channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
// Начните потреблять
channel.BasicConsume(queue: "myqueue5",
autoAck: false,
consumer: consumer);
После предыдущего кода,ты найдешь,Нет.одинполоскаинформациябыл ack , программа не будет автоматически читать следующую информацию и не будет повторно получать ack изинформация。
если Мы хотим прочитать было еще раз ack изинформация, вы можете перезапустить программу или использовать BasicRecover()
Позвольте серверу снова нажать.
Ранее упоминалась функция BasicPublish в определении:
BasicPublish(string exchange,
string routingKey,
bool mandatory = false,
IBasicProperties basicProperties = null,
ReadOnlyMemory<byte> body = default)
когданастраивать mandatory = true
час,есливыключательнеспособен полагаться на себяизтипимаршрутизацияключпопытаться найтиприезжатьодинсоответствоватьполоскакускиизочередь,Так RabbitMQ Триггерный клиент из IModel.BasicReturn
событие, Воляинформация возвращается производителю 。
С точки зрения дизайна, IConnection Хотя их может быть больше одного IМодель (канал),Но да рекомендует писать только потребительскую программу или программу-производитель.,Не рекомендуется для смешанного использования.
потому чтодля Все видысобытиеиочередь Конфигурация,дапротиводин IМодель (канал) установить из.
using IConnection connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();
channel.BasicReturn += (object sender, BasicReturnEventArgs e) =>
{
};
когда установлено mandatory = true
час,если Долженинформацияпопытаться найти Нетприезжатьочередьжитьмагазининформация,Так Сразувстреча Триггерный клиент из BasicReturn Прием мероприятий BasicPublish Отказ изинформации.
Полный пример кода выглядит следующим образом:
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Runtime;
using System.Text;
ConnectionFactory factory = new ConnectionFactory
{
HostName = "localhost"
};
using IConnection connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();
channel.ExchangeDeclare(exchange: "e2", type: ExchangeType.Fanout, durable: false, autoDelete: false);
channel.BasicReturn += (object? s, BasicReturnEventArgs e) =>
{
Console.WriteLine($"неверныйинформация:{Encoding.UTF8.GetString(e.Body.Span)}");
};
int i = 0;
while (i < 10)
{
channel.BasicPublish(
exchange: "e2",
routingKey: string.Empty,
// обязательный=true, он будет срабатывать, когда не будет получена информация BasicReturn событие
mandatory: true,
basicProperties: null,
body: Encoding.UTF8.GetBytes($"Test{i}")
);
i++;
}
Console.ReadLine();
существоватьдействительныйразвиватьсередина,когда mandatory=false
час,еслиодинполоскаинформациятолкатьприезжатьвыключатель,Но да не связан поворот,Тогда информация о статье будет потеряна.,Это может привести к серьезным последствиям.
исуществовать RabbitMQ середина,нести供ПонятноодиндобрыйодеялосказатьдляЗапасное колесовыключательизплан,этотдапроходитьсуществоватьопределениевыключательчасдобавить в alternate-exchange
Параметры, которых нужно достичь. Его функциядакогда A выключатель Не могу найтиприезжатьочередь Вперединформациячас,Сразувстреча Воляинформация Впередприезжать B в очереди.
Полный пример кода выглядит следующим образом:
первыйсоздавать e3_bak очередь,затемсоздавать e3 в свою очередь при установке запасного колеса переключательдля e3_bak。
Затем e3_bak необходимо связать очередь для приема сообщений.
ConnectionFactory factory = new ConnectionFactory
{
HostName = "localhost"
};
using IConnection connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();
channel.ExchangeDeclare(
exchange: "e3_bak",
type: ExchangeType.Fanout,
durable: false,
autoDelete: false
);
// заявление e3 выключатель,когда e3 выключательбезиметьобязательностьочередьчас,информация Волявстречаодеяло Впередприезжать e3_bak выключатель
channel.ExchangeDeclare(
exchange: "e3",
type: ExchangeType.Fanout,
durable: false,
autoDelete: false,
arguments: new Dictionary<string, object> {
{ "alternate-exchange", "e3_bak" }
}
);
channel.QueueDeclare(queue: "q3", durable: false, exclusive: false, autoDelete: false);
channel.QueueBind(queue: "q3", "e3_bak", routingKey: string.Empty);
// Потому что для уже установлено e3 иззапаснойвыключатель,так Нетвызовет BasicReturn
channel.BasicReturn += (object? s, BasicReturnEventArgs e) =>
{
Console.WriteLine($"неверныйинформация:{Encoding.UTF8.GetString(e.Body.Span)}");
};
int i = 0;
while (i < 10)
{
channel.BasicPublish(
exchange: "e3",
routingKey: string.Empty,
// Потому что для уже установлено e3 иззапаснойвыключатель,таквключатьэтот Нетвызовет BasicReturn
mandatory: true,
basicProperties: null,
body: Encoding.UTF8.GetBytes($"Test{i}")
);
i++;
}
Console.ReadLine();
Уведомление,если выключатель запасного колеса правильно ли он закреплен, поверните из слов,Тогда информация будет потеряна.
если e3 да Direct,e3_bak или Директ, то у обоих должно быть одинаковое из routingKey,если e3 Есть один в routingKey = cat
,Да e3_bak существования не существует в соответствующем из routingKey,Тогда информация или да будут потеряны. Есть и другие ситуации,Я не буду здесь вдаваться в подробности.
При отправке сообщения появляется IBasicProperties basicProperties
Атрибуты, атрибуты интерфейса были представлены в предыдущем разделе, когда IBasicProperties.DeliveryMode=2
информация Воля отмечена как постоянная, даже если RabbitMQ Даже если сервер будет перезапущен, сообщения не потеряются.
условно говоря,Благодаря предыдущему эксперименту,ты Можетнаблюдатьприезжатьклиенточередьизинформация Все Потребление完毕назад,очередьсерединаизинформация Всевстречапропадать。ипереписываться Kafka Например, topic Если среда израсходована, она все равно сохранится. Это следует принять к сведению, используйте RabbitMQ час,Персистентность необходимо настроить заранее.,Избегайте потребления или еще нетуспех Потреблениечас,информацияпотерянный。
Когда производитель существования выдвигает информацию, вы можете использовать IBasicProperties.DeliveryMode=2
Воля Долженинформациянастраиватьдля Выносливость。
var ps = channel.CreateBasicProperties();
ps.DeliveryMode = 2;
channel.BasicPublish(
exchange: "e3",
routingKey: string.Empty,
mandatory: false,
basicProperties: ps,
body: Encoding.UTF8.GetBytes($"Test{i}")
);
настраивать Время жизни сообщенияназад,Информацияеслисуществовать не должна быть использована в течение определенного времени.,Тогда информация становится мертвой информацией. для такого рода информации,Будет примерно две ситуации обработки.
Установлен первый, еслиочередь "x-dead-letter-exchange"
,Так Долженинформациявстречаодеялооточередь Впередприезжатьдругойвыключательсередина。этотдобрыйметодсуществоватьмертвое письмовыключательодин节серединавстречапредставлять。
Во втором случае сообщение отбрасывается.
На данный момент существует два способа Установить сообщение из TTL.
Первый типметоддапроходитьнастройки атрибутов Turn,Таким образом, вся информация, в свою очередь, имеет одинаковое время истечения срока действия.
Второй способ — настроить одну строку индивидуально, и для каждой строки будет TTL. Может быть разным.
если обе настройки вместеиспользуйте,ноинформацияиз TTL кКакое из двух значений меньше, является более точным?。информациясуществоватьочередьсерединаизрожденныйжитьчасодин旦超过настраивать TTL ценитьчас,потребитель Воля Больше не могуполучатьприезжать Долженинформация,такбольшинствонастраиватьмертвое письмовыключатель.
Первый — установить очередь:
channel.QueueDeclare(queue: "q4",
durable: false,
exclusive: false,
autoDelete: false,
arguments: new Dictionary<string, object>() { { "x-message-ttl", 6000 } });
Второй способ — настроить срок действия сообщения, задав свойства.
var ps = channel.CreateBasicProperties();
// Единица миллисекунда
ps.Expiration = "6000";
для Первый типнастраиватьочередьсвойствоизметод,По истечении срока действия информация будет удалена в свою очередь.(еслинастраивать Понятномертвое письмовыключатель,встречаодеяло Впередприезжатьмертвое письмовыключательсередина)。исуществовать Нет.二добрыйметодсередина,Даже если срок действия информации истек,И при этом он не будет стерт немедленно из своей очереди.,потому чтодля Долженполоскаинформациясуществовать Прямо сейчас Волядоставкаприезжатьпотребитель Извперед,Только тогда будет проверена, истек ли срок действия информации. для Вторая ситуация,когдаочередь при выполнении любой операции опроса,будет фактически удалено.
для Вторая ситуация,Пока идет опрос дасуществовать,Оно будет удалено только после истечения срока его действия.,Но как только срок действия истекает,Сразувстречаодеяло Впередприезжатьмертвое письмоочередьсередина,Только да сразу не удалят.
когда по сравнению с установкой поворота TTL Если очередьсуществовать предусматривает, что время не потребляется функцией use, тогда очередь будет удалена. Это ограничение включает в себя то, что время не потребляется информацией (включая ее). BasicGet()
способ потребленияиз)、безиметьодеялосновазаявление、безиметьпотребительсоединять,В противном случае удаленное время обратного отсчета будет сброшено.
channel.QueueDeclare(queue: "q6",
durable: false,
exclusive: false,
autoDelete: false,
arguments: new Dictionary<string, object>
{
// Единица миллисекунды, набор очередь Истекшийвремяда 1 Час
{"x-expires",1*3600*1000}
});
DLX(Dead-Letter-Exchange) мертвое письмовыключатель,Став мертвой буквой в A,этоспособныйодеялосноваодеялоотправлятьприезжатьдругой B в переключателе.в A очередьобязательность Понятномертвое письмовыключатель,ТаксуществоватьManagement UI Интерфейс увидит DLX логотип А переключатель Б — обычный переключатель,Никакой настройки не требуется.
Сообщение становится мертвым письмом в следующих ситуациях:
BasicReject()
、BasicNack()
Две функции могут отклонять сообщения.когда это место A серединажитьスペータストмертвое письмоинформациячас,RabbitMQ Сразувстречаавтоматическийземля Воляэтотинформациясновавыпускатьприезжатьнастраиватьизвыключатель B середина. Обычно мертвое письмо создается специально для важного изочередного выключателя. B,ивыключатель B Вам также необходимо привязать очередь C Только тогда, иначе сообщение потеряется.
Настройка поворота При появлении недоставленного сообщения информация,Воляинформация Впередприезжать Которыйвыключательсередина:
channel.QueueDeclare(queue: "q7", durable: false, exclusive: false, autoDelete: false,
arguments: new Dictionary<string, object> {
{ "x-dead-letter-exchange", "e7_bak" } });
Полный пример кода выглядит следующим образом:
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
ConnectionFactory factory = new ConnectionFactory
{
HostName = "localhost"
};
using IConnection connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();
channel.ExchangeDeclare(
exchange: "e7_bak",
type: ExchangeType.Fanout,
durable: false,
autoDelete: false
);
channel.QueueDeclare(queue: "q7_bak", durable: false, exclusive: false, autoDelete: false);
channel.QueueBind(queue: "q7_bak", "e7_bak", routingKey: string.Empty);
channel.ExchangeDeclare(
exchange: "e7",
type: ExchangeType.Fanout,
durable: false,
autoDelete: false
);
channel.QueueDeclare(queue: "q7", durable: false, exclusive: false, autoDelete: false,
arguments: new Dictionary<string, object> {
{ "x-dead-letter-exchange", "e7_bak" } });
channel.QueueBind(queue: "q7", "e7", routingKey: string.Empty);
int i = 0;
while (i < 10)
{
channel.BasicPublish(
exchange: "e7",
routingKey: string.Empty,
mandatory: false,
basicProperties: null,
body: Encoding.UTF8.GetBytes($"Test{i}"));
i++;
}
Thread.Sleep(1000);
int y = 0;
// определениепотребитель
channel.BasicQos(0, prefetchCount: 1, true);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.Span);
Console.WriteLine($" [x] Received {message}");
if (y % 2 == 0)
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
// requeue Чтобы установить для false Вот и все,
// В противном случае эта информация будет возвращена в очередь после отклонения.
else
channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: false);
Interlocked.Add(ref y, 1);
};
// Начните потреблять
channel.BasicConsume(queue: "q7",
autoAck: false,
consumer: consumer);
Console.ReadLine();
RabbitMQ Прямой поддержкиочередности как таковой нет. задержкииз Функция。
Так что же будет с для?
После главного да в информационный толчок,Нежелание быть съеденным немедленно. например говорит,После того, как пользователь разместил заказ,если нет оплаты в течение 10 минут,Так Должен Заказвстречаодеялоавтоматический Отмена。такнуждаться Делатьодининформацияодеяло延迟Потреблениеиз Функция。
такобъяснять,Фактический спросда,Информация должна существовать до того, как она сможет быть использована потребителем.。
Чтобы выполнить эту функцию в RabbitMQ, вам понадобится два выключателя и как минимум два выключателя.
Идеидаопределениедвавыключатель e8、e9 и Два поворота q8, q9, переключатель e8 иочередь q8 Привязка, переключатель e9 и q9 Привязка.
Вот и самый важный момент, q9 поставил мертвую букву очередь,когда Время жизни сообщенияприезжатьчас,Впередприезжать e9 в переключателе.так,e9 выключатель - q9 очередь ловитьполучатьприезжатьиз Вседаприезжать Ожидать(или者объяснять Истекший)изинформация。
существоватьотправка информацииприезжать e8 выключательчас,настраивать TTL время。когда q8 очередьсерединаизинформация Истекшийчас,информациявстречаодеяло Впередприезжать e9 выключатель,Затем внесите депозит q9.
потребитель Просто нужно подписаться q9 очередь,Прямо сейчас Может Потреблениеприезжать Ожидатьназадизинформация。
все Полный пример кода выглядит следующим образом:
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
ConnectionFactory factory = new ConnectionFactory
{
HostName = "localhost"
};
using IConnection connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();
channel.ExchangeDeclare(
exchange: "e8",
type: ExchangeType.Fanout,
durable: false,
autoDelete: false
);
channel.ExchangeDeclare(
exchange: "e9",
type: ExchangeType.Fanout,
durable: false,
autoDelete: false
);
channel.QueueDeclare(queue: "q9", durable: false, exclusive: false, autoDelete: false);
channel.QueueBind(queue: "q9", "e9", routingKey: string.Empty);
channel.QueueDeclare(queue: "q8", durable: false, exclusive: false, autoDelete: false,
arguments: new Dictionary<string, object> {
{ "x-dead-letter-exchange", "e9" } });
channel.QueueBind(queue: "q8", "e8", routingKey: string.Empty);
int i = 0;
while (i < 10)
{
var ps = channel.CreateBasicProperties();
ps.Expiration = "6000";
channel.BasicPublish(
exchange: "e8",
routingKey: string.Empty,
mandatory: false,
basicProperties: ps,
body: Encoding.UTF8.GetBytes($"Test{i}")
);
i++;
}
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.Span);
Console.WriteLine($" [x] ужеприезжать Ожидатьинформация {message}");
};
// Начните потреблять
channel.BasicConsume(queue: "q9",
autoAck: true,
consumer: consumer);
Console.ReadLine();
Чем выше Приоритет сообщения, тем быстрее оно будет израсходовано потребителем.
Пример кода выглядит следующим образом:
var ps = channel.CreateBasicProperties();
// приоритет 0-9
ps.Priority = 9;
channel.BasicPublish(
exchange: "e8",
routingKey: string.Empty,
mandatory: false,
basicProperties: ps,
body: Encoding.UTF8.GetBytes($"Test{i}")
);
Итак, RabbitMQ Последовательность информации не обязательно может быть гарантирована, что аналогично Kafka Есть разница между да и из.
механизм транзакциида,Издатель подтвердилинформацияодин定толкатьприезжать RabbitMQ Broker , часто вместе с бизнес-кодом.
напримеробъяснять,После того, как пользователь успешно оплатит,толкатьодинуведомитьприезжать RabbitMQ в очереди.
База данных, когда, естественно, должна выполнять транзакции,Таким образом, измененные данные будут отменены после сбоя платежа. Но вот проблема,если информация была отправлена,Дачисло据库却раз滚Понятно。
этотчасждатьвстречас участиемприезжатьпоследовательность,Можетиспользовать RabbitMQ измеханизм транзакции Приходитьиметь дело с,Идея аналогична процессу транзакции базы данных.,Также существуют операции фиксации и отката.
Чтоглазиздаубеждатьсяинформацияуспехтолкатьприезжать RabbitMQ Broker Помимо обеспечения согласованности данных с другими кодами на клиенте, push-сообщения и операции кода выполняются одновременно или одновременно откатываются.
Однако RabbitMQ Транзакции и транзакции базы данных — это две вещи. когда один из них RabbitMQ Когда транзакция завершена, но программа зависла, происходит откат другой транзакции базы данных. На этом этапе данные снова будут противоречивыми. потому чтоэтот,Также необходимо обеспечить возможность завершения двух этапов одновременно. в это время,нас Можетспособный又нуждатьсяпредставлятьвторой этапнестиплатить、поддерживатькомпенсационный механизмждать.этотразприезжать Поле распределенной транзакции。
Это полное из Пример кода выглядит следующим образом:
ConnectionFactory factory = new ConnectionFactory
{
HostName = "localhost"
};
using IConnection connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();
// Клиент отправляет Канал Tx.Select.Воля установлен в режим транзакций;
channel.TxSelect();
try
{
// отправка информации
channel.QueueDeclare(queue: "transaction_queue",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
string message = "Hello, RabbitMQ!";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",
routingKey: "transaction_queue",
basicProperties: null,
body: body);
// выполнить ряд действий
// совершить транзакцию
channel.TxCommit();
Console.WriteLine(" [x] Sent '{0}'", message);
}
catch (Exception e)
{
// Откат транзакции
channel.TxRollback();
Console.WriteLine("An error occurred: " + e.Message);
}
Console.ReadLine();
Механизм подтверждения отправителя,дагарантироватьинформацияодин定толкатьприезжать RabbitMQ из схемы.
имеханизм транзакции,Обычно дадля обеспечения последовательности,Нажмите информацию, и другие операции будут успешными или неудачными одновременно,Между ними не может быть никакого противоречия.
Что Полный пример кода выглядит следующим образом:
using IConnection connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();
// Включить режим подтверждения отправителя
channel.ConfirmSelect();
string exchangeName = "exchange_name";
string routingKey = "routing_key";
// определениевыключатель
channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct);
// отправка информации
string message = "Hello,RabbitMQ!";
вар тело = Encoding.UTF8.GetBytes(сообщение);
// выпускатьинформация
канал.BasicPublish(обмен: ExchangeName,
routingKey: routingKey,
basicProperties: null,
body: body);
// ждатьподтверждатьужетолкатьприезжать RabbitMQ
if (channel.WaitForConfirms())
{
Console.WriteLine(" [x] Sent '{0}'", message);
}
else
{
Console.WriteLine("Message delivery failed.");
}
Console.ReadLine();
Статья написана к этому моменту, ровно 10 000 слов.
для RabbitMQ Кластеризация, эксплуатация и обслуживание и другие технологии не будут подробно описаны в этой статье.