Всем привет, мы снова встретились, я ваш друг Цюаньчжаньцзюнь.
Эта статья содержит много контента, включая некоторые простые теоретические введения, связанные с RabbitMq, примеры отправки сообщений поставщика, примеры потребления сообщений потребителями, использование Direct, Topic, Fanout, обратные вызовы сообщений, ручное подтверждение и т. д. (Но я не буду знакомить с установкой RabbitMq)
После установки RabbitMq,входитьhttp://ip:15672/ , вы можете увидеть простой интерфейс управления фоном.
Что мы можем делать в этом интерфейсе? Вы можете вручную создавать виртуальные хосты, пользователей, назначать разрешения, создавать коммутаторы, создавать очереди и т. д. Вы также можете просматривать сообщения очереди, эффективность потребления, эффективность push-уведомлений и т. д.
Вышеописанные операции интерфейса управления не будут подробно описываться в данной статье, хотелось бы остановиться на тех, которые будут использоваться в последующих примерах.
Сначала мы представим простой процесс отправки сообщения получателю и предоставим простую диаграмму:
RabbitMq -JCccc
Желтый кружок — это наша служба отправки сообщений. Сообщение отправляется в средний блок, который является сервером RabbitMq, а затем данные обрабатываются и ставятся в очередь с помощью различных взаимосвязей, таких как переключатели и очереди на сервере (подробно это будет обсуждаться позже). ). Наконец, потребитель в синем круге справа получает соответствующее отслеживаемое сообщение.
Существует три наиболее часто используемых переключателя. Поскольку потребители получают информацию из очередей, а очереди привязаны к переключателям (как правило), соответствующие режимы отправки/получения сообщений также будут следующими:
Direct Exchange
Коммутатор прямого подключения доставляет сообщение в соответствующую очередь в соответствии с ключом маршрутизации, содержащимся в сообщении.
Общий процесс заключается в том, что очередь привязывается к напрямую подключенному коммутатору и ей назначается ключ маршрутизации. Затем, когда сообщение содержит значение маршрутизации X и сообщение отправляется коммутатору через производителя, коммутатор будет искать очередь со значением привязки
Fanout Exchange
Секторный переключатель, этот коммутатор не имеет понятия ключей маршрутизации, даже если вы привяжете ключи маршрутизации, он будет игнорироваться. После получения сообщения этот коммутатор перенаправит его непосредственно во все привязанные к нему очереди.
Topic Exchange
Переключение темы, это переключение на самом деле похоже на процесс прямого переключения, но его особенностью является то, что между его ключами маршрутизации и ключами привязки существуют правила. Коротко о правилах:
* (звездочка) используется для обозначения слова (должен появиться) # (хэштег) Используется для представления любого количества (ноля или более) слов. Ключ привязки с подстановочными знаками привязан к очереди. Вот небольшой пример. Очередь Q1 Ключ привязки — это *.TT.* Очередь Q2Ключ привязки — это TT.# Если сообщение содержит ключ маршрутизации A.TT.B,Так Очередь Q1 будет получен; Если сообщение содержит ключ маршрутизацииTT.AA.BB, то получит очередь Q2;
Переключатель тем очень мощный, почему он такой раздутый? когда очередь Ключа привязки — это "#"(хэштег) , эта очередь будет игнорировать ключ маршрутизации сообщения и получать все сообщения. когда * (звездочка) и # (хэштег) Когда эти два специальных символа не появляются в ключе привязки, переключатель темы ведет себя как переключатель с прямым подключением. Таким образом, переключатель темы также реализует функции переключателя сектора и переключателя прямого подключения.
Существуют также обмен заголовками, обмен по умолчанию и обмен недоставленными письмами. Они не описаны в этой статье.
В этом примере руководства требуется создание двух проектов Springboot: одного RabbitMQ-провайдера (производителя) и одного RabbitMQ-потребителя (потребителя).
Сначала создайте провайдер RabbitMQ,
Зависимости Jar, используемые в pom.xml:
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
Затем application.yml:
ps: Элемент конфигурации виртуального хоста внутри не обязателен. Я создал свой собственный виртуальный хост на сервисе RabbitMQ, поэтому настроил его, если вы его не создаете, этот элемент конфигурации добавлять не нужно;
server:
port: 8021
spring:
#Дайте проекту имя
application:
name: rabbitmq-provider
#Настройка RabbitMq сервер
rabbitmq:
host: 127.0.0.1
port: 5672
username: root
password: root
#виртуальныйхост Вам не нужно его устанавливать и использовать хост сервера по умолчанию.
virtual-host: JCcccHost
Затем мы сначала используем прямой обмен (коммутатор с прямым подключением), создаем DirectRabbitConfig.java (для очереди и настроек персистентности коммутатора и использования соединения).,Это объяснено в комментариях,Следующие конфигурации различных коммутаторов не будут объясняться одинаково):
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author : JCccc
* @CreateTime : 2019/9/3
* @Description :
**/
@Configuration
public class DirectRabbitConfig {
//очередь Имя: Тестдиректкуеуе
@Bean
public Queue TestDirectQueue() {
// Долговечность: сохранение, значение по умолчанию — false, очередь сохранения: будет храниться на диске, она все равно будет существовать при перезапуске агента сообщений, временная очередь: соединение действительно до того момента, когда
// эксклюзивный: значение по умолчанию также равно false, оно может использоваться только соединением, созданным до того, и очередь будет удалена после того, как соединение будет закрыто. Этот приоритет ссылки выше, чем устойчивый.
// autoDelete: следует ли автоматически удалять, если ни один производитель или потребитель не использует эту очередь, очередь будет автоматически удалена.
// return new Queue("TestDirectQueue",true,true,false);
//Обычно просто задайте постоянство очереди, а два других по умолчанию являются ложными.
return new Queue("TestDirectQueue",true);
}
//Прямой переключатель Имя: TestDirectExchange
@Bean
DirectExchange TestDirectExchange() {
// return new DirectExchange("TestDirectExchange",true,true);
return new DirectExchange("TestDirectExchange",true,false);
}
//Связывать Привяжите очередь и переключитесь, И установите ключ, используемый для сопоставления: TestDirectRouting
@Bean
Binding bindingDirect() {
return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
}
@Bean
DirectExchange lonelyDirectExchange() {
return new DirectExchange("lonelyDirectExchange");
}
}
Затем напишите простой интерфейс для отправки сообщений (его также можно изменить на запланированную задачу и т. д. в зависимости от потребностей), SendMessageController.java:
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
* @Author : JCccc
* @CreateTime : 2019/9/3
* @Description :
**/
@RestController
public class SendMessageController {
@Autowired
RabbitTemplate rabbitTemplate; //Используем RabbitTemplate, который обеспечивает получение/отправку и другие методы
@GetMapping("/sendDirectMessage")
public String sendDirectMessage() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "test message, hello!";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String,Object> map=new HashMap<>();
map.put("messageId",messageId);
map.put("messageData",messageData);
map.put("createTime",createTime);
//Переносим сообщение со значением ключа привязки: TestDirectRouting Отправить на переключение TestDirectExchange
rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map);
return "ok";
}
}
Запустите проект RabbitMQ-Provider и вызовите следующий интерфейс:
Поскольку мы еще не настроили потребителя RabbitMQ, сообщение не было использовано. Давайте перейдем на страницу управления RabbitMq, чтобы проверить, успешна ли отправка:
Посмотрите еще раз на очередь (можете проверить, что означает каждый английский пункт в интерфейсе, можете проверить сами, все равно полезно для понимания):
Очень хорошо, сообщение отправлено на сервер RabbitMq.
Затем создайте проект RabbitMQ-потребитель:
Зависимости Jar в pom.xml:
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
Затем application.yml:
server:
port: 8022
spring:
#Дайте проекту имя
application:
name: rabbitmq-consumer
#Настройка RabbitMq сервер
rabbitmq:
host: 127.0.0.1
port: 5672
username: root
password: root
#виртуальныйхост Вам не нужно его устанавливать и использовать хост сервера по умолчанию.
virtual-host: JCcccHost
Потом то же самое,создаватьDirectRabbitConfig.java(Фактически, для простого использования потребителями нет необходимости добавлять эту конфигурацию, просто создайте последующий мониторинг и используйте аннотации, чтобы позволить прослушивателю отслеживать соответствующую очередь. Если настроено, потребитель на самом деле является генератором и может отправить сообщение.):
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author : JCccc
* @CreateTime : 2019/9/3
* @Description :
**/
@Configuration
public class DirectRabbitConfig {
//очередь Имя: Тестдиректкуеуе
@Bean
public Queue TestDirectQueue() {
return new Queue("TestDirectQueue",true);
}
//Прямой переключатель Имя: TestDirectExchange
@Bean
DirectExchange TestDirectExchange() {
return new DirectExchange("TestDirectExchange");
}
//Связывать Привяжите очередь и переключитесь, И установите ключ, используемый для сопоставления: TestDirectRouting
@Bean
Binding bindingDirect() {
return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
}
}
Затем создайте класс прослушивания приема сообщений DirectReceiver.java:
@Component
@RabbitListener(queues = "TestDirectQueue") // имя очереди прослушивания TestDirectQueue
public class DirectReceiver {
@RabbitHandler
public void process(Map testMessage) {
System.out.println("Потребитель DirectReceiver получает сообщение : " + testMessage.toString());
}
}
Затем запустите проект RabbitMQ-Consumer, и вы увидите, что ранее отправленное сообщение использовано:
Затем вы можете продолжить вызов интерфейса push-сообщений проекта RabbitMQ-Provider и увидеть немедленное потребление сообщений потребителем:
Итак, поскольку переключатель прямого соединения является взаимно-однозначным, что произойдет, если мы настроим несколько мониторов для привязки к одной и той же очереди для одного и того же взаимодействия с прямым соединением?
Видно, что сообщения обрабатываются методом опроса и повторного потребления нет.
Далее используем переключатель темы Topic Exchange.
Создайте TopicRabbitConfig.java в проекте RabbitMQ-Provider:
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author : JCccc
* @CreateTime : 2019/9/3
* @Description :
**/
@Configuration
public class TopicRabbitConfig {
//Связыватьключ
public final static String man = "topic.man";
public final static String woman = "topic.woman";
@Bean
public Queue firstQueue() {
return new Queue(TopicRabbitConfig.man);
}
@Bean
public Queue secondQueue() {
return new Queue(TopicRabbitConfig.woman);
}
@Bean
TopicExchange exchange() {
return new TopicExchange("topicExchange");
}
//Привязываем firstQueueиtopicExchange, а значение привязанного ключа — theme.man
//Таким образом, пока ключ маршрутизации, содержащийся в сообщении, имеет значение theme.man, оно будет распределено по очереди.
@Bean
Binding bindingExchangeMessage() {
return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);
}
//Привязываем секундуQueueиtopicExchange, а связанное значение ключа является темой правила ключа маршрутизации с подстановочными знаками.#
// Таким образом, пока ключ маршрутизации, содержащийся в сообщении, начинается с темы, оно будет распространяться в очередь.
@Bean
Binding bindingExchangeMessage2() {
return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
}
}
Затем добавьте еще 2 интерфейса для отправки сообщений в переключатель темы:
@GetMapping("/sendTopicMessage1")
public String sendTopicMessage1() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "message: M A N ";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String, Object> manMap = new HashMap<>();
manMap.put("messageId", messageId);
manMap.put("messageData", messageData);
manMap.put("createTime", createTime);
rabbitTemplate.convertAndSend("topicExchange", "topic.man", manMap);
return "ok";
}
@GetMapping("/sendTopicMessage2")
public String sendTopicMessage2() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "message: woman is all ";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String, Object> womanMap = new HashMap<>();
womanMap.put("messageId", messageId);
womanMap.put("messageData", messageData);
womanMap.put("createTime", createTime);
rabbitTemplate.convertAndSend("topicExchange", "topic.woman", womanMap);
return "ok";
}
}
Продюсер завершен. Не спешите его запускать. Создайте TopicManReceiver.java в проекте RabbitMQ-Consumer:
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* @Author : JCccc
* @CreateTime : 2019/9/3
* @Description :
**/
@Component
@RabbitListener(queues = "topic.man")
public class TopicManReceiver {
@RabbitHandler
public void process(Map testMessage) {
System.out.println("Потребитель TopicManReceiver получает сообщение : " + testMessage.toString());
}
}
Создайте еще один TopicTotalReceiver.java:
package com.elegant.rabbitmqconsumer.receiver;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* @Author : JCccc
* @CreateTime : 2019/9/3
* @Description :
**/
@Component
@RabbitListener(queues = "topic.woman")
public class TopicTotalReceiver {
@RabbitHandler
public void process(Map testMessage) {
System.out.println("Потребитель TopicTotalReceiver получает сообщение : " + testMessage.toString());
}
}
Аналогичным образом добавьте соответствующую конфигурацию переключателя темы TopicRabbitConfig.java (Должны ли потребители добавлять эту конфигурацию? На самом деле в этом нет необходимости. Причина была упомянута ранее):
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author : JCccc
* @CreateTime : 2019/9/3
* @Description :
**/
@Configuration
public class TopicRabbitConfig {
//Связыватьключ
public final static String man = "topic.man";
public final static String woman = "topic.woman";
@Bean
public Queue firstQueue() {
return new Queue(TopicRabbitConfig.man);
}
@Bean
public Queue secondQueue() {
return new Queue(TopicRabbitConfig.woman);
}
@Bean
TopicExchange exchange() {
return new TopicExchange("topicExchange");
}
//Привязываем firstQueueиtopicExchange, а значение привязанного ключа — theme.man
//Таким образом, пока ключ маршрутизации, содержащийся в сообщении, имеет значение theme.man, оно будет распределено по очереди.
@Bean
Binding bindingExchangeMessage() {
return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);
}
//Привязываем секундуQueueиtopicExchange, а связанное значение ключа является темой правила ключа маршрутизации с подстановочными знаками.#
// Таким образом, пока ключ маршрутизации, содержащийся в сообщении, начинается с темы, оно будет распространяться в очередь.
@Bean
Binding bindingExchangeMessage2() {
return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
}
}
Затем запустите проекты RabbitMQ-Provider и RabbitMQ-Consumer, сначала вызовите интерфейс /sendTopicMessage1:
Затем посмотрите на вывод консоли потребителя RabbitMQ-потребителя: TopicManReceiver прослушивает очередь 1, Ключ привязки — это:topic.man TopicTotalReceiver прослушивает очередь 2, Ключ привязки — это:topic.# Сообщение, отправленное до этого, содержит ключ маршрутизации: theme.man.
Таким образом, вы можете видеть, что оба прослушивающих получателя-потребителя успешно воспользовались сообщением, поскольку ключи привязки очередей, отслеживаемых двумя получателями, могут совпадать с ключом маршрутизации, содержащимся в этом сообщении.
Затем вызовите интерфейс/sendTopicMessage2:
Затем посмотрите на вывод консоли потребителя RabbitMQ-потребителя: TopicManReceiver прослушивает очередь 1, Ключ привязки — это:topic.man TopicTotalReceiver прослушивает очередь 2, Ключ привязки — это:topic.# Сообщение, отправленное до того, когда содержит ключ маршрутизации: Topic. Woman.
Итак, вы можете видеть, что из двух прослушивающих потребителей только TopicTotalReceiver успешно воспользовался сообщением.
Далее следует использование секторных переключателей Fanout Exchange.
Аналогичным образом сначала создайте FanoutRabbitConfig.java в проекте RabbitMQ-Provider:
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author : JCccc
* @CreateTime : 2019/9/3
* @Description :
**/
@Configuration
public class FanoutRabbitConfig {
/**
* Создайте три очереди :fanout.A fanout.B fanout.C
* Привязать все три очереди к коммутатору fanoutExchange начальство
* Поскольку это переключатель секторов, Ключи маршрутизации не нужно настраивать, и настройка не будет работать.
*/
@Bean
public Queue queueA() {
return new Queue("fanout.A");
}
@Bean
public Queue queueB() {
return new Queue("fanout.B");
}
@Bean
public Queue queueC() {
return new Queue("fanout.C");
}
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
@Bean
Binding bindingExchangeA() {
return BindingBuilder.bind(queueA()).to(fanoutExchange());
}
@Bean
Binding bindingExchangeB() {
return BindingBuilder.bind(queueB()).to(fanoutExchange());
}
@Bean
Binding bindingExchangeC() {
return BindingBuilder.bind(queueC()).to(fanoutExchange());
}
}
Затем напишите интерфейс для отправки сообщений,
@GetMapping("/sendFanoutMessage")
public String sendFanoutMessage() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "message: testFanoutMessage ";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String, Object> map = new HashMap<>();
map.put("messageId", messageId);
map.put("messageData", messageData);
map.put("createTime", createTime);
rabbitTemplate.convertAndSend("fanoutExchange", null, map);
return "ok";
}
Затем добавьте класс потребления сообщений в проект RabbitMQ-Consumer,
FanoutReceiverA.java:
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* @Author : JCccc
* @CreateTime : 2019/9/3
* @Description :
**/
@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA {
@RabbitHandler
public void process(Map testMessage) {
System.out.println("Потребитель FanoutReceiverA получил сообщение : " +testMessage.toString());
}
}
FanoutReceiverB.java:
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* @Author : JCccc
* @CreateTime : 2019/9/3
* @Description :
**/
@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiverB {
@RabbitHandler
public void process(Map testMessage) {
System.out.println("Потребитель FanoutReceiverB получил сообщение : " +testMessage.toString());
}
}
FanoutReceiverC.java:
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* @Author : JCccc
* @CreateTime : 2019/9/3
* @Description :
**/
@Component
@RabbitListener(queues = "fanout.C")
public class FanoutReceiverC {
@RabbitHandler
public void process(Map testMessage) {
System.out.println("Потребитель FanoutReceiverC получил сообщение : " +testMessage.toString());
}
}
Затем добавьте класс конфигурации переключателя вентилятора FanoutRabbitConfig.java (Действительно ли потребители хотят добавить эту конфигурацию? На самом деле в этом нет необходимости. Причина была упомянута ранее):
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author : JCccc
* @CreateTime : 2019/9/3
* @Description :
**/
@Configuration
public class FanoutRabbitConfig {
/**
* Создайте три очереди :fanout.A fanout.B fanout.C
* Привязать все три очереди к коммутатору fanoutExchange начальство
* Поскольку это переключатель секторов, Ключи маршрутизации не нужно настраивать, и настройка не будет работать.
*/
@Bean
public Queue queueA() {
return new Queue("fanout.A");
}
@Bean
public Queue queueB() {
return new Queue("fanout.B");
}
@Bean
public Queue queueC() {
return new Queue("fanout.C");
}
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
@Bean
Binding bindingExchangeA() {
return BindingBuilder.bind(queueA()).to(fanoutExchange());
}
@Bean
Binding bindingExchangeB() {
return BindingBuilder.bind(queueB()).to(fanoutExchange());
}
@Bean
Binding bindingExchangeC() {
return BindingBuilder.bind(queueC()).to(fanoutExchange());
}
}
Наконец, запустите проекты RabbitMQ-Provider и RabbitMQ-Consumer и вызовите следующий интерфейс/sendFanoutMessage:
Затем взгляните на консольную ситуацию проекта RabbitMQ-Consumer:
Видно, что пока сообщение отправляется веерообразному коммутатору fanoutExchange, три очереди привязаны к этому коммутатору, поэтому три класса приема сообщений отслеживают сообщение.
На этом этапе мы фактически закончили использовать три часто используемых переключателя, поэтому давайте продолжим разговор об обратных вызовах сообщений, которые на самом деле являются подтверждением сообщения (производитель успешно отправляет сообщение, а потребитель успешно получает сообщение).
В файле application.yml проекта RabbitMQ-Provider добавьте элемент конфигурации для подтверждения сообщения:
PS: В этой статье используется Springboot версии 2.1.7.RELEASE; Если вы настраиваете обратный вызов подтверждения и тест обнаруживает, что функция обратного вызова не может быть запущена, то причина может заключаться в том, что элемент конфигурации не действует из-за версии. Вы можете заменить издателя-подтверждения: true на издателя-подтверждения-типа: коррелированного.
server:
port: 8021
spring:
#Дайте проекту имя
application:
name: rabbitmq-provider
#Настройка RabbitMq сервер
rabbitmq:
host: 127.0.0.1
port: 5672
username: root
password: root
#виртуальныйхост Вам не нужно его устанавливать и использовать хост сервера по умолчанию.
virtual-host: JCcccHost
#Элемент конфигурации подтверждения сообщения
#Подтверждаем, что сообщение отправлено на коммутатор (Exchange)
publisher-confirms: true
#Подтверждаем, что сообщение отправлено в очередь (Queue)
publisher-returns: true
Затем есть функция обратного вызова для подтверждения сообщения, связанная с конфигурацией, RabbitConfig.java:
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author : JCccc
* @CreateTime : 2019/9/3
* @Description :
**/
@Configuration
public class RabbitConfig {
@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
//Устанавливается для включения Обязательного вызова функции обратного вызова. Функция обратного вызова принудительно вызывается независимо от результата отправки сообщения.
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("ConfirmCallback: "+"данные корреляции: "+correlationData);
System.out.println("ConfirmCallback: "+"Подтверждение: "+ack);
System.out.println("ConfirmCallback: "+"причина: "+причина);
}
});
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("ReturnCallback: "+"Сообщение: "+сообщение);
System.out.println("ReturnCallback: "+"Код ответа: "+replyCode);
System.out.println("ReturnCallback: "+"Информация об ответе: "+replyText);
System.out.println("ReturnCallback: "+"переключатель: "+обмен);
System.out.println("ReturnCallback: "+"ключ маршрутизации: "+routingKey);
}
});
return rabbitTemplate;
}
}
На этом этапе подтверждение сообщения, отправленного производителем, завершило вызов функции обратного вызова. Вы можете видеть, что выше написаны две функции обратного вызова: одна называется ConfirmCallback, а другая — RetrunCallback; Итак, при каких обстоятельствах будут запущены две вышеупомянутые функции обратного вызова?
Давайте сначала проанализируем общую ситуацию. Существует четыре ситуации для push-сообщений:
①Сообщение отправляется на сервер, но коммутатор не может быть найден на сервере. ②Сообщение отправляется на сервер, и коммутатор найден, но очередь не найдена. ③Сообщение отправляется на сервер, но в коммутаторе и очереди ничего не найдено. ④Сообщение успешно отправлено
Так我先写几个接口来分别测试и Сертификация ниженачальство4ситуация,Когда подтверждение сообщения запускает функцию обратного вызова:
①Сообщение отправляется на сервер, но коммутатор не может быть найден на сервере. Напишите тестовый интерфейс и отправляйте сообщения на коммутатор с именем «non-existent-exchange» (этот переключатель не был создан и не настроен):
@GetMapping("/TestMessageAck")
public String TestMessageAck() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "message: non-existent-exchange test message ";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String, Object> map = new HashMap<>();
map.put("messageId", messageId);
map.put("messageData", messageData);
map.put("createTime", createTime);
rabbitTemplate.convertAndSend("non-existent-exchange", "TestDirectRouting", map);
return "ok";
}
Вызовите интерфейс и проверьте консольный вывод проекта Rabbitmq-provuder (причина в том, что не найден переключатель «non-existent-exchange»):
2019-09-04 09:37:45.197 ERROR 8172 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'non-existent-exchange' in vhost 'JCcccHost', class-id=60, method-id=40)
ConfirmCallback: Связанные данные: ноль
ConfirmCallback: Подтверждение: ложное
ConfirmCallback: Причина: канал error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'non-existent-exchange' in vhost 'JCcccHost', class-id=60, method-id=40)
Вывод: ①Эта ситуация запускает функцию обратного вызова ConfirmCallback.
②Сообщение отправляется на сервер, и коммутатор найден, но очередь не найдена. В этом случае необходимо добавить переключатель, но не привязывать к этому переключателю очередь. Я просто добавлю в DirectRabitConfig прямой переключатель с именем 'lonelyDirectExchange', но не выполняю над ним никаких операций по настройке привязки:
@Bean
DirectExchange lonelyDirectExchange() {
return new DirectExchange("lonelyDirectExchange");
}
Затем напишите тестовый интерфейс и отправьте сообщение на коммутатор с именем «lonelyDirectExchange» (этот коммутатор не имеет конфигурации очереди):
@GetMapping("/TestMessageAck2")
public String TestMessageAck2() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "message: lonelyDirectExchange test message ";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String, Object> map = new HashMap<>();
map.put("messageId", messageId);
map.put("messageData", messageData);
map.put("createTime", createTime);
rabbitTemplate.convertAndSend("lonelyDirectExchange", "TestDirectRouting", map);
return "ok";
}
Вызовите интерфейс, чтобы просмотреть вывод консоли проекта Rabbitmq-provuder:
ReturnCallback: Сообщение: (Тело:'{createTime=2019-09-04 09:48:01, messageId=563077d9-0a77-4c27-8794-ecfb183eac80, messageData=message: lonelyDirectExchange test message }' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
ReturnCallback: Код ответа: 312
ReturnCallback: Ответное сообщение: NO_ROUTE
ReturnCallback: Переключатель: одинокийDirectExchange
ReturnCallback: Ключ маршрутизации: TestDirectRouting
ConfirmCallback: Связанные данные: ноль
ConfirmCallback: Подтверждение: правда
ConfirmCallback: Причина: ноль
Вы можете видеть эту ситуацию: вызываются обе функции; В этом случае сообщение было успешно отправлено на сервер, поэтому ConfirmCallback имеет значение true для подтверждения сообщения; Как вы можете видеть в параметрах печати функции обратного вызова RetrunCallback, сообщение было успешно отправлено на коммутатор, но при распределении маршрута по очереди очередь не была найдена, поэтому было сообщено об ошибке. NO_ROUTE 。 Вывод: ②Эта ситуация вызывает ConfirmCallback и RetrunCallbacktwoфункция обратного вызова.
③Сообщение отправляется на сервер, но в коммутаторе и очереди ничего не найдено. Эта ситуация на первый взгляд очень похожа на ①, да. , ③и① ситуации обратные вызовы последовательны, поэтому результаты не будут объяснены. в заключение: ③Эта ситуация вызывает ConfirmCallback функция обратного вызова.
④Сообщение успешно отправлено Затем для тестирования просто вызовите предыдущий интерфейс push-сообщения в обычном режиме и вызовите его /sendFanoutMessage вы можете увидеть вывод консоли:
ConfirmCallback: Связанные данные: ноль
ConfirmCallback: Подтверждение: правда
ConfirmCallback: Причина: ноль
в заключение: ④Эта ситуация вызывает ConfirmCallback функция обратного вызова.
Вышеупомянутое представляет собой введение в использование функций обратного вызова для подтверждения сообщений для сообщений, отправленных производителями (соответствующее расширение или обработка бизнес-данных может быть выполнена в функции обратного вызова в соответствии с потребностями).
Механизм подтверждения сообщения у производителя отличается, поскольку прием сообщения по своей сути является мониторингом сообщений, и будут использованы сообщения, соответствующие условиям. Таким образом, существует три основных режима механизма подтверждения приема сообщения:
①Автоматическое подтверждение, Это также ситуация подтверждения сообщения по умолчанию. AcknowledgeMode.NONE RabbitMQ успешно отправляет сообщение (то есть сообщение успешно записывается в TCP). Socket) сразу считает, что эта доставка обработана корректно, независимо от того, успешно ли обработала эту доставку сторона-потребитель. Таким образом, в этом случае, если логика потребления на стороне потребителя выдает исключение, то есть сторона потребителя не может успешно обработать сообщение, это эквивалентно потере сообщения. Обычно в этом случае мы используем try После перехвата исключения журнал распечатывается для отслеживания данных, чтобы соответствующие данные можно было найти для последующей обработки.
② Подтвердите в зависимости от ситуации, никаких представлений здесь не будет. ③ Ручное подтверждение, это более важно, а также это наиболее часто выбираемый режим при настройке механизма подтверждения для получения сообщений. После того, как потребитель получит сообщение и вручную вызовет Basic.ack/basic.nack/basic.reject, RabbitMQ считает доставку успешной только после получения этих сообщений. Basic.ack используется для положительного подтверждения. Basic.nack используется для отрицательных подтверждений (примечание: это расширение RabbitMQ для AMQP 0-9-1) Basic.reject используется для отрицательного подтверждения, но имеет ограничение по сравнению с Basic.nack: одновременно можно отклонить только одно сообщение.
Все три вышеуказанных метода на стороне потребителя указывают на то, что сообщение было доставлено правильно, но файл Basic.ack указывает на то, что сообщение было обработано правильно. А Basic.nack, Basic.reject означают, что они обрабатываются некорректно:
Давайте сосредоточимся на отклонении, потому что иногда некоторые сцены необходимо повторно поставить в очередь.
Channel.basicReject(deliveryTag, true); Отказаться от использования предыдущего сообщения;,Если второй параметр передан в true,Просто бросьте данные обратно в очередь,Тогда я воспользуюсь этой новостью в следующий раз. установить ложь,Просто скажи серверу,Я уже знаю данные этого сообщения,отклонить это по каким-то причинам,исервер Просто выбросьте эту новость.。 Я не хочу использовать это сообщение в следующий раз.
Будьте осторожны при использовании режима подтверждения повторной постановки в очередь после отклонения, поскольку обычно при возникновении исключения необходимо перехватить исключение, а затем отклонить постановку в очередь и выбрать, следует ли повторно ставить ее в очередь.
Однако если вы не используете когда, это приведет к тому, что некоторые сообщения, которые вы повторно поставите в очередь, будут продолжать находиться в очереди «потреблено-поставлено-потреблено-поставлено». Этот цикл приведет к накоплению сообщений.
Кстати, давайте поговорим вкратце, это также эквивалентно настройке не воспринимать определенное сообщение.
channel.basicNack(deliveryTag, false, true); Первый параметр по-прежнему является уникальным идентификатором данных, полученных в предыдущем сообщении; Второй параметр указывает, следует ли нацеливаться на несколько сообщений; если это правда, то есть, если идентификатор тега сообщения в предыдущем канале когда меньше, чем сообщение перед когда в одно время, будет выполнено подтверждение. отказался. Третий параметр указывает, следует ли повторно вводить в очередь, то есть возвращать ли неподтвержденные сообщения обратно в очередь.
Вы также должны быть осторожны при использовании режима подтверждения для повторной постановки в очередь после неподтверждения, поскольку сообщения могут быть снова отброшены из-за плохой обработки, что приведет к отставанию.
В потребительских проектах Добавьте код конфигурации, связанный с кодом, в новый MessageListenerConfig.java:
import com.elegant.rabbitmqconsumer.receiver.MyAckReceiver;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author : JCccc
* @CreateTime : 2019/9/4
* @Description :
**/
@Configuration
public class MessageListenerConfig {
@Autowired
private CachingConnectionFactory connectionFactory;
@Autowired
private MyAckReceiver myAckReceiver;//Класс обработки приема сообщений
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setConcurrentConsumers(1);
container.setMaxConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // RabbitMQ по умолчанию использует автоматическое подтверждение, здесь оно изменено на Ручное. подтверждение消息 //Создаем очередь
container.setQueueNames("TestDirectQueue");
//Если одновременно установлено несколько настроек следующим образом: Предполагается, что очереди должны были быть созданы и существовать.
// container.setQueueNames("TestDirectQueue","TestDirectQueue2","TestDirectQueue3");
//Другой способ настроить очередь. Если вы используете эту ситуацию и хотите установить несколько очередей, используйте addQueues.
//container.setQueues(new Queue("TestDirectQueue",true));
//container.addQueues(new Queue("TestDirectQueue2",true));
//container.addQueues(new Queue("TestDirectQueue3",true));
container.setMessageListener(myAckReceiver);
return container;
}
}
Соответствующий класс прослушивания сообщений с ручным подтверждением, MyAckReceiver.java (необходимо реализовать режим ручного подтверждения). ChannelAwareMessageListener): //Предыдущие связанные прослушиватели можно сначала закомментировать, чтобы избежать мониторинга одной и той же очереди несколькими прослушивателями одного и того же типа. //Преобразование сообщения о получении данных приведено здесь только для справки. Если сообщаемый массив выходит за пределы, вы можете настроить его в соответствии с форматом.
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
@Component
public class MyAckReceiver implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//Поскольку карта используется для передачи сообщения, требуется некоторая обработка для удаления карты из сообщения.
String msg = message.toString();
String[] msgArray = msg.split("'");//Вы можете нажать на «Сообщение», чтобы просмотреть исходный код. Данные непосредственно в одинарных кавычках — это данные сообщения нашей карты.
Map<String, String> msgMap = mapStringToMap(msgArray[1].trim(),3);
String messageId=msgMap.get("messageId");
String messageData=msgMap.get("messageData");
String createTime=msgMap.get("createTime");
System.out.println(" MyAckReceiver messageId:"+messageId+" messageData:"+messageData+" createTime:"+createTime);
System.out.println("Потребляемые сообщения тем поступают из: "+message.getMessageProperties().getConsumerQueue());
channel.basicAck(deliveryTag, true); //Второй параметр, Ручное подтверждение может обрабатываться пакетно, если параметр равен true , вы можете подтвердить все это сразу delivery_tag Все сообщения меньше или равны входящему значению.
// channel.basicReject(deliveryTag, true);//Второй параметр true будет возвращен в очередь, поэтому вам нужно решить, когда использовать отклонение, исходя из вашей бизнес-логики.
} catch (Exception e) {
channel.basicReject(deliveryTag, false);
e.printStackTrace();
}
}
//{key=value,key=value,key=value} Преобразование формата в карту
private Map<String, String> mapStringToMap(String str,int entryNum ) {
str = str.substring(1, str.length() - 1);
String[] strs = str.split(",",entryNum);
Map<String, String> map = new HashMap<String, String>();
for (String string : strs) {
String key = string.split("=")[0].trim();
String value = string.split("=")[1];
map.put(key, value);
}
return map;
}
}
В это время сначала вызовите интерфейс /sendDirectMessage, чтобы отправить сообщение в очередь TestDirectQueue напрямую подключенного коммутатора TestDirectExchange. Вы можете видеть, что прослушиватель потребляет нормально:
Но этой сцены зачастую недостаточно! Поскольку многие партнеры уже писали мне комментарии, им нужно перевести очереди мониторинга в режим ручного подтверждения в этом потребительском проекте, а бизнес-логика обработки сообщений другая.
Нет проблем, давайте посмотрим на код дальше
Сценарий: Помимо очереди TestDirectQueue, напрямую подключенной к коммутатору, которую необходимо изменить на ручное подтверждение, нам также необходимо изменить еще одну очередь.
Или несколько очередей могут быть подтверждены вручную, и разные очереди реализуют разные бизнес-процессы.
Итак, первый шаг, который нам нужно сделать, — это добавить несколько очередей в SimpleMessageListenerContainer:
Тогда наш класс прослушивания сообщений с подтверждением вручную, MyAckReceiver.java, сможет одновременно обрабатывать все сообщения в очереди, указанной выше.
Но если нам нужно выполнить другую обработку бизнес-логики, нам нужно различать обработку только по имени очереди, из которой поступает сообщение, например:
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
@Component
public class MyAckReceiver implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//Поскольку карта используется для передачи сообщения, требуется некоторая обработка для удаления карты из сообщения.
String msg = message.toString();
String[] msgArray = msg.split("'");//Вы можете нажать на «Сообщение», чтобы просмотреть исходный код. Данные непосредственно в одинарных кавычках — это данные сообщения нашей карты.
Map<String, String> msgMap = mapStringToMap(msgArray[1].trim(),3);
String messageId=msgMap.get("messageId");
String messageData=msgMap.get("messageData");
String createTime=msgMap.get("createTime");
if ("TestDirectQueue".equals(message.getMessageProperties().getConsumerQueue())){
System.out.println("Имя очереди использованных сообщений: "+message.getMessageProperties().getConsumerQueue());
System.out.println("Сообщение успешно передано messageId:"+messageId+" messageData:"+messageData+" createTime:"+createTime);
System.out.println("Выполнить бизнес-процесс обработки сообщения в TestDirectQueue...");
}
if ("fanout.A".equals(message.getMessageProperties().getConsumerQueue())){
System.out.println("Имя очереди использованных сообщений: "+message.getMessageProperties().getConsumerQueue());
System.out.println("Сообщение успешно передано messageId:"+messageId+" messageData:"+messageData+" createTime:"+createTime);
System.out.println("Выполнить бизнес-процесс обработки сообщения в fanout.A...");
}
channel.basicAck(deliveryTag, true);
// channel.basicReject(deliveryTag, true);//Если true, то оно будет помещено обратно в очередь
} catch (Exception e) {
channel.basicReject(deliveryTag, false);
e.printStackTrace();
}
}
//{key=value,key=value,key=value} Преобразование формата в карту
private Map<String, String> mapStringToMap(String str,int enNum) {
str = str.substring(1, str.length() - 1);
String[] strs = str.split(",",enNum);
Map<String, String> map = new HashMap<String, String>();
for (String string : strs) {
String key = string.split("=")[0].trim();
String value = string.split("=")[1];
map.put(key, value);
}
return map;
}
}
Хорошо, теперь давайте отправим сообщения в разные очереди и посмотрим на результат:
Интерфейс вызова/sendDirectMessage и /sendFanoutMessage ,
Если вы хотите добавить другие очереди прослушивания, просто добавьте конфигурации таким образом (или вы можете разделить несколько потребительских проектов для мониторинга и обработки).
Хорошо, это конец этого руководства RabbitMq по интеграции Springboot.
Издатель: Лидер стека программистов полного стека, укажите источник для перепечатки: https://javaforall.cn/146274.html Исходная ссылка: https://javaforall.cn