Выпущенная в 2007 году, это корпоративная система обмена сообщениями многократного использования, основанная на AMQP (расширенный протокол очереди сообщений). В настоящее время это одно из наиболее распространенных промежуточных программ для обмена сообщениями.
RabbitMQ — это реализация AMQP (Advanced Message Queue Advanced Message Queuing Protocol) с открытым исходным кодом, разработанная erlang. Благодаря высоким характеристикам параллелизма язык erlang имеет хорошую производительность. По сути, это очередь FIFO. , а содержимое, хранящееся в нем, является сообщением.
RabbitMQ — это промежуточное программное обеспечение для сообщений: оно получает сообщения и пересылает их, как курьерская станция. Продавец доставляет нам экспресс через экспресс-станцию. То же самое относится и к MQ, который получает и сохраняет сообщения, а затем пересылает их.
Первый тип:Продюсер проиграл.данные。Производители будутданныеотправить в RabbitMQ Иногда данные могут быть потеряны на полпути из-за проблем с сетью или других проблем.
Второй вид:RabbitMQ Потерял данные. MQ еще не сохранился и умер сам по себе.
Третий вид:Потребительская часть потеряла данные. Только что потратил,Еще не обработано,В результате процесс зависает,Например, перезапуск.
Вариант 1 : Запустить транзакцию RabbitMQ. Вы можете использовать RabbitMQ Предоставленная функция транзакции активируется до того, как производитель отправит данные. RabbitMQ Channel.txВыберите транзакцию, а затем отправьте сообщение, если сообщение не было успешно отправлено. RabbitMQ В случае получения производитель получит сообщение об ошибке исключения. В это время транзакцию Channel.txRollback можно откатить и отправить сообщение повторно; если сообщение получено, транзакция Channel.txCommit может быть отправлена.
// Включи дела
channel.txSelect
try {
// Отправить сообщение сюда
} catch (Exception e) {
channel.txRollback
// Повторно опубликовать это сообщение здесь
}
// Отправить дела
канал.txCommit
недостаток: RabbitMQ Механизм транзакций является синхронным. После того, как вы отправите транзакцию, она будет заблокирована. Таким образом, пропускная способность в основном снизится, поскольку она потребляет слишком много производительности.
Вариант 2. Использовать механизм подтверждения.
Самая большая разница между механизмом транзакции и механизмом подтверждения заключается в том, что механизм транзакции является синхронным. После отправки транзакции она будет заблокирована, но механизм подтверждения является асинхронным.
После того, как производитель включит режим подтверждения, каждому написанному сообщению будет присвоен уникальный идентификатор. Затем, если оно записано в RabbitMQ, RabbitMQ отправит вам ответное сообщение, сообщающее, что сообщение отправлено ОК, если RabbitMQ Если сообщение не удалось; для обработки, вам перезвонит интерфейс nack и сообщит, что сообщение не удалось отправить, и вы можете повторить попытку. И вы можете объединить этот механизм, чтобы знать, что вы сохраняете идентификатор каждого сообщения в памяти. Если вы не получили обратный вызов для этого сообщения по истечении определенного периода времени, вы можете отправить его повторно.
//Открываем подтверждение
channel.confirm();
//Отправляем успешный обратный вызов
public void ack(String messageId){
}
// Отправить обратный вызов при ошибке
public void nack(String messageId){
//Повторно отправить сообщение
}
Давайте поговорим о трёх моментах:
(1) Чтобы гарантировать, что RabbitMQ не теряет сообщения, вам необходимо включить механизм сохранения RabbitMQ, то есть сохранять сообщения на жестком диске, чтобы даже если RabbitMQ зависает, он все равно мог читать сообщения с жесткого диска после перезапуск;
(2) Что делать, если в RabbitMQ есть одна точка отказа. Эта ситуация не приведет к потере сообщений. Здесь мы упомянем три режима установки RabbitMQ: автономный режим, режим обычного кластера и режим зеркального кластера. необходимо обеспечить высокую производительность RabbitMQ. Если доступно, вы должны сотрудничать с HAPROXY для создания режима зеркального кластера.
(3) Как гарантировать, что сообщения не потеряются, если жесткий диск сломан?
Сообщения RabbitMQ по умолчанию хранятся в памяти. Если не указаны специальные настройки, сообщения не будут сохраняться на жестком диске. Если узел перезапускается или случайно выходит из строя, сообщения будут потеряны.
Поэтому необходимо сохранить сообщение. Как сохраниться, подробно описано ниже:
Чтобы добиться устойчивости сообщения, должны быть выполнены следующие три условия, каждое из которых является обязательным.
1) Сохранение настроек обмена
2) Постоянство наборов очередей
3) Постоянная отправка сообщения. Чтобы отправить сообщение, установите режим доставки DeliveryMode=2, который представляет собой постоянное сообщение.
Давайте сначала представим три режима развертывания RabbitMQ:
1)Режим одного узла:простейший случай,некластеризованный режим,Узел не работает,Сообщение больше не доступно. Бизнес может быть парализован,Все, что мы можем сделать, это ждать.
2)Обычный режим:Сообщение будет существовать только в текущем узле.,Он не будет синхронен с другими узлами.,Текущий узел не работает,Влиятельный бизнес будет парализован,Вы можете только дождаться восстановления и перезапуска узла (если сообщения должны сохраняться).
3)Зеркальный режим:пресс-конференциясинхронныйк другим узлам,Вы можете установить количество узлов для синхронного,Но пропускная способность упадет. Решение высокой доступности, принадлежащее RabbitMQ
Зачем настраивать кластер с зеркальным режимом, потому что содержимое очереди существует только на определенном узле и не существует на всех узлах. Все узлы хранят только структуры сообщений и метаданные. Ниже я нарисовал картинку, чтобы представить ситуацию потери сообщений в обычных кластерах:
Если вы хотите решить вышеуказанные проблемы и гарантировать, что сообщения не будут потеряны, вам необходимо использовать очередь режима зеркалирования HA.
Давайте представим это нижеТри режима стратегии высокой доступности:
1) Синхронизировать со всеми
2) Синхронизировать до N машин
3) Синхронизировать только с узлами, соответствующими указанному имени.
Шаблон политики высокой доступности обработки команд: Rabbitmqctl set_policy [-p Vhost] Определение шаблона имени [Приоритет]
1) Установите зеркало всех узлов для каждой очереди, начиная с "rock.wechat", и установите для него режим автоматической синхронизации Rabbitmqctl set_policy ha-all "^rock.wechat" '{"ha-mode":"all", " ha-sync-mode":"automatic"}' Rabbitmqctl set_policy -p rock ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
2) Установите два зеркала узла для каждой очереди, начиная с «rock.wechat», и установите режим автоматической синхронизации Rabbitmqctl set_policy -p rock ha-exacly «^rock.wechat». '{"ha-mode":"точно","ha-params":2,"ha-sync-mode":"automatic"}'
3) Назначьте указанный узел каждой очереди, начиная с «узла», для зеркалирования Rabbitmqctl set_policy ha-nodes «^nodes». '{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'
Однако у зеркальной очереди высокой доступности есть большой недостаток: снизится пропускная способность системы.
Зачем нам нужен механизм компенсации сообщений? Будут ли сообщения по-прежнему теряться? Да, система находится в сложной среде. Не думайте слишком просто. Хотя приведенные выше три решения в принципе могут обеспечить высокую доступность сообщений и предотвратить их потерю.
Но как начинающий программист я обязательно должен обеспечить стабильность своей системы и иметь ощущение кризиса.
Например: В процессе сохранения постоянных сообщений на жёсткий диск текущий узел очереди зависает, жёсткий диск узла хранения выходит из строя и сообщение теряется. Что делать?
1) Производственная сторона сначала сохраняет бизнес-данные и данные сообщения в базе данных. Это должно быть в одной транзакции. Если данные сообщения не могут быть сохранены в базе данных, вся транзакция будет отменена.
2) В соответствии со статусом сообщения в таблице сообщений, в случае сбоя будут приняты меры по компенсации сообщения и сообщение будет отправлено повторно.
Вариант 1: механизм подтверждения ACK
Несколько потребителей получают сообщения одновременно. Например, при получении сообщения на полпути потребитель умирает (слишком сложная логика, истекает время, или потребление прекращается, или сеть отключается). не потеряно?
Используя механизм подтверждения, предоставляемый RabbitMQ, сервер сначала отключает автоматическое подтверждение RabbitMQ, а затем вручную вызывает ACK в коде каждый раз после проверки того, что сообщение обработано. Это позволит избежать подтверждения до обработки сообщения. После этого сообщение удаляется из памяти.
Это решает проблему. Даже если у потребителя возникнет проблема, сообщение не будет синхронизировано с сервером, и его будут использовать другие потребители, гарантируя, что сообщение не будет потеряно.
Если необходимо обеспечить, чтобы сообщения не терялись по всей ссылке, то производственная сторона, сам mq и потребительская сторона должны работать вместе, чтобы обеспечить это.
Производственная сторона:Маркировка статуса создаваемых сообщений,Включите механизм подтверждения,Обновить статус сообщения на основе ответа mq,Используйте запланированные задачи для повторной доставки сообщений об истечении времени ожидания.,Предупреждайте, если произойдет несколько сбоев доставки.
сам mq:Включить сохранение,И затем подтверждение после размещения заказа. Если это режим развертывания зеркала,Прежде чем активировать, необходимо получить несколько копий.
Потребительская сторона:Включить вручнуюackмодель,Выполните подтверждение после завершения бизнес-обработки.,И его идемпотентность должна быть гарантирована.
Благодаря вышеуказанной обработке теоретически потери сообщений не происходит, но снижается пропускная способность и производительность системы.
В реальной разработке необходимо учитывать влияние потери сообщений, чтобы найти компромисс между надежностью и производительностью.