На основе очереди путем добавления моделей производителя и потребителя можно сформировать простую очередь сообщений, используя очередь в качестве носителя. Данные, «переносимые» в очереди, называются сообщением.
Очередь сообщений может использоваться в памяти одного узла или в качестве промежуточного программного обеспечения для распределенного хранилища.
Из-за архитектурной организации проекта в качестве промежуточного программного обеспечения распределенного хранилища часто используются широко используемые в настоящее время очереди сообщений, такие как: RabbitMQ, RocketMQ, Kafka и т. д.
Очередь памяти по сравнению с Промежуточным программное обеспечение сообщений Часто бываютлегкий、Низкая задержка (не требуется сетевое соединение)、Простые в использовании функции,Но есть такжеДефекты, которые невозможно сохранить (что делать, если сообщение потеряно?) и невозможно расширить (что делать, если объем сообщения слишком велик?)
иПромежуточное программное обеспечение Функции сообщений больше похожи на: настойчивость、Высокая доступность、Расширение кластера、балансировка нагрузки、Развязка системы и т. д.Функции,Но это также увеличит количество каналов связи и повысит сложность системы.,Поэтому его часто используют в распределенных системах.
Функции
Асинхронная связь: MQ обеспечивает асинхронную связь без синхронного ожидания, что подходит для сценариев, требующих асинхронной связи.
Постоянство: сообщения будут сохраняться. После сохранения не нужно беспокоиться о потере асинхронных коммуникационных сообщений.
Сглаживание пиков и заполнение впадин: перед лицом внезапного трафика MQ эквивалентен буферу, предотвращающему получение серверной службой слишком большого количества запросов за короткий период времени, что может привести к сбою службы.
Разделение системы: слабая связь, производители (вызывающие) и потребители (вызываемые) могут обновляться/расширяться независимо друг от друга.
Кластер: подобно другим кластерам промежуточного программного обеспечения, он облегчает горизонтальное/вертикальное расширение и повышает пропускную способность/доступность системы.
Промежуточное программное обеспечение В дополнение к этим функциям сообщения также имеют свои уникальные функции и возможности. Эта статья начнется с RocketMQ, и с ней можно быстро начать работу. программное обеспечение сообщений Столбец
В этой колонке шаг за шагом анализируются архитектура, процесс, принципы, исходный код и т. д. промежуточного программного обеспечения сообщений, а затем анализируются преимущества и применимые сценарии различного промежуточного программного обеспечения сообщений.
С помощью диаграммы брокера двух основных узлов, представленной ниже, легко понять их взаимосвязь:
С помощью следующей диаграммы архитектуры вы можете легко понять взаимосвязь между кластерами NameServer, Broker, Product и Consumer:
В ходе этого процесса у Сяокая возник вопрос: почему Продукту и Потребителю необходимо взаимодействовать через NameServer для получения данных Брокера?
Брокер RocketMQ служит сервером, а NameServer — центром регистрации. Он меньше контактирует с написанием кода и больше контактирует с производителями и потребителями (клиентами).
После большого количества теоретических знаний мы знаем общий процесс MQ. Далее мы используем SpringBoot для написания кода для реализации клиентов Product и Consumer.
Native RocketMQ предоставляет множество API-интерфейсов производителей и потребителей, которые требуют try-catch и сложны в использовании. Разработчики уровня предприятия обычно инкапсулируют на их основе часто используемые API.
Платформа Spring Boot используется в качестве основы для очень быстрой интеграции RocketMQ, а также предоставляет соответствующий RocketMQTemplate для инкапсуляции собственного API и упрощения разработки.
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>
Разработка на уровне предприятия часто инкапсулирует собственные API, а ServerProduct — это пользовательский класс, который объединяется с собственным производителем по умолчанию DefaultMQProducer для инкапсуляции API для упрощения разработки.
в этом процессе,Обычно файлы конфигурации используются для настройки параметров, связанных с производителем, таких как: имя группы, адрес сервера имен, количество неудачных попыток отправки сообщений, тайм-аут отправки сообщений и т. д....
После установки параметров запустите продюсер producer.start()
public class ServerProduct {
private DefaultMQProducer producer;
public ServerProduct(String producerGroup) {
producer = new DefaultMQProducer(producerGroup);
init();
}
public ServerProduct() {
producer = new DefaultMQProducer("Default_Server_Producer_Group");
init();
}
private void init() {
//инициализация В основном устанавливается через значение файла конфигурации Наконец запустите продюсера
producer.setNamesrvAddr("127.0.0.1:9876");
//...
try {
producer.start();
} catch (MQClientException e) {
throw new RuntimeException(e);
}
}
}
Запуск производителя в основном запускает запланированные задачи по синхронизации данных сервера имен и брокера, а также инициализирует некоторые компоненты, которые позже будут использоваться для взаимодействия с клиентской сетью, балансировки нагрузки и т. д.
Эти принципы будут подробно обсуждены позже при анализе исходного кода~
Затем инкапсулируйте API для отправки сообщений:
sendSyncMsg Первый параметр в API сообщения синхронизации — это тема (классификация первого уровня), вторая передача — тег (вторичная классификация), а третья передача — тело сообщения.
внутреннее совещаниеСоздайте сообщение с параметрами и отправьте брокеру, используя собственный API.
public SendResult sendSyncMsg(String topic, String tag, String jsonBody) {
Message message = new Message(topic, tag, jsonBody.getBytes(StandardCharsets.UTF_8));
SendResult sendResult;
try {
sendResult = producer.send(message);
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
throw new RuntimeException(e);
}
return sendResult;
}
@RequestMapping("/warn")
@RestController
@Slf4j
public class WarnController {
private static final String topic = "TopicTest";
@Autowired
private ServerProduct producer;
@GetMapping("/syncSend")
public SendResult syncSend() {
return producer.sendSyncMsg(topic, "tag", "sync hello world!");
}
}
После отправки сообщения оно будет сохранено брокеру, поэтому нам нужно использовать потребителя, чтобы получить сообщение и использовать его.
Во время разработки на уровне предприятия аннотации обычно используются для идентификации информации, на которую потребителю необходимо подписаться, а затем данные вводятся потребителю путем анализа аннотаций. Здесь мы напрямую используем аннотацию @RocketMQMessageListener, предоставленную Spring.
@Component
@RocketMQMessageListener(topic = "TopicTest", consumerGroup = "warn_consumer_group")
public class WarnConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
// Обработка сообщений
System.out.println("Received message: " + message);
}
}
использоватьDashboardСоздать вручнуюTopic
NameServer、Brokerиз Развертывание можно просмотретьОфициальная документация
До сих пор мы имели дело с производством и потреблением сообщений, но в процессе «жизни» сообщений могут возникать различные ситуации:
Эти ситуации будут решены одна за другой, от более мелких к более глубоким, в следующей статье. Ознакомьтесь с принципами реализации RocketMQ и ознакомьтесь с идеями проектирования, основанными на этих принципах, а затем ознакомьтесь с другим промежуточным программным обеспечением для сообщений~.
Промежуточное программное обеспечение сообщений обычно имеет пиковое бритье, асинхронное общение, Архитектурное в проект было внедрено разделение, высокая производительность, высокая доступность, расширение кластера, балансировка нагрузки и другие сопутствующие функции, а также Промежуточное. программное обеспечение сообщения также увеличат вызывающую ссылку и сложность системы.
RocketMQ состоит из NameServer, Broker, Product, Consumer и других кластеров.
Брокер, как сервер, отвечает за получение сообщений, эффективное сохранение сообщений и эффективный запрос сообщений при их использовании.
Определите тему для классификации сообщений. Чтобы улучшить возможности горизонтального расширения, под темой можно настроить очередь MessageQueue. Сообщения хранятся в очереди как носители данных, ожидая использования.
Чтобы обеспечить высокую доступность, одна и та же тема будет размещена у разных главных брокеров, чтобы избежать «все яйца Куна в одной корзине».
Кластер NameServer служит «центром регистрации». Узлы не имеют состояния и не взаимодействуют друг с другом. Они обновляют информацию о маршрутизации только одновременно с тактовым сигналом кластера Broker, когда информация о Broker будет передаваться, когда Продукт и Потребитель. регулярно общаемся.
Продукт является производителем сообщений. С помощью темы, идентификатора очереди и другой информации в брокере, полученной от сервера имен, он использует алгоритм балансировки нагрузки для поиска соответствующего брокера для связи.
Потребитель — это потребитель сообщений. Он получает очередь, за потребление которой он отвечает, на основе ребалансировки нагрузки, а затем получает сообщения через брокер для потребления.
Эта статья включена в рубрику Промежуточное программное обеспечение сообщений,заинтересованныйиз Студенты могутпродолжениесосредоточиться наох
Примечания и случаи этой статьи включены Gitee-CaiCaiJava、 Github-CaiCaiJava,Кроме того, есть более продвинутые знания, связанные с Java.,заинтересованныйиз Студенты могутstarredпродолжениесосредоточиться наох~
Если у вас есть какие-либо вопросы, вы можете обсудить их в области комментариев. Если вы считаете, что письмо Цай Цая хорошее, вы можете поставить лайк, подписаться на него и собрать его, чтобы поддержать его ~
Следуйте за Цай Цаем и делитесь дополнительной технической информацией, общедоступный аккаунт: внутренняя кухня Цай Цая.