Длинная статья из 10 000 слов: Изучите программирование очередей сообщений RabbitMQ на C#.
Длинная статья из 10 000 слов: Изучите программирование очередей сообщений RabbitMQ на C#.

Учебное пособие по RabbitMQ

Эта статья была перенесена на github :https://github.com/whuanle/learnrabbitmq Если макет статьи неудобен для чтения, вы можете скачать оригинальную версию со склада. markdown Чтение файлов.

Введение в RabbitMQ

RabbitMQ — это очередь сообщений, реализующая протокол AMQP, который определен как открытый стандартный протокол прикладного уровня в качестве промежуточного программного обеспечения для обмена сообщениями. Он означает расширенный протокол очереди сообщений и имеет такие функции, как позиционирование сообщений, маршрутизация, организация очередей, безопасность и надежность.

В настоящее время наиболее популярные очереди сообщений в сообществе включают kafka, ActiveMQ, Pulsar, RabbitMQ, RocketMQ и т. д.

Автор также написал серия Kafka Учебник,Добро пожаловать для чтения:https://kafka.whuanle.cn/

Преимуществами и возможностями использования RabbitMQ, вероятно, являются высокая надежность, гибкая настройка правил маршрутизации, поддержка распределенного развертывания, соответствие протоколу AMQP и т. д. Его можно использовать для асинхронной связи, сбора журналов (Kafka лучше подходит для сбора журналов), системы архитектуры, управляемой событиями, развязки связи приложений и т. д.

Особенности версии сообщества RabbitMQ следующие:

  • поддерживаются несколько протоколов доставки информации, очередность информации, подтверждение доставки, гибкая изочередная маршрутизация, несколько типов обмена (выключатель).
  • поддерживать Kubernetes и т. д. Распределенное развертывание, обеспечивающее несколько языков из SDK, например Java、Go、C#。
  • Подключаемая аутентификация, авторизация, поддержка TLS и LDAP。
  • поддерживают непрерывную интеграцию, операционные показатели и другую интеграцию корпоративных систем, а также различные инструменты и плагины.
  • Предоставляет пакет для управления и мониторинга. RabbitMQ из HTTP-API, инструменты командной строки UI。

Базовые объекты RabbitMQ имеют следующие моменты, но читателям сейчас не нужно их запоминать. В следующих главах автор представит их один за другим.

  • Продюсер:толкатьинформацияприезжать RabbitMQ из программы.
  • потребитель(Consumer):от RabbitMQ Потреблениеинформацияиз программы.
  • очередь(Queue):RabbitMQ Храните информацию о местах, информацию о товаре можно получить из очереди.
  • выключатель(Exchange):Получить от производителяизинформация,И маршрут Воляинформация приезжает в один или несколько г. очереди.
  • Связывание:Воляочередьивыключательсвязанный,Когда производитель публикует информацию,выключатель Воляинформациямаршрутизацияприезжатьв очереди.
  • Ключ маршрутизации:используется длявыключатель Воляинформациямаршрутизацияприезжатьидентификацияочередьиз Правила сопоставления。

Технические знания RabbitMQ грубо делятся на:

  • Пользователь и разрешения: Настройка пользователя、Роли и соответствующие им разрешения.
  • Виртуальные хосты: настройте виртуальные хосты для разделения разных сред.
  • Атрибут Exchange и Queue: настройте атрибут выключенияочередности, например постоянство, ожидание автоматического удаления.
  • Политики: Определите политики для автоматической настройки выключателей, ссылок и параметров.
  • соединятьиряд:Конфигурациясоединятьирядизсвойство,Например, интервал сердцебиения, максимальный размер кадра.
  • Плагины: включение и настройка различных плагинов, таких как плагины управления, плагины STOMP ожидания.
  • Кластер и высокая доступность: настройте кластер и зеркало для обеспечения высокой доступности.
  • Журнал и мониторинг: настройте уровень журнала, цель и плагин мониторинга.
  • Безопасность: Конфигурация SSL/TLS Параметры, серверная часть аутентификации и другие настройки, связанные с безопасностью.

Из-за ограниченности навыков автора и ограниченности места в этой статье объясняются только технические детали, связанные с программированием на C#, чтобы понять навыки кодирования и механизм работы RabbitMQ.

Установка и настройка

Установите RabbitMQ

Читатели могут RabbitMQ Найдите его в официальной документации.приезжатьвесьиз Установить Учебник:https://www.rabbitmq.com/download.html

В этой статье для развертывания используется Docker.

RabbitMQ Список зеркал сообщества:https://hub.docker.com/_/rabbitmq

Создайте каталог для сопоставления томов хранения:

Язык кода:javascript
копировать
mkdir -p /opt/lib/rabbitmq

Разверните контейнер:

Язык кода:javascript
копировать
docker run -itd --name rabbitmq -p 5672:5672 -p 15672:15672 \
-v /opt/lib/rabbitmq:/var/lib/rabbitmq \
rabbitmq:3.12.8-management

Во время развертывания два порта заняты. 5672 — это порт связи MQ, а 15672 — это порт инструмента пользовательского интерфейса управления.

Откройте порт 15672, и вы попадете на страницу входа в Интернет. Учетная запись и пароль по умолчанию — гостевые.

Что касается использования пользовательского интерфейса управления RabbitMQ, мы представим его позже.

Открыв интерфейс управления, вы увидите Exchanges В меню вы можете увидеть таблицу, как показано ниже. Это обменники по умолчанию. Вам не нужно знать эти вещи сейчас, они будут представлены позже.

Virtual host

Name

Type

Features

/

(AMQP default)

direct

D

/

amq.direct

direct

D

/

amq.fanout

fanout

D

/

amq.headers

headers

D

/

amq.match

headers

D

/

amq.rabbitmq.trace

topic

D I

/

amq.topic

topic

D

модель публикации и подписки

использовать C# развивать RabbitMQ,нуждатьсяиспользовать nuget представлять RabbitMQ.Client,Адрес документа официального сайта:.NET/C# RabbitMQ Client Library — RabbitMQ

Прежде чем продолжить работу со статьей, создайте консольную программу.

Производители, потребители, обменники, очереди

Чтобы облегчить понимание, в этой статье приведены десятки изображений, позволяющих прийти к согласию относительно значения некоторых графических изображений:

В соответствии с производителем, использование представляется следующим образом:

дляпотребитель,использовать Как показано на рисунке ниже:

дляинформацияочередь,использовать Как показано на рисунке ниже:

длявыключатель,использовать Как показано на рисунке ниже:

существовать RabbitMQ , сообщения, опубликованные производителем, не попадут в очередь напрямую, а пройдут через обмен (Exchange) Распределено по различным очередям. Как упоминалось ранее, развертывание RabbitMQ После этого по умолчанию есть Семь переключателей, напр. (AMQP default)amq.direct ждать.

конечно,длясейчассуществовать Давайте поговорим,Нам не нужно знать выключатель,так,существоватьВ этом разделе из Учебник,Будет использоваться выключатель по умолчанию, чтобы завершить эксперимент.

существоватьпренебрегатьвыключательжитьсуществоватьизслучай,Мы можем упростить процесс производства и потребления, как показано ниже:

Пожалуйста, обязательно обратите внимание,Не показано на картинкевыключательизжитьсуществовать,потому чтодляиспользоватьиздапо умолчаниюизвыключатель.Дапродюсертолкатьинформациядолжендатолкатьприезжатьвыключатель,и Нетдаочередь,Эту фразу необходимо прояснить.

дляпотребитель Давайте поговорим,хотетьиспользоватьочередь,долженубеждатьсяочередьужечерезжитьсуществовать。

использовать C# Код и параметры объявления (создания) очереди следующие:

Язык кода:javascript
копировать
// объявить поворот
channel.QueueDeclare(
	// очередьимя
	queue: "myqueue",

	// Конфигурация персистентности, может ли включение существовать broker Выжить после перезагрузки
	durable: false,

	// Поворот удаляется при закрытии соединения
	exclusive: false,

	// Когда последний потребитель(еслииметьизязык) отписывается, следует автоматически удалить эту очередь.
	autoDelete: false,

	// Дополнительная настройка параметров
	arguments: null
	);

Полный пример кода:

Язык кода:javascript
копировать
ConnectionFactory factory = new ConnectionFactory
{
	HostName = "localhost"
};

// соединять
using IConnection connection = factory.CreateConnection();

// ряд
using IModel channel = connection.CreateModel();

channel.QueueDeclare(
	// очередьимя
	queue: "myqueue",

	// Конфигурация персистентности, может ли включение существовать broker Выжить после перезагрузки
	durable: false,

	// Поворот удаляется при закрытии соединения
	exclusive: false,

	// Когда последний потребитель(еслииметьизязык) отписывается, следует автоматически удалить эту очередь.
	autoDelete: false,

	// Дополнительная настройка параметров
	arguments: null
	);
  • queue:очередьизимя。
  • durable:настраиватьда, стоит ли упорствовать。Выносливостьизочередьвстречажитьтарелка,существующие могут гарантировать, что соответствующая информация не будет потеряна при перезапуске сервера.
  • exclusive настраиватьда Никаких эксклюзивов。еслиодиночередьодеялозаявлениедляэксклюзивныйочередь,Поворот виден только в первый раз.,исуществоватьсоединять Автоматически при отключенииудалить.
  • Эта конфигурация основана на IConnection из, то же самое IConnection создаватьиз Неттакой жеряд (IModel) , также будет соблюдать это правило.
  • autoDelete:настраиватьда Нет автоматическогоудалить.автоматическийудалитьизпомещениеда По меньшей мереиметьодинпотребительсоединятьприезжатьэтоточередь,Изназад Местоиметьиэтоточередьсоединятьизпотребитель Когда оба отключены,Только тогда оно будет автоматически удалено.
  • argurnents: Установите некоторые другие параметры очереди, такие как очередьинформации время ждать.

ifturn был сохранен и не требует повторного выполнения QueueDeclare()。Повторные звонки QueueDeclare(),если параметры одинаковы,Никаких побочных эффектов,Не будет никаких проблем, если информационная информация уже подана.

Однако, если QueueDeclare() Если параметр, если отличается от существующей конфигурации, может быть сообщено об ошибке.

В общем, для разумной архитектуры и надежности, будет определяться архитектором и т.д. Заранее создайте выключатель, по очереди, и тогда клиент сможет его напрямую использовать. Как правило, его нельзя устанавливать при запуске программы, что приведет к большой неопределенности и побочным эффектам.

Произведено отредактированием информациичасиз Код также очень прост,обозначениехотетьотправлятьприезжать Которыйвыключательилимаршрутизациясередина Прямо сейчас Может。

Обязательно обратите внимание, что когда производитель RabbitMQ отправляет сообщение, оно отправляется на обмен, а не непосредственно в очередь!

Язык кода:javascript
копировать
channel.BasicPublish(

	// использоватьпо умолчаниювыключатель
	exchange: string.Empty,

	// К какому повороту нажать?
	routingKey: "myqueue",// очередьсвойство
	basicProperties: null,

	// Для отправки изинформации вам нужно сначала конвертировать byte[]
	body: Кодировка.UTF8.GetBytes("Тест")
	);

BasicPublish Есть три перегрузки:

Язык кода:javascript
копировать
BasicPublish(
    PublicationAddress addr, 
    IBasicProperties basicProperties, 
    ReadOnlyMemory<byte> body)
Язык кода:javascript
копировать
BasicPublish(string exchange, 
             string routingKey, 
             IBasicProperties basicProperties, 
             ReadOnlyMemory<byte> body)
Язык кода:javascript
копировать
BasicPublish(string exchange, 
             string routingKey, 
             bool mandatory = false, 
             IBasicProperties basicProperties = null, 
             ReadOnlyMemory<byte> body = default)
  • exchange: выключательизимя,если Оставьте пустымвстречатолкатьприезжатьпо умолчаниювыключатель.
  • routingKey: маршрутизацияключ,выключательв соответствии смаршрутизацияключ Воляинформацияжитьмагазинприезжатьсоответствующийизочередьсреди。
  • basicProperties:информациясвойство,Если истекло времяждать.
  • mandatory:ценитьдля false час,есливыключательбезиметьобязательностьподходящийизочередь,ноСообщение будет потеряно。ценитьдля true час,есливыключательбезиметьобязательностьподходящийизочередь,новызоветIModel.BasicReturn событие.

IBasicProperties basicProperties Параметр интерфейса, который мы можем использовать IModel.CreateBasicProperties() Создайте объект интерфейса.

IBasicProperties Многие атрибуты инкапсулированы в интерфейс, поэтому нам не нужно отображать конфигурацию передачи в строках.

IBasicProperties Его полные свойства следующие:

Язык кода:javascript
копировать
// Нанесение логотипа из ID
public String AppId { set; get; }

// Кластер идентичности из ID
public String ClusterId { set; get; }

// Укажите содержимое информации из метода кодирования, например "utf-8"
public String ContentEncoding { set; get; }

// обозначениеинформациясодержаниеиз MIME тип, например "application/json"
public String ContentType { set; get; }

// Используется для связи информации с отношениями, обычно используется Сценарий RPC (удаленный вызов процедуры)
public String CorrelationId { set; get; }

// Укажите метод сохранения, значение 1: Не постоянно, значение 2: Устойчивое развитие
public Byte DeliveryMode { set; get; }

// Указывает срок действия в миллисекундах
public String Expiration { set; get; }

// Настроить информацию из заголовка
public IDictionary`2 Headers { set; get; }

// Укажите уникальный идентификатор
public String MessageId { set; get; }

// да, стоит ли упорствовать
public Boolean Persistent { set; get; }

// Указывает уровень приоритета в диапазоне от 0 приезжать 9
public Byte Priority { set; get; }

// Предназначен для ответа на информациюизочередное имя
public String ReplyTo { set; get; }

// Предназначен для ответа на информацию об адресе
public PublicationAddress ReplyToAddress { set; get; }

// обозначениеинформацияизвремяштамп
public AmqpTimestamp Timestamp { set; get; }

// тип информации
общественный String Type { set; get; }

// Идентифицировать пользователя из ID
public String UserId { set; get; }

При отправке сообщений IBasicProperties можно настроить для отдельных сообщений:

Язык кода:javascript
копировать
использование соединения IConnection = Factory.CreateConnection();
используя канал IModel = Connection.CreateModel();

// создаём Два поворота
канал.QueueDeclare(очередь: "q1", durable: false, exclusive: false, autoDelete: false);

var properties = channel.CreateBasicProperties();
// Пример 1:
properties.Persistent = true;
properties.ContentType = "application/json";
properties.ContentEncoding = "UTF-8";

// Пример 2:
//properties.Persistent = true;
//properties.ContentEncoding = "gzip";
//properties.Headers = new Dictionary<string, object>();

channel.BasicPublish(
	exchange: string.Empty,
	routingKey: "q1",
	basicProperties: properties,
	body: Encoding.UTF8.GetBytes($"Test{i}")
);

для IBasicProperties изиспользовать, более подробное описание будет приведено далее в статье.

Сейчас существуют, мы толкнули это 10 полоскаинформацияприезжатьочередьсередина,Затемсуществовать Management UI Наблюдать.

Язык кода:javascript
копировать
int i = 0;
while (i < 10)
{
	channel.BasicPublish(
	exchange: string.Empty,
	routingKey: "myqueue",
	basicProperties: null,
	body: Encoding.UTF8.GetBytes($"Test{i}")
	);
	i++;
}

Мы можем существовать UI из Queues and Streams серединасмотретьприезжатькогдавперед Местоиметьизочередь。

Можно смотретьприезжатькогдавпередочередьсерединаиз Ready состояние Unacked состояниеизинформациячисло,соответственнопереписыватьсявышесерединаиз Ожидание доставки впотребительизинформациячислоибыл доставлен впотребитель Даеще нетполучатьприезжатьсигнал подтвержденияизинформациячисло

После нажатия на кнопку «Открыть» появится интерфейс «Открытие», как показано ниже.

Сначала посмотрите Обзор.

«Готово» означает количество, которое еще не было израсходовано.

Unacked относится к потреблению, но не ack изинформациячислоколичество。

другой Message rates диаграмма,Относится к изданию публикации, информации о скорости потребления.,Потому что для не важно,Поэтому здесь это не объясняется.

существовать Bindings середина,Можно смотретьприезжать Долженочередьобязательность Понятнопо умолчаниюизвыключатель.

Затем напишите потребитель,Потребляйте очередьсерединаизинформация,Что Полный код выглядит следующим образом:

Язык кода:javascript
копировать
использование RabbitMQ.Client;
использование RabbitMQ.Client.Events;
использование System.Text;

Фабрика ConnectionFactory = новая Фабрика ConnectionFactory
{
	Имя хоста = «локальный хост»
};

использование соединения IConnection = Factory.CreateConnection();
используя канал IModel = Connection.CreateModel();

канал.QueueDeclare(
	// поворот имя
	очередь: "myqueue",

	// Конфигурация персистентности, может ли включение существовать broker Выжить после перезагрузки
	durable: false,

	// Поворот удаляется при закрытии соединения
	exclusive: false,

	// Когда последний потребитель(еслииметьизязык) отписывается, следует автоматически удалить эту очередь.
	autoDelete: false,

	// Дополнительная настройка параметров
	arguments: null
	);

// определениепотребитель
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
	var message = Encoding.UTF8.GetString(ea.Body.Span);
	Console.WriteLine($" [x] Received {message}");
};

// Начните потреблять
channel.BasicConsume(queue: "myqueue",
					 autoAck: true,
					 consumer: consumer);

Console.ReadLine();

Обратите внимание: если вводится несуществующая изочередь, программа сообщит об исключении.

существоватьпотребитель до выхода из программы, то есть IConnection был Dispose() Извперед,Можетсуществовать Consumers Информация о потребительской клиентской программе представлена ​​в формате .

А что, если мы будем только потреблять и не настраивать автоматическое подтверждение?

Измените потребительский код на:

Язык кода:javascript
копировать
channel.BasicConsume(queue: "myqueue",
					 autoAck: false,
					 consumer: consumer);

Полный код выглядит следующим образом:

Язык кода:javascript
копировать
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

ConnectionFactory factory = new ConnectionFactory
{
	HostName = "localhost"
};

using IConnection connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();

channel.QueueDeclare(
	queue: "myqueue",
	durable: false,
	exclusive: false,
	autoDelete: false,
	arguments: null
	);

int i = 0;
while (i < 10)
{
	channel.BasicPublish(
	exchange: string.Empty,
	routingKey: "myqueue",
	basicProperties: null,
	body: Encoding.UTF8.GetBytes($"Test{i}")
	);
	i++;
}

// определениепотребитель
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
	var message = Encoding.UTF8.GetString(ea.Body.Span);
	Console.WriteLine($" [x] Received {message}");
};

// Начните потреблять
channel.BasicConsume(queue: "myqueue",
					 autoAck: false,
					 consumer: consumer);

Console.ReadLine();

В этот момент вы найдете,Вся информация прочитана,Да Unacked для 10。

Как показано ниже,autoAck: false После этого, если вы перезапустите программу (только потреблять, не отправлять сообщения), то программа продолжит потреблять снова.

дляеще нет ack изинформация,потребительсновасоединятьназад,RabbitMQ Нажму еще раз.

и Kafka Разное изда, Кафка если не ack когда информация, но сервер автоматически повторно отправит информацию о статье потребителю, если эта информация о статье не заполнена, но всегда будет блокировать существование здесь. идля RabbitMQ,была подтверждена информация будет временно проигнорирована,автоматический Потребление下одинполоска。так На основании этого,По умолчанию RabbitMQ также не может гарантировать порядок информации.

конечно, RabbitMQ очень гибкийиз,Мы можем выборочно потреблять детали.,Избегайте блокировки перед когдаинформация, из-за чего программа не сможет работать дальше:

Язык кода:javascript
копировать
// определениепотребитель
	int i = 0;
	var consumer = new EventingBasicConsumer(channel);
	consumer.Received += (model, ea) =>
	{
		var message = Encoding.UTF8.GetString(ea.Body.Span);
		Console.WriteLine($" [x] Received {message}");
		i++;
        // Убедитесь, что информация используется правильно
		if (i % 2 == 0)
			channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
	};

	// Начните потреблять
	channel.BasicConsume(queue: "myqueue",
						 autoAck: false,
						 consumer: consumer);

существовать в некоторых ситуациях,Эта функция очень полезна,Мы можем воля множественные неудачи при выполнении изинформация сначала отложим это в сторону,Переход к следующей статье,Избегайте накопления информации.

Несколько рабочих очередей

если одна и та же очередьиз Разные клиенты привязать квыключательсередина,Несколько потребителей работают вместе и говорят,Так что же происходит?

для Первый тип ситуации,RabbitMQ Сообщения будут распределяться поровну каждому клиенту.

Условие выполняется избазиса да, два потребителя различны изпотребителя, если они существуют. ть Одна и та же программа участвует в разных экземплярах потребления, но да, потому что для признается одним и тем же потребителем, и правило но недействительно.

Однако RabbitMQ и Нетвстречасмотретьеще нетподтверждатьизинформациячислоколичество,это Толькодаслепо Воля Нет. n сообщение отправлено на n индивидуальныйпотребитель

кроме тогосуществоватьобозначениевыключательимяизслучай,Мы можем воля маршрутизировать Набор ключей для пустых,Размещая таким образом изинформацию будет пересылать выключательприезжать соответствующий изв. очереди.

Язык кода:javascript
копировать
	channel.BasicPublish(
	exchange: "logs",
	routingKey: string.Empty,
	basicProperties: null,
	body: Encoding.UTF8.GetBytes($"Test{i}")
	);

и очередность соответствует одному выключателю. Ситуация сложнее.,назадлапшаиз Главы будут упомянутыприезжать。

продюсерипотребитель Все могутиспользовать QueueDeclare() Приходитьобъявить поворот。так называемыйиззаявление,Собственно запрос на создание очереди на RabbitMQ Broker,Так что кто бы ни пришел к создателю, он тот же самый.

изаявлениеочередь Связанныйиз,Есть еще две функции:

Язык кода:javascript
копировать
// Независимо от того, потерпит неудачу создатель или нет, игнорируйте это.
channel.QueueDeclareNoWait();
// Определите, содержит ли очередь существование. Если нет, появится исключение. Если существует, ничего не произойдет.
channel.QueueDeclarePassive();

Дополнительно мы можем удалить очередь:

Язык кода:javascript
копировать
// ifUnused: очередьбезиметьодеялоиспользоватьчас
// ifEmpty: Когда нет накопления изинформации в свою очередь
channel.QueueDelete(queue: "aaa", ifUnused: true, ifEmpty: true);

Тип переключателя

Производители могут отправлять сообщения только на биржи, а не в очереди.

При отправке сообщения вы можете указать имя обмена и ключ маршрутизации.

Как показано в следующем коде:

Язык кода:javascript
копировать
	channel.BasicPublish(
	exchange: string.Empty,
	routingKey: "myqueue",
	basicProperties: null,
	body: Encoding.UTF8.GetBytes($"Test{i}")
	);

ExchangeType Несколько типов типов определены в от переключателя.

Язык кода:javascript
копировать
    public static class ExchangeType
    {
        public const string Direct = "direct";
        public const string Fanout = "fanout";
        public const string Headers = "headers";
        public const string Topic = "topic";
        private static readonly string[] s_all = {Fanout, Direct, Topic, Headers};
    }

существоватьиспользоватьодинвыключатель Извперед,нуждаться Первыйзаявлениеодинвыключатель:

Язык кода:javascript
копировать
channel.ExchangeDeclare("logs", ExchangeType.Fanout);

есливыключательужежитьсуществовать,Повторное выполнение кода запроса,Толькохотеть Конфигурацияисейчасжитьизвыключатель Конфигурация Распределение,но RabbitMQ Ничего не делайте и побочных эффектов не будет.

Да,Не может быть разных конфигураций,Напримеружежитьсуществоватьизвыключательда Fanout Введите, но повторно выполните оператор кода, в свою очередьдля Direct тип.

Функция ExchangeDeclare определяется следующим образом:

Язык кода:javascript
копировать
ExchangeDeclare(string exchange, 
                string type, 
                bool durable = false, 
                bool autoDelete = false,
                IDictionary<string, object> arguments = null)
  • exchange: выключи.
  • type переключиться с тип, Такие как разветвление, директ, тема.
  • durable: Установите, является ли да постоянным durab ,еслиценитьдля правда, он не будет потерян даже после перезапуска сервера.
  • autoDelete:настраиватьда Нет автоматическогоудалить.
  • argument:Что Он несколько структурированпараметр.

конечно,выключательтакже Можетодеялоудалить.

Язык кода:javascript
копировать
// ifUnused Он будет удален только в случае существования очередностибылиспользоватьиз.
channel.ExchangeDelete(exchange: "log", ifUnused: true);

Есть еще один NotWait метод.

Язык кода:javascript
копировать
channel.ExchangeDeclareNoWait("logs", ExchangeType.Direct);
//channel.ExchangeDeclareNoWait(...);

Даже если при переобъявлении и удалении обменника возникла проблема, поскольку он возвращает void, в случае неудачной операции не будет сообщено об исключении.

Существует также способ определить, существует ли переключатель.

Язык кода:javascript
копировать
channel.ExchangeDeclarePassive("logs")

После создания нескольких очередей также необходимо привязать очереди к обменнику.

Как показано в следующем коде,Его выключатель связывает два поворота,продюсертолкатьинформацияприезжатьвыключательчас,Два поворота Всевстречаполучатьприезжатьтакой жеизинформация。

Язык кода:javascript
копировать
ConnectionFactory factory = new ConnectionFactory
{
	HostName = "localhost"
};

using IConnection connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();

// создаватьвыключатель
channel.ExchangeDeclare("logs",ExchangeType.Fanout);

// создавать Два поворота
channel.QueueDeclare(
	queue: "myqueue1",
	durable: false,
	exclusive: false,
	autoDelete: false,
	arguments: null
	);
channel.QueueDeclare(
	queue: "myqueue2",
	durable: false,
	exclusive: false,
	autoDelete: false,
	arguments: null
	);

channel.QueueBind(queue: "myqueue1", exchange: "logs", routingKey: string.Empty);
channel.QueueBind(queue: "myqueue2", exchange: "logs", routingKey: string.Empty);

int i = 0;
while (i < 10)
{
	channel.BasicPublish(
	exchange: "logs",
	routingKey: string.Empty,
	basicProperties: null,
	body: Encoding.UTF8.GetBytes($"Test{i}")
	);
	i++;
}

После отправки сообщения каждая связанная logs выключательизочередь Всевстречаполучатьприезжатьтакой жеизинформация。

Обратите внимание: поскольку биржа не хранит сообщения, создайте еще одну myqueue3 изинформацияочередьпривязка logs переключатель, myqueue3 Тольковстречаловитьполучатьприезжатьобязательность Изназадтолкатьизинформация,Нет Могу получитьприезжать更早Извпередизинформация。

Выключатели бывают следующих типов:

  • прямой: на основе routingKey Воляинформацияпередачаприезжатьочередь。
  • тема: Немного сложно. По данным маршрутного ключа и для Воляочередьпривязать квыключательизмежду режимамиизсоответствовать Воляинформациямаршрутизацияприезжатьодин или несколькоочередь。
  • заголовки: В этой статье это не рассматривается, поэтому объяснений не будет.
  • fanout: Просто привяжите, не нужно беспокоиться о маршрутизации.
Direct

direct основан на routingKey Воляинформациятолкатьприезжать Неттакой жеизв очереди.

Сначала создайте несколько очередей.

Язык кода:javascript
копировать
// создавать Два поворота
channel.QueueDeclare(queue: "direct1");
channel.QueueDeclare(queue: "direct2");

Затем при привязке очереди к коммутатору для отношения привязки необходимо установить RouteKey.

Язык кода:javascript
копировать
// использовать routingKey обязательностьвыключатель
channel.QueueBind(exchange: "logs", queue: "direct1", routingKey: "debug");
channel.QueueBind(exchange: "logs", queue: "direct2", routingKey: "info");

Наконец, при отправке сообщения вам необходимо указать имя обмена и ключ маршрутизации.

Язык кода:javascript
копировать
// При отправке информации необходимо указать routingKey
channel.BasicPublish(
exchange: "logs",
routingKey: "debug",
basicProperties: null,
body: Кодировка.UTF8.GetBytes($"Тест")
);

Когда сообщение отправляется на logs переключатель, переключатель будет основан на routingKey Воляинформация Впередприезжатьпереписыватьсяизв очереди.

весьиз Пример кода выглядит следующим образом:

Язык кода:javascript
копировать
// создаватьвыключатель
channel.ExchangeDeclare("logs",ExchangeType.Direct);

// создавать Два поворота
channel.QueueDeclare(queue: "direct1");
channel.QueueDeclare(queue: "direct2");

// использовать routingKey обязательностьвыключатель
channel.QueueBind(exchange: "logs", queue: "direct1", routingKey: "debug");
channel.QueueBind(exchange: "logs", queue: "direct2", routingKey: "info");

// При отправке информации необходимо указать routingKey
channel.BasicPublish(
exchange: "logs",
routingKey: "debug",
basicProperties: null,
body: Кодировка.UTF8.GetBytes($"Тест")
);

После запуска выяснилось, что только direct1 очередь Можетполучатьприезжатьинформация,потому чтодляэтотоснован использовать при связывании routingKey=debug Решите из.

Fanout

Толькохотетьочередьобязательность Понятновыключатель,нокаждыйвыключатель Всевстречаполучатьприезжать Такой жеизинформация,Fanout проигнорирую routingKey。

Как показано в следующем коде:

Язык кода:javascript
копировать
// создаватьвыключатель
channel.ExchangeDeclare("logs1",ExchangeType.Fanout);

// создавать Два поворота
channel.QueueDeclare(queue: "fanout1");
channel.QueueDeclare(queue: "fanout2");

// использовать routingKey обязательностьвыключатель
channel.QueueBind(exchange: "logs1", queue: "fanout1", routingKey: "debug");
channel.QueueBind(exchange: "logs1", queue: "fanout2", routingKey: "info");

// При отправке информации необходимо указать routingKey
channel.BasicPublish(
exchange: "logs1",
routingKey: "debug",
basicProperties: null,
body: Кодировка.UTF8.GetBytes($"Тест")
);
Topic

Topic будет основано на routingKey Найдите совпадения, соответствующие критериям изочередь, в свою очередь можно использовать .#* Для выделения используются три вида символов. Тема Правила зонирования относительно гибкие.

существоватьсоздаватьочередь Изназад,обязательностьвыключательчас,routingKey использоватьвыражение。

Язык кода:javascript
копировать
// использовать routingKey обязательностьвыключатель
channel.QueueBind(exchange: "logs3", queue: "topic1", routingKey: "red.#");
channel.QueueBind(exchange: "logs3", queue: "topic2", routingKey: "red.yellow.#");

При отправке сообщения RouteKey необходимо указать полное имя.

Язык кода:javascript
копировать
// отправка информации
channel.BasicPublish(
	exchange: "logs3",
	routingKey: "red.green",
	basicProperties: null,
	body: Кодировка.UTF8.GetBytes($"Тест")
);

Во-первых, ключ маршрутизации будет основано на . Символы разделены.

например red.yellow.green будет разбит на [red,yellow,green] Три части.

Если вы хотите смутно отличить часть, вы можете использовать *。например red.*.green , может быть присвоен red.aaa.greenred.666.green

* Можетсуществоватьлюбая частьиспользовать,например *.yellow.**.*.green

# Можно выделить несколько частей, например red.# Может быть назначен на red.ared.a.ared.a.a.a

весьиз Пример кода выглядит следующим образом:

Язык кода:javascript
копировать
// создаватьвыключатель
channel.ExchangeDeclare("logs3",ExchangeType.Topic);

// создавать Два поворота
channel.QueueDeclare(queue: "topic1");
channel.QueueDeclare(queue: "topic2");

// использовать routingKey обязательностьвыключатель
channel.QueueBind(exchange: "logs3", queue: "topic1", routingKey: "red.#");
channel.QueueBind(exchange: "logs3", queue: "topic2", routingKey: "red.yellow.#");

// отправка информации
channel.BasicPublish(
	exchange: "logs3",
	routingKey: "red.green",
	basicProperties: null,
	body: Кодировка.UTF8.GetBytes($"Тест")
);

channel.BasicPublish(
	exchange: "logs3",
	routingKey: "red.yellow.green",
	basicProperties: null,
	body: Кодировка.UTF8.GetBytes($"Тест")
);

Два сообщения были отправлены на logs переключатель, где routingKey=red.green изинформация, будь red.# зонированы и, следовательно, будут перенаправлены на topic1 в очереди.

и routingKey=red.yellow.green изинформацию можно разделить на два направления, так topic1 и topic 2 можно получить.

переключатель привязки переключателя

Помимо привязки очередей, переключатели также можно привязывать к переключателям.

Пример:

Воля b2 привязать к b1 средний, b2 можно получить b1 изинформация。

Язык кода:javascript
копировать
channel.ExchangeBind(destination: "b2", source: "b1", routingKey: string.Empty);

После привязки нажмите b1 выключательизинформация,встречаодеяло Впередприезжать b2 выключатель.

Полный пример кода выглядит следующим образом:

Язык кода:javascript
копировать
ConnectionFactory factory = new ConnectionFactory
{
	HostName = "localhost"
};

using IConnection connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();
channel.ExchangeDeclare(exchange: "b1", ExchangeType.Fanout);
channel.ExchangeDeclare(exchange: "b2", ExchangeType.Fanout);

// Потому что для обоих да ExchangeType.Fanout,
// так routingKey использовать string.Empty
channel.ExchangeBind(destination: "b2", source: "b1", routingKey: string.Empty);


// создаватьочередь
channel.QueueDeclare(queue: "q1", durable: false, exclusive: false, autoDelete: false);
channel.QueueBind(queue: "q1", exchange: "b2", routingKey: string.Empty);

int i = 0;
while (i < 10)
{
	channel.BasicPublish(
	exchange: "b1",
	routingKey: string.Empty,
	basicProperties: null,
	body: Encoding.UTF8.GetBytes($"Test{i}")
	);
	i++;
}

конечно,Может Волявыключатель、очередьтакой жечаспривязать к b1 в переключателе.

Кроме того, два типа выключателей могут быть разными. Однако это немного усложнит правила распределения.

Язык кода:javascript
копировать
channel.ExchangeDeclare(exchange: "b1", ExchangeType.Direct);
channel.ExchangeDeclare(exchange: "b2", ExchangeType.Fanout);

нас Можетпонимается каксуществоватьвыключательобязательностьчас,b2 Взаимнодляодиночередь。когда b1 установлен на Direct При использовании переключателя также необходимо указать при привязке переключателя routingKey。

Язык кода:javascript
копировать
channel.ExchangeBind(destination: "b2", source: "b1", routingKey: "demo");

и b2 выключательи q2 Очередь, все еще Fanout отношения не затронуты.

Это значит, b1, b2 даа отношения,этоихиз Отображение отношений Нетвстреча Влияниеприезжатьдругие люди,также Нетвстреча Влияниеприезжатьследующий уровень。

Полный пример кода выглядит следующим образом:

Язык кода:javascript
копировать
using RabbitMQ.Client;
using System.Text;

ConnectionFactory factory = new ConnectionFactory
{
	HostName = "localhost"
};

using IConnection connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();
channel.ExchangeDeclare(exchange: "b1", ExchangeType.Direct);
channel.ExchangeDeclare(exchange: "b2", ExchangeType.Fanout);

// Потому что для обоих да ExchangeType.Fanout,
// так routingKey использовать string.Empty
channel.ExchangeBind(destination: "b2", source: "b1",routingKey: "demo");


// создавать Два поворота
channel.QueueDeclare(queue: "q1", durable: false, exclusive: false, autoDelete: false);
channel.QueueBind(queue: "q1", exchange: "b2", routingKey: string.Empty);

int i = 0;
while (i < 10)
{
	channel.BasicPublish(
	exchange: "b1",
	routingKey: "demo",
	basicProperties: null,
	body: Encoding.UTF8.GetBytes($"Test{i}")
	);
	i++;
}

Потребитель, атрибуты сообщения

потребитель BasicConsume Функция определяется следующим образом:

Язык кода:javascript
копировать
BasicConsume(string queue,
            bool autoAck,
            string consumerTag,
            IDictionary<string, object> arguments,
            IBasicConsumer consumer)

В разных подписках на потребление используются разные теги потребления. (consumerTag) отличить одно от другого ,существоватьтакой жеодинряд(IModel)серединаизпотребитель Его необходимо отличать тегом потребителя, который не обязательно устанавливать по умолчанию.

  • queue:очередьизимя。
  • autoAck:настраиватьда Нет автоматическогоподтверждать。
  • consumerTag: потребитель Этикетка,Используется для различения нескольких товаров.
  • arguments:настраиватьпотребительиз Чтоонпараметр.

Впереди у нас есть EventingBasicConsumer создавать IBasicConsumer Интерфейс-потребитель программы, среди которых EventingBasicConsumer Содержит следующие события:

Язык кода:javascript
копировать
public event EventHandler<BasicDeliverEventArgs> Received;
public event EventHandler<ConsumerEventArgs> Registered;
public event EventHandler<ShutdownEventArgs> Shutdown;
public event EventHandler<ConsumerEventArgs> Unregistered;

этотнекоторыйсобытиевстречасуществоватьинформацияиметь дело сиз Неттакой же阶段одеялокурок。

Потребительская программа имеет два режима потребления: выталкивающий и вытягивающий.,Передний Упомянулприезжатьизкод Вседарежим нажатия,При появлении новой информации,RabbitMQ встречаавтоматическийтолкатьприезжатьпотребительпрограммасередина。

Язык кода:javascript
копировать
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
	var message = Encoding.UTF8.GetString(ea.Body.Span);
	Console.WriteLine($" [x] Received {message}");
	channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};

// Начните потреблять
channel.BasicConsume(queue: "myqueue5",
					 autoAck: false,
					 consumer: consumer,
					 consumerTag: "demo");

еслииспользоватьрежим вытягивания(BasicGet() письмочисло),Таксуществовать RabbitMQ Broker Если в изочереди нет информации, она будет возвращена null。

Язык кода:javascript
копировать
// Начните потреблять
while (true)
{
	var result = channel.BasicGet(queue: "q1", autoAck: false);

	// если нетянутьприезжатьинформациячас
	if (result == null) 
    {
      // Избегайте бесконечных запросов, когда нет информации
      Thread.Sleep(100);
      continue;   
    }

	Console.WriteLine(Encoding.UTF8.GetString(result.Body.Span));
	channel.BasicAck(deliveryTag: result.DeliveryTag, multiple: false);
}

когдаиспользовать BasicGet() При ручном извлечении информации программа не сохраняет программу для потребителя, что означает RabbitMQ из Consumer Невозможно увидеть в .

Как в режиме push, так и в режиме pull при подтверждении сообщения используется несколько параметров.

  • если Воля multiple настраиватьдля false,но Толькоподтверждатьобозначение deliveryTag изодинполоскаинформация。
  • если Воля multiple настраиватьдля true,новстречаподтверждать Местоиметь Сравниватьобозначение deliveryTag Небольшая из и была подтверждена информация.

информацияиз deliveryTag Свойства ulong Тип, представляющий смещение информации, от 1.... Начни считать.

существуют При массовом получении информации и ее обработке используйте multiple Приходитьподтверждатьодин组информация,Нет необходимости подтверждать по пунктам,Это повышает эффективность.

Кос, отказаться от получения

Потребительская программа может установить Qos.

Язык кода:javascript
копировать
channel.BasicQos(prefetchSize: 10, prefetchCount: 10, global: false);

prefetchSize:этотженьшеньчисловыражатьпотребитель Местоспособныйловитьполучатьеще нетподтверждатьинформацияизобщий размеризверхний предел,настраиватьдля 0 Это означает, что верхнего предела нет.

prefetchCount: изметод Приходитьнастраиватьпотребитель客户端最大способныйловитьполучатьизеще нетподтверждатьизинформациячисло。этот Конфигурацияираздвижное окночислоколичество意思差Нетмного。

глобальный немного особенный.

Когда global for имеет значение false, только новый потребитель должен соответствовать правилам.

в случае global для true время, то же самое IConnection серединаизпотребительвсевстречаодеяло Исправлять Конфигурация。

Язык кода:javascript
копировать
// не затронуто
// 	var result = channel.BasicConsume(queue: "q1", autoAck: false,... ...);

channel.BasicQos(prefetchSize: 0, prefetchCount: 10, global: false);

// Затронут новый потребитель
// 	var result = channel.BasicConsume(queue: "q1", autoAck: false,... ...);

когдаполучатьприезжатьинформациячас,если необходимо явно отклонить информацию,Можетиспользовать BasicReject,RabbitMQ Будет ли воля информация удалена из очереди.

BasicReject() Вызовет мертвую почту сообщения.

Язык кода:javascript
копировать
while (true)
{
	var result = channel.BasicGet(queue: "q1", autoAck: false);
	if (result == null) continue;

	Console.WriteLine(Encoding.UTF8.GetString(result.Body.Span));
	channel.BasicReject(deliveryTag: result.DeliveryTag, requeue: true);
}

если requeue Настройки параметровдля true ,но RabbitMQ встречаснова Воляэтотполоскаинформацияжитьвходитьочередь,Чтобы его можно было отправить на следующую подписку изпотребителю.,Другими словами, программу можно будет получить снова после перезапуска.

если requeue Настройки параметровдля false ,но RabbitMQ немедленно удалит информацию из очереди и не отправит ее новому потребителю.

если хотите отклонить информацию оптом.

Язык кода:javascript
копировать
channel.BasicNack(deliveryTag: result.DeliveryTag, multiple: true, requeue: true);

multiple для true Когда, но выражает отказ deliveryTag Номер до того, как бывший потребитель подтверждает изинформацию.

BasicRecover() метод, используемый для начала с RabbitMQ Повторное получение также подтвердило информацию

когда requeue=true час,былподтверждатьизинформациявстречаодеялоснова加входитьприезжатьочередьсередина,для того же, что и информация,Он будет закреплен за другим потребителем.

когда requeue=false,Та же информация будет присвоена тому же потребителю, что и раньше.

Язык кода:javascript
копировать
channel.BasicRecover(requeue: true);
// асинхронный
channel.BasicRecoverAsync(requeue: true);

Режим подтверждения сообщения

Переднийнестиприезжать,когда autoAck=false В то время, хотя новостей не было ок, но RabbitMQ Он все равно перейдет к следующему сообщению.

для гарантии порядка информацияиз,существоватьеще нет Волякогдавперединформация Расход завершенизслучай,Автоматическое потребление следующей информации не допускается.

Тольконуждатьсяиспользовать BasicQos Просто настройте:

Язык кода:javascript
копировать
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
Язык кода:javascript
копировать
ConnectionFactory factory = new ConnectionFactory
{
	HostName = "localhost"
};

using IConnection connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();

// создаватьвыключатель
channel.ExchangeDeclare("acktest",ExchangeType.Fanout);

// создавать Два поворота
channel.QueueDeclare(queue: "myqueue5");

// использовать routingKey обязательностьвыключатель
channel.QueueBind(exchange: "acktest", queue: "myqueue5", routingKey: string.Empty);

int i = 0;
while (i < 10)
{
	// отправка информации
	channel.BasicPublish(
	exchange: "acktest",
	routingKey: string.Empty,
	basicProperties: null,
	body: Кодировка.UTF8.GetBytes($"Тест")
	);
	i++;
}

// еще нет ack Раньше, не могу съесть следующий
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
	var message = Encoding.UTF8.GetString(ea.Body.Span);
	Console.WriteLine($" [x] Received {message}");
	// channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};

// Начните потреблять
channel.BasicConsume(queue: "myqueue5",
					 autoAck: false,
					 consumer: consumer);

После предыдущего кода,ты найдешь,Нет.одинполоскаинформациябыл ack , программа не будет автоматически читать следующую информацию и не будет повторно получать ack изинформация。

если Мы хотим прочитать было еще раз ack изинформация, вы можете перезапустить программу или использовать BasicRecover() Позвольте серверу снова нажать.

Сохранение сообщений

Ранее упоминалась функция BasicPublish в определении:

Язык кода:javascript
копировать
BasicPublish(string exchange, 
             string routingKey, 
             bool mandatory = false, 
             IBasicProperties basicProperties = null, 
             ReadOnlyMemory<byte> body = default)

когданастраивать mandatory = true час,есливыключательнеспособен полагаться на себяизтипимаршрутизацияключпопытаться найтиприезжатьодинсоответствоватьполоскакускиизочередь,Так RabbitMQ Триггерный клиент из IModel.BasicReturn событие, Воляинформация возвращается производителю 。

С точки зрения дизайна, IConnection Хотя их может быть больше одного IМодель (канал),Но да рекомендует писать только потребительскую программу или программу-производитель.,Не рекомендуется для смешанного использования.

потому чтодля Все видысобытиеиочередь Конфигурация,дапротиводин IМодель (канал) установить из.

Язык кода:javascript
копировать
using IConnection connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();
channel.BasicReturn += (object sender, BasicReturnEventArgs e) =>
{

};

когда установлено mandatory = true час,если Долженинформацияпопытаться найти Нетприезжатьочередьжитьмагазининформация,Так Сразувстреча Триггерный клиент из BasicReturn Прием мероприятий BasicPublish Отказ изинформации.

Полный пример кода выглядит следующим образом:

Язык кода:javascript
копировать
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Runtime;
using System.Text;

ConnectionFactory factory = new ConnectionFactory
{
	HostName = "localhost"
};

using IConnection connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();

channel.ExchangeDeclare(exchange: "e2", type: ExchangeType.Fanout, durable: false, autoDelete: false);


channel.BasicReturn += (object? s, BasicReturnEventArgs e) =>
{
	Console.WriteLine($"неверныйинформация:{Encoding.UTF8.GetString(e.Body.Span)}");
};


int i = 0;
while (i < 10)
{
	channel.BasicPublish(
	exchange: "e2",
	routingKey: string.Empty,

	// обязательный=true, он будет срабатывать, когда не будет получена информация BasicReturn событие
	mandatory: true,
	basicProperties: null,
	body: Encoding.UTF8.GetBytes($"Test{i}")
	);
	i++;
}


Console.ReadLine();

существоватьдействительныйразвиватьсередина,когда mandatory=false час,еслиодинполоскаинформациятолкатьприезжатьвыключатель,Но да не связан поворот,Тогда информация о статье будет потеряна.,Это может привести к серьезным последствиям.

исуществовать RabbitMQ середина,нести供ПонятноодиндобрыйодеялосказатьдляЗапасное колесовыключательизплан,этотдапроходитьсуществоватьопределениевыключательчасдобавить в alternate-exchange Параметры, которых нужно достичь. Его функциядакогда A выключатель Не могу найтиприезжатьочередь Вперединформациячас,Сразувстреча Воляинформация Впередприезжать B в очереди.

Полный пример кода выглядит следующим образом:

первыйсоздавать e3_bak очередь,затемсоздавать e3 в свою очередь при установке запасного колеса переключательдля e3_bak。

Затем e3_bak необходимо связать очередь для приема сообщений.

Язык кода:javascript
копировать
ConnectionFactory factory = new ConnectionFactory
{
	HostName = "localhost"
};

using IConnection connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();

channel.ExchangeDeclare(
	exchange: "e3_bak",
	type: ExchangeType.Fanout,
	durable: false,
	autoDelete: false
	);

// заявление e3 выключатель,когда e3 выключательбезиметьобязательностьочередьчас,информация Волявстречаодеяло Впередприезжать e3_bak выключатель
channel.ExchangeDeclare(
	exchange: "e3",
	type: ExchangeType.Fanout,
	durable: false,
	autoDelete: false,
	arguments: new Dictionary<string, object> {
		{ "alternate-exchange", "e3_bak" }
	}
	);

channel.QueueDeclare(queue: "q3", durable: false, exclusive: false, autoDelete: false);
channel.QueueBind(queue: "q3", "e3_bak", routingKey: string.Empty);

// Потому что для уже установлено e3 иззапаснойвыключатель,так Нетвызовет BasicReturn
channel.BasicReturn += (object? s, BasicReturnEventArgs e) =>
{
	Console.WriteLine($"неверныйинформация:{Encoding.UTF8.GetString(e.Body.Span)}");
};


int i = 0;
while (i < 10)
{
	channel.BasicPublish(
	exchange: "e3",
	routingKey: string.Empty,
	// Потому что для уже установлено e3 иззапаснойвыключатель,таквключатьэтот Нетвызовет BasicReturn
	mandatory: true,
	basicProperties: null,
	body: Encoding.UTF8.GetBytes($"Test{i}")
	);
	i++;
}

Console.ReadLine();

Уведомление,если выключатель запасного колеса правильно ли он закреплен, поверните из слов,Тогда информация будет потеряна.

если e3 да Direct,e3_bak или Директ, то у обоих должно быть одинаковое из routingKey,если e3 Есть один в routingKey = cat,Да e3_bak существования не существует в соответствующем из routingKey,Тогда информация или да будут потеряны. Есть и другие ситуации,Я не буду здесь вдаваться в подробности.

При отправке сообщения появляется IBasicProperties basicProperties Атрибуты, атрибуты интерфейса были представлены в предыдущем разделе, когда IBasicProperties.DeliveryMode=2 информация Воля отмечена как постоянная, даже если RabbitMQ Даже если сервер будет перезапущен, сообщения не потеряются.

условно говоря,Благодаря предыдущему эксперименту,ты Можетнаблюдатьприезжатьклиенточередьизинформация Все Потребление完毕назад,очередьсерединаизинформация Всевстречапропадать。ипереписываться Kafka Например, topic Если среда израсходована, она все равно сохранится. Это следует принять к сведению, используйте RabbitMQ час,Персистентность необходимо настроить заранее.,Избегайте потребления или еще нетуспех Потреблениечас,информацияпотерянный。

Когда производитель существования выдвигает информацию, вы можете использовать IBasicProperties.DeliveryMode=2 Воля Долженинформациянастраиватьдля Выносливость。

Язык кода:javascript
копировать
	var ps = channel.CreateBasicProperties();
	ps.DeliveryMode = 2;

	channel.BasicPublish(
	exchange: "e3",
	routingKey: string.Empty,
	mandatory: false,
	basicProperties: ps,
	body: Encoding.UTF8.GetBytes($"Test{i}")
	);

Время жизни сообщения

настраивать Время жизни сообщенияназад,Информацияеслисуществовать не должна быть использована в течение определенного времени.,Тогда информация становится мертвой информацией. для такого рода информации,Будет примерно две ситуации обработки.

Установлен первый, еслиочередь "x-dead-letter-exchange" ,Так Долженинформациявстречаодеялооточередь Впередприезжатьдругойвыключательсередина。этотдобрыйметодсуществоватьмертвое письмовыключательодин节серединавстречапредставлять。

Во втором случае сообщение отбрасывается.

На данный момент существует два способа Установить сообщение из TTL.

Первый типметоддапроходитьнастройки атрибутов Turn,Таким образом, вся информация, в свою очередь, имеет одинаковое время истечения срока действия.

Второй способ — настроить одну строку индивидуально, и для каждой строки будет TTL. Может быть разным.

если обе настройки вместеиспользуйте,ноинформацияиз TTL кКакое из двух значений меньше, является более точным?。информациясуществоватьочередьсерединаизрожденныйжитьчасодин旦超过настраивать TTL ценитьчас,потребитель Воля Больше не могуполучатьприезжать Долженинформация,такбольшинствонастраиватьмертвое письмовыключатель.

Первый — установить очередь:

Язык кода:javascript
копировать
channel.QueueDeclare(queue: "q4",
	durable: false,
	exclusive: false,
	autoDelete: false,
	arguments: new Dictionary<string, object>() { { "x-message-ttl", 6000 } });

Второй способ — настроить срок действия сообщения, задав свойства.

Язык кода:javascript
копировать
var ps = channel.CreateBasicProperties();
// Единица миллисекунда
ps.Expiration = "6000";

для Первый типнастраиватьочередьсвойствоизметод,По истечении срока действия информация будет удалена в свою очередь.(еслинастраивать Понятномертвое письмовыключатель,встречаодеяло Впередприезжатьмертвое письмовыключательсередина)。исуществовать Нет.二добрыйметодсередина,Даже если срок действия информации истек,И при этом он не будет стерт немедленно из своей очереди.,потому чтодля Долженполоскаинформациясуществовать Прямо сейчас Волядоставкаприезжатьпотребитель Извперед,Только тогда будет проверена, истек ли срок действия информации. для Вторая ситуация,когдаочередь при выполнении любой операции опроса,будет фактически удалено.

для Вторая ситуация,Пока идет опрос дасуществовать,Оно будет удалено только после истечения срока его действия.,Но как только срок действия истекает,Сразувстречаодеяло Впередприезжатьмертвое письмоочередьсередина,Только да сразу не удалят.

Время TTL в очереди

когда по сравнению с установкой поворота TTL Если очередьсуществовать предусматривает, что время не потребляется функцией use, тогда очередь будет удалена. Это ограничение включает в себя то, что время не потребляется информацией (включая ее). BasicGet() способ потребленияиз)、безиметьодеялосновазаявление、безиметьпотребительсоединять,В противном случае удаленное время обратного отсчета будет сброшено.

Язык кода:javascript
копировать
channel.QueueDeclare(queue: "q6",
	durable: false,
	exclusive: false,
	autoDelete: false,
	arguments: new Dictionary<string, object>
	{
		// Единица миллисекунды, набор очередь Истекшийвремяда 1 Час
		{"x-expires",1*3600*1000}
	});

Обменник недоставленных писем DLX

DLX(Dead-Letter-Exchange) мертвое письмовыключатель,Став мертвой буквой в A,этоспособныйодеялосноваодеялоотправлятьприезжатьдругой B в переключателе.в A очередьобязательность Понятномертвое письмовыключатель,ТаксуществоватьManagement UI Интерфейс увидит DLX логотип А переключатель Б — обычный переключатель,Никакой настройки не требуется.

Сообщение становится мертвым письмом в следующих ситуациях:

  • информацияодеялопотребительотклонять,BasicReject()BasicNack() Две функции могут отклонять сообщения.
  • Срок действия сообщения истек.
  • очередьдостигатьприезжатьмаксимальная длина。

когда это место A серединажитьスペータストмертвое письмоинформациячас,RabbitMQ Сразувстречаавтоматическийземля Воляэтотинформациясновавыпускатьприезжатьнастраиватьизвыключатель B середина. Обычно мертвое письмо создается специально для важного изочередного выключателя. B,ивыключатель B Вам также необходимо привязать очередь C Только тогда, иначе сообщение потеряется.

Настройка поворота При появлении недоставленного сообщения информация,Воляинформация Впередприезжать Которыйвыключательсередина:

Язык кода:javascript
копировать
channel.QueueDeclare(queue: "q7", durable: false, exclusive: false, autoDelete: false,
		arguments: new Dictionary<string, object> {
		{ "x-dead-letter-exchange", "e7_bak" } });

Полный пример кода выглядит следующим образом:

Язык кода:javascript
копировать
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

ConnectionFactory factory = new ConnectionFactory
{
	HostName = "localhost"
};

using IConnection connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();

channel.ExchangeDeclare(
	exchange: "e7_bak",
	type: ExchangeType.Fanout,
	durable: false,
	autoDelete: false
	);

channel.QueueDeclare(queue: "q7_bak", durable: false, exclusive: false, autoDelete: false);
channel.QueueBind(queue: "q7_bak", "e7_bak", routingKey: string.Empty);

channel.ExchangeDeclare(
	exchange: "e7",
	type: ExchangeType.Fanout,
	durable: false,
	autoDelete: false
	);

channel.QueueDeclare(queue: "q7", durable: false, exclusive: false, autoDelete: false,
		arguments: new Dictionary<string, object> {
		{ "x-dead-letter-exchange", "e7_bak" } });

channel.QueueBind(queue: "q7", "e7", routingKey: string.Empty);

int i = 0;
while (i < 10)
{
	channel.BasicPublish(
	exchange: "e7",
	routingKey: string.Empty,
	mandatory: false,
	basicProperties: null,
	body: Encoding.UTF8.GetBytes($"Test{i}"));
	i++;
}

Thread.Sleep(1000);

int y = 0;
// определениепотребитель
channel.BasicQos(0, prefetchCount: 1, true);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
	var message = Encoding.UTF8.GetString(ea.Body.Span);
	Console.WriteLine($" [x] Received {message}");

	if (y % 2 == 0)
		channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);

	// requeue Чтобы установить для false Вот и все,
	// В противном случае эта информация будет возвращена в очередь после отклонения.
	else
		channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: false);
	Interlocked.Add(ref y, 1);
};

// Начните потреблять
channel.BasicConsume(queue: "q7",
					 autoAck: false,
					 consumer: consumer);

Console.ReadLine();

очередь задержки

RabbitMQ Прямой поддержкиочередности как таковой нет. задержкииз Функция。

Так что же будет с для?

После главного да в информационный толчок,Нежелание быть съеденным немедленно. например говорит,После того, как пользователь разместил заказ,если нет оплаты в течение 10 минут,Так Должен Заказвстречаодеялоавтоматический Отмена。такнуждаться Делатьодининформацияодеяло延迟Потреблениеиз Функция。

такобъяснять,Фактический спросда,Информация должна существовать до того, как она сможет быть использована потребителем.

Чтобы выполнить эту функцию в RabbitMQ, вам понадобится два выключателя и как минимум два выключателя.

Идеидаопределениедвавыключатель e8、e9 и Два поворота q8, q9, переключатель e8 иочередь q8 Привязка, переключатель e9 и q9 Привязка.

Вот и самый важный момент, q9 поставил мертвую букву очередь,когда Время жизни сообщенияприезжатьчас,Впередприезжать e9 в переключателе.так,e9 выключатель - q9 очередь ловитьполучатьприезжатьиз Вседаприезжать Ожидать(или者объяснять Истекший)изинформация。

существоватьотправка информацииприезжать e8 выключательчас,настраивать TTL время。когда q8 очередьсерединаизинформация Истекшийчас,информациявстречаодеяло Впередприезжать e9 выключатель,Затем внесите депозит q9.

потребитель Просто нужно подписаться q9 очередь,Прямо сейчас Может Потреблениеприезжать Ожидатьназадизинформация。

все Полный пример кода выглядит следующим образом:

Язык кода:javascript
копировать
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

ConnectionFactory factory = new ConnectionFactory
{
	HostName = "localhost"
};

using IConnection connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();

channel.ExchangeDeclare(
	exchange: "e8",
	type: ExchangeType.Fanout,
	durable: false,
	autoDelete: false
	);

channel.ExchangeDeclare(
	exchange: "e9",
	type: ExchangeType.Fanout,
	durable: false,
	autoDelete: false
	);

channel.QueueDeclare(queue: "q9", durable: false, exclusive: false, autoDelete: false);
channel.QueueBind(queue: "q9", "e9", routingKey: string.Empty);

channel.QueueDeclare(queue: "q8", durable: false, exclusive: false, autoDelete: false,
		arguments: new Dictionary<string, object> {
		{ "x-dead-letter-exchange", "e9" } });

channel.QueueBind(queue: "q8", "e8", routingKey: string.Empty);

int i = 0;
while (i < 10)
{
	var ps = channel.CreateBasicProperties();
	ps.Expiration = "6000";

	channel.BasicPublish(
	exchange: "e8",
	routingKey: string.Empty,
	mandatory: false,
	basicProperties: ps,
	body: Encoding.UTF8.GetBytes($"Test{i}")
	);
	i++;
}


var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
	var message = Encoding.UTF8.GetString(ea.Body.Span);
	Console.WriteLine($" [x] ужеприезжать Ожидатьинформация {message}");
};

// Начните потреблять
channel.BasicConsume(queue: "q9",
					 autoAck: true,
					 consumer: consumer);

Console.ReadLine();

Приоритет сообщения

Чем выше Приоритет сообщения, тем быстрее оно будет израсходовано потребителем.

Пример кода выглядит следующим образом:

Язык кода:javascript
копировать
var ps = channel.CreateBasicProperties();
// приоритет 0-9 
ps.Priority = 9;

	channel.BasicPublish(
	exchange: "e8",
	routingKey: string.Empty,
	mandatory: false,
	basicProperties: ps,
	body: Encoding.UTF8.GetBytes($"Test{i}")
	);

Итак, RabbitMQ Последовательность информации не обязательно может быть гарантирована, что аналогично Kafka Есть разница между да и из.

механизм транзакции

механизм транзакциида,Издатель подтвердилинформацияодин定толкатьприезжать RabbitMQ Broker , часто вместе с бизнес-кодом.

напримеробъяснять,После того, как пользователь успешно оплатит,толкатьодинуведомитьприезжать RabbitMQ в очереди.

База данных, когда, естественно, должна выполнять транзакции,Таким образом, измененные данные будут отменены после сбоя платежа. Но вот проблема,если информация была отправлена,Дачисло据库却раз滚Понятно。

этотчасждатьвстречас участиемприезжатьпоследовательность,Можетиспользовать RabbitMQ измеханизм транзакции Приходитьиметь дело с,Идея аналогична процессу транзакции базы данных.,Также существуют операции фиксации и отката.

Чтоглазиздаубеждатьсяинформацияуспехтолкатьприезжать RabbitMQ Broker Помимо обеспечения согласованности данных с другими кодами на клиенте, push-сообщения и операции кода выполняются одновременно или одновременно откатываются.

Однако RabbitMQ Транзакции и транзакции базы данных — это две вещи. когда один из них RabbitMQ Когда транзакция завершена, но программа зависла, происходит откат другой транзакции базы данных. На этом этапе данные снова будут противоречивыми. потому чтоэтот,Также необходимо обеспечить возможность завершения двух этапов одновременно. в это время,нас Можетспособный又нуждатьсяпредставлятьвторой этапнестиплатить、поддерживатькомпенсационный механизмждать.этотразприезжать Поле распределенной транзакции。

Это полное из Пример кода выглядит следующим образом:

Язык кода:javascript
копировать
ConnectionFactory factory = new ConnectionFactory
{
	HostName = "localhost"
};

using IConnection connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();

// Клиент отправляет Канал Tx.Select.Воля установлен в режим транзакций;
channel.TxSelect();

try
{
	// отправка информации
	channel.QueueDeclare(queue: "transaction_queue",
						 durable: false,
						 exclusive: false,
						 autoDelete: false,
						 arguments: null);

	string message = "Hello, RabbitMQ!";
	var body = Encoding.UTF8.GetBytes(message);

	channel.BasicPublish(exchange: "",
						 routingKey: "transaction_queue",
						 basicProperties: null,
						 body: body);


	// выполнить ряд действий

	// совершить транзакцию
	channel.TxCommit();
	Console.WriteLine(" [x] Sent '{0}'", message);
}
catch (Exception e)
{
	// Откат транзакции
	channel.TxRollback();
	Console.WriteLine("An error occurred: " + e.Message);
}

Console.ReadLine();

Механизм подтверждения отправителя

Механизм подтверждения отправителя,дагарантироватьинформацияодин定толкатьприезжать RabbitMQ из схемы.

имеханизм транзакции,Обычно дадля обеспечения последовательности,Нажмите информацию, и другие операции будут успешными или неудачными одновременно,Между ними не может быть никакого противоречия.

Что Полный пример кода выглядит следующим образом:

Язык кода:javascript
копировать
using IConnection connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();

// Включить режим подтверждения отправителя
channel.ConfirmSelect();

string exchangeName = "exchange_name";
string routingKey = "routing_key";

// определениевыключатель
channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct);

// отправка информации
string message = "Hello,RabbitMQ!";
вар тело = Encoding.UTF8.GetBytes(сообщение);

// выпускатьинформация
канал.BasicPublish(обмен: ExchangeName,
					 routingKey: routingKey,
					 basicProperties: null,
					 body: body);

// ждатьподтверждатьужетолкатьприезжать RabbitMQ
if (channel.WaitForConfirms())
{
	Console.WriteLine(" [x] Sent '{0}'", message);
}
else
{
	Console.WriteLine("Message delivery failed.");
}

Console.ReadLine();

Статья написана к этому моменту, ровно 10 000 слов.

для RabbitMQ Кластеризация, эксплуатация и обслуживание и другие технологии не будут подробно описаны в этой статье.

boy illustration
Неразрушающее увеличение изображений одним щелчком мыши, чтобы сделать их более четкими артефактами искусственного интеллекта, включая руководства по установке и использованию.
boy illustration
Копикодер: этот инструмент отлично работает с Cursor, Bolt и V0! Предоставьте более качественные подсказки для разработки интерфейса (создание навигационного веб-сайта с использованием искусственного интеллекта).
boy illustration
Новый бесплатный RooCline превосходит Cline v3.1? ! Быстрее, умнее и лучше вилка Cline! (Независимое программирование AI, порог 0)
boy illustration
Разработав более 10 проектов с помощью Cursor, я собрал 10 примеров и 60 подсказок.
boy illustration
Я потратил 72 часа на изучение курсорных агентов, и вот неоспоримые факты, которыми я должен поделиться!
boy illustration
Идеальная интеграция Cursor и DeepSeek API
boy illustration
DeepSeek V3 снижает затраты на обучение больших моделей
boy illustration
Артефакт, увеличивающий количество очков: на основе улучшения характеристик препятствия малым целям Yolov8 (SEAM, MultiSEAM).
boy illustration
DeepSeek V3 раскручивался уже три дня. Сегодня я попробовал самопровозглашенную модель «ChatGPT».
boy illustration
Open Devin — инженер-программист искусственного интеллекта с открытым исходным кодом, который меньше программирует и больше создает.
boy illustration
Эксклюзивное оригинальное улучшение YOLOv8: собственная разработка SPPF | SPPF сочетается с воспринимаемой большой сверткой ядра UniRepLK, а свертка с большим ядром + без расширения улучшает восприимчивое поле
boy illustration
Популярное и подробное объяснение DeepSeek-V3: от его появления до преимуществ и сравнения с GPT-4o.
boy illustration
9 основных словесных инструкций по доработке академических работ с помощью ChatGPT, эффективных и практичных, которые стоит собрать
boy illustration
Вызовите deepseek в vscode для реализации программирования с помощью искусственного интеллекта.
boy illustration
Познакомьтесь с принципами сверточных нейронных сетей (CNN) в одной статье (суперподробно)
boy illustration
50,3 тыс. звезд! Immich: автономное решение для резервного копирования фотографий и видео, которое экономит деньги и избавляет от беспокойства.
boy illustration
Cloud Native|Практика: установка Dashbaord для K8s, графика неплохая
boy illustration
Краткий обзор статьи — использование синтетических данных при обучении больших моделей и оптимизации производительности
boy illustration
MiniPerplx: новая поисковая система искусственного интеллекта с открытым исходным кодом, спонсируемая xAI и Vercel.
boy illustration
Конструкция сервиса Synology Drive сочетает проникновение в интрасеть и синхронизацию папок заметок Obsidian в облаке.
boy illustration
Центр конфигурации————Накос
boy illustration
Начинаем с нуля при разработке в облаке Copilot: начать разработку с минимальным использованием кода стало проще
boy illustration
[Серия Docker] Docker создает мультиплатформенные образы: практика архитектуры Arm64
boy illustration
Обновление новых возможностей coze | Я использовал coze для создания апплета помощника по исправлению домашних заданий по математике
boy illustration
Советы по развертыванию Nginx: практическое создание статических веб-сайтов на облачных серверах
boy illustration
Feiniu fnos использует Docker для развертывания личного блокнота Notepad
boy illustration
Сверточная нейронная сеть VGG реализует классификацию изображений Cifar10 — практический опыт Pytorch
boy illustration
Начало работы с EdgeonePages — новым недорогим решением для хостинга веб-сайтов
boy illustration
[Зона легкого облачного игрового сервера] Управление игровыми архивами
boy illustration
Развертывание SpringCloud-проекта на базе Docker и Docker-Compose