RocketMQ (1): происхождение промежуточного программного обеспечения для сообщений, обзор общей архитектуры и основных компонентов.
RocketMQ (1): происхождение промежуточного программного обеспечения для сообщений, обзор общей архитектуры и основных компонентов.

RocketMQ (1): происхождение промежуточного программного обеспечения для сообщений, обзор общей архитектуры и основных компонентов.

На основе очереди путем добавления моделей производителя и потребителя можно сформировать простую очередь сообщений, используя очередь в качестве носителя. Данные, «переносимые» в очереди, называются сообщением.

Очередь сообщений может использоваться в памяти одного узла или в качестве промежуточного программного обеспечения для распределенного хранилища.

Из-за архитектурной организации проекта в качестве промежуточного программного обеспечения распределенного хранилища часто используются широко используемые в настоящее время очереди сообщений, такие как: RabbitMQ, RocketMQ, Kafka и т. д.

Очередь памяти по сравнению с Промежуточным программное обеспечение сообщений Часто бываютлегкий、Низкая задержка (не требуется сетевое соединение)、Простые в использовании функции,Но есть такжеДефекты, которые невозможно сохранить (что делать, если сообщение потеряно?) и невозможно расширить (что делать, если объем сообщения слишком велик?)

иПромежуточное программное обеспечение Функции сообщений больше похожи на: настойчивость、Высокая доступность、Расширение кластера、балансировка нагрузки、Развязка системы и т. д.Функции,Но это также увеличит количество каналов связи и повысит сложность системы.,Поэтому его часто используют в распределенных системах.

Функции

Асинхронная связь: MQ обеспечивает асинхронную связь без синхронного ожидания, что подходит для сценариев, требующих асинхронной связи.

Постоянство: сообщения будут сохраняться. После сохранения не нужно беспокоиться о потере асинхронных коммуникационных сообщений.

Сглаживание пиков и заполнение впадин: перед лицом внезапного трафика MQ эквивалентен буферу, предотвращающему получение серверной службой слишком большого количества запросов за короткий период времени, что может привести к сбою службы.

Разделение системы: слабая связь, производители (вызывающие) и потребители (вызываемые) могут обновляться/расширяться независимо друг от друга.

Кластер: подобно другим кластерам промежуточного программного обеспечения, он облегчает горизонтальное/вертикальное расширение и повышает пропускную способность/доступность системы.

Промежуточное программное обеспечение В дополнение к этим функциям сообщения также имеют свои уникальные функции и возможности. Эта статья начнется с RocketMQ, и с ней можно быстро начать работу. программное обеспечение сообщений Столбец

В этой колонке шаг за шагом анализируются архитектура, процесс, принципы, исходный код и т. д. промежуточного программного обеспечения сообщений, а затем анализируются преимущества и применимые сценарии различного промежуточного программного обеспечения сообщений.

Концепции архитектуры RocketMQ
  • Сообщение: Сообщение — это носитель транспорта в MQ, хранящий данные и другие метаданные, которые необходимо передать.
  • MessageQueue: очередь сообщений используется для хранения сообщений, и сообщения можно найти внутри по смещениям.
    • Он разделен на очереди чтения и записи для чтения при потреблении и записи при сохранении сообщений. Обычно количество очередей одинаковое.
    • Идентификатор очереди начинается с цифры 0 и постепенно увеличивается сам по себе. Например, если выделено 3 очереди чтения и записи, идентификаторы равны 0, 1 и 2 соответственно.
  • Тема: Тема (аналогично разделам в Kafka). Производителям необходимо отправлять сообщения в соответствующую тему, а потребителям необходимо подписаться на соответствующую тему для использования.
    • Действует как сообщение классификации и фильтрации сообщений.,Например, сообщения разного бизнеса (величины) распределяются по соответствующим темам (заказ, оплата, корзина, пользователь...).
    • В теме имеется несколько очередей (MQ) для хранения сообщений. Добавление очередей в тему может улучшить возможность горизонтальной записи сообщений.
    • Темы могут существовать на разных брокерах, чтобы обеспечить высокую доступность.
  • Тег: Классификация второго уровня по теме, фильтрация сообщений
  • Брокер: сервер, на котором хранятся метаданные, такие как сообщения, MessageQueue и Topic. Он используется для получения сообщений (производство процесса), сохранения сообщений и поиска сообщений (потребление процесса).
    • Полученное сообщение может быть получено и сохранено только главным узлом, а подчиненный узел используется только для синхронизации соответствующего сообщения главного узла.
    • Потребляющие сообщения могут передавать сообщения либо через главный узел, либо через подчиненный узел.

С помощью диаграммы брокера двух основных узлов, представленной ниже, легко понять их взаимосвязь:

  1. Брокер — это сервер, на котором хранятся сообщения, содержащие несколько тем.
  2. Тема — это тема, используемая для подписок на производство и потребление. Чтобы горизонтально расширить производительность записи, можно настроить несколько MessageQueue. Идентификатор MessageQueue начинается с 0 и увеличивается автоматически.
  3. Чтобы обеспечить высокую доступность, разные брокеры также будут иметь одну и ту же тему, но очереди будут разными, чтобы предотвратить недоступность службы в случае неожиданного выхода из строя брокера.,Как показано на рисунке TopicA,0、1 очередь у брокера В A очереди 2 и 3 находятся в Broker. Б в
  • NameServer: хранит информацию о маршрутизации брокера, аналогично центру регистрации.
    • Общайтесь с брокером, сохраняйте его данные, такие как Topic, MessageQueue и другие данные, и выполняйте контрольные сигналы, чтобы определить, находится ли брокер в автономном режиме.
    • Общайтесь с производителями и потребителями и передавайте метаданные брокера, чтобы независимо от того, идет ли речь о производстве или потреблении, на основе данных можно было найти соответствующий брокер, тему, MQ и т. д.
  • Продукт: Производитель, используемый для создания сообщений и отправки сообщений в очередь сообщений.
    • Группы производителей с одинаковой конфигурацией могут координировать свою работу.
    • Информация о маршрутизации, полученная через связь NameServer, выбирается в соответствии с алгоритмом балансировки нагрузки, а соответствующий идентификатор темы и очереди отправляется соответствующему брокеру для сохранения.
  • Потребитель: Потребитель, используемый для потребления сообщений, извлекает сообщения из очереди сообщений (длинный опрос) для потребления сообщений.
    • Смещение используется в очереди для подтверждения сообщения. При потреблении смещение также используется для определения места потребления.
    • Режим потребления разделен на широковещательный режим и режим кластера. Режим широковещательной передачи — это модель публикации и подписки, а режим кластера — двухточечное потребление.
    • Механизм длинного опроса используется для получения сообщений, чтобы компенсировать плохую производительность в режиме реального времени, но большое количество длинных соединений приведет к большим накладным расходам (механизм длинного опроса подробно описан ниже).
    • Для информации о маршрутизации, полученной посредством связи NameServer, потребитель выбирает соответствующую тему в соответствии с режимом потребления (широковещательная рассылка/кластер) и получает сообщение в соответствии с методом push/pull.
    • Группа координирует работу потребителей в одной группе для сбалансированного потребления сообщений. В режиме кластера очередь соответствует не более одному потребителю. Если количество потребителей превышает количество очередей, это будет недействительно.

С помощью следующей диаграммы архитектуры вы можете легко понять взаимосвязь между кластерами NameServer, Broker, Product и Consumer:

  1. Кластер NameServer запускается
  2. Кластер Broker настраивает адрес кластера NameServer и запускает его. Он отправляет контрольные сигналы каждому узлу NameServer, передавая информацию о маршрутизации (информацию о маршрутизации), такую ​​​​как Topic и MessageQueue, в своем собственном брокере.
  3. После того как клиенты продукта и потребителя настроят адрес кластера NameServer и запустятся, запланированная задача получит информацию о брокере (такие операции, как онлайн- и оффлайн-брокер, также будут обновлены немедленно).
  4. Продукт находит брокера для связи и отправляет сообщение на основе алгоритма балансировки нагрузки, темы, очереди сообщений и информации брокера.
  5. Брокер продолжает работу после получения сообщения о продукте
  6. Потребитель получает очередь, которую он хочет использовать, на основе алгоритма ребалансировки, а затем получает сообщение для потребления через информационную передачу брокера.

В ходе этого процесса у Сяокая возник вопрос: почему Продукту и Потребителю необходимо взаимодействовать через NameServer для получения данных Брокера?

  1. NameServer играет такую ​​роль в архитектуре, как центр регистрации, регистрация и обнаружение службы управления.
  2. Архитектурное разделение,Оставьте такие функции, как контрольные данные/данные о взаимодействии/статус решения, на NameServer (центр регистрации).
  3. В кластере брокеров, если нет NameServer, который должен выполнять контрольные сигналы и синхронизировать сводные данные между брокерами, нагрузка на полосу пропускания будет увеличиваться при наличии большого количества узлов. Кроме того, при выходе брокера из строя необходимо добавить механизм, определяющий, будет ли он отключен. он находится в автономном режиме, а продукт и потребитель. Будет много информации о брокере, которую необходимо настроить.(Требуется сетевое соединение)
  4. Узлы в кластере NameServer не сохраняют состояние и не взаимодействуют друг с другом, обеспечивая кластер высокой доступности.
Быстро начните работу с Spring Boot

Брокер RocketMQ служит сервером, а NameServer — центром регистрации. Он меньше контактирует с написанием кода и больше контактирует с производителями и потребителями (клиентами).

После большого количества теоретических знаний мы знаем общий процесс MQ. Далее мы используем SpringBoot для написания кода для реализации клиентов Product и Consumer.

Native RocketMQ предоставляет множество API-интерфейсов производителей и потребителей, которые требуют try-catch и сложны в использовании. Разработчики уровня предприятия обычно инкапсулируют на их основе часто используемые API.

Платформа Spring Boot используется в качестве основы для очень быстрой интеграции RocketMQ, а также предоставляет соответствующий RocketMQTemplate для инкапсуляции собственного API и упрощения разработки.

  1. Импортировать зависимости maven
Язык кода:xml
копировать
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.3</version>
</dependency>
  1. Инкапсулируйте собственный API продукта

Разработка на уровне предприятия часто инкапсулирует собственные API, а ServerProduct — это пользовательский класс, который объединяется с собственным производителем по умолчанию DefaultMQProducer для инкапсуляции API для упрощения разработки.

в этом процессе,Обычно файлы конфигурации используются для настройки параметров, связанных с производителем, таких как: имя группы, адрес сервера имен, количество неудачных попыток отправки сообщений, тайм-аут отправки сообщений и т. д....

После установки параметров запустите продюсер producer.start()

Язык кода:java
копировать
public class ServerProduct {

    private DefaultMQProducer producer;

    public ServerProduct(String producerGroup) {
        producer = new DefaultMQProducer(producerGroup);
        init();
    }

    public ServerProduct() {
        producer = new DefaultMQProducer("Default_Server_Producer_Group");
        init();
    }

    private void init() {
        //инициализация В основном устанавливается через значение файла конфигурации Наконец запустите продюсера
        producer.setNamesrvAddr("127.0.0.1:9876");
        //...

        try {
            producer.start();
        } catch (MQClientException e) {
            throw new RuntimeException(e);
        }
    }
}

Запуск производителя в основном запускает запланированные задачи по синхронизации данных сервера имен и брокера, а также инициализирует некоторые компоненты, которые позже будут использоваться для взаимодействия с клиентской сетью, балансировки нагрузки и т. д.

Эти принципы будут подробно обсуждены позже при анализе исходного кода~

Затем инкапсулируйте API для отправки сообщений:

sendSyncMsg Первый параметр в API сообщения синхронизации — это тема (классификация первого уровня), вторая передача — тег (вторичная классификация), а третья передача — тело сообщения.

внутреннее совещаниеСоздайте сообщение с параметрами и отправьте брокеру, используя собственный API.

Язык кода:java
копировать
public SendResult sendSyncMsg(String topic, String tag, String jsonBody) {
    Message message = new Message(topic, tag, jsonBody.getBytes(StandardCharsets.UTF_8));
    SendResult sendResult;
    try {
        sendResult = producer.send(message);
    } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
        throw new RuntimeException(e);
    }
    return sendResult;
}
  1. Напишите класс контроллера, используйте производителя для вызова API и отправьте его брокеру.
Язык кода:java
копировать
@RequestMapping("/warn")
@RestController
@Slf4j
public class WarnController {

    private static final String topic = "TopicTest";

    @Autowired
    private ServerProduct producer;

    @GetMapping("/syncSend")
    public SendResult syncSend() {
        return producer.sendSyncMsg(topic, "tag", "sync hello world!");
    }
}
  1. Потребитель подписывается на тему

После отправки сообщения оно будет сохранено брокеру, поэтому нам нужно использовать потребителя, чтобы получить сообщение и использовать его.

Во время разработки на уровне предприятия аннотации обычно используются для идентификации информации, на которую потребителю необходимо подписаться, а затем данные вводятся потребителю путем анализа аннотаций. Здесь мы напрямую используем аннотацию @RocketMQMessageListener, предоставленную Spring.

Язык кода:java
копировать
@Component
@RocketMQMessageListener(topic = "TopicTest", consumerGroup = "warn_consumer_group")
public class WarnConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        // Обработка сообщений
        System.out.println("Received message: " + message);
    }
}
  1. Создать тему вручную: TopicTest Не создавать тему автоматически Слишком большие проекты могут привести к забвению

использоватьDashboardСоздать вручнуюTopic

  1. Запустите NameServer и Broker, а затем запустите службу SpringBoot. Посетите /warn/syncSend, чтобы проверить отправку сообщений и их использование.

NameServer、Brokerиз Развертывание можно просмотретьОфициальная документация

До сих пор мы имели дело с производством и потреблением сообщений, но в процессе «жизни» сообщений могут возникать различные ситуации:

  1. Способ отправки сообщений: обычный (синхронный, асинхронный, односторонний)/последовательный/с задержкой/пакетный/транзакционное сообщение и т.д.
  2. Методы потребления сообщений: push/pull потребление, режим кластера/широковещания...
  3. Как сделать так, чтобы сообщения не потерялись?
  4. Как эффективно сохраняются сообщения?
  5. Как обеспечить идемпотентность потребления? Как решить проблему накопления и задержки сообщений?

Эти ситуации будут решены одна за другой, от более мелких к более глубоким, в следующей статье. Ознакомьтесь с принципами реализации RocketMQ и ознакомьтесь с идеями проектирования, основанными на этих принципах, а затем ознакомьтесь с другим промежуточным программным обеспечением для сообщений~.

Подвести итог

Промежуточное программное обеспечение сообщений обычно имеет пиковое бритье, асинхронное общение, Архитектурное в проект было внедрено разделение, высокая производительность, высокая доступность, расширение кластера, балансировка нагрузки и другие сопутствующие функции, а также Промежуточное. программное обеспечение сообщения также увеличат вызывающую ссылку и сложность системы.

RocketMQ состоит из NameServer, Broker, Product, Consumer и других кластеров.

Брокер, как сервер, отвечает за получение сообщений, эффективное сохранение сообщений и эффективный запрос сообщений при их использовании.

Определите тему для классификации сообщений. Чтобы улучшить возможности горизонтального расширения, под темой можно настроить очередь MessageQueue. Сообщения хранятся в очереди как носители данных, ожидая использования.

Чтобы обеспечить высокую доступность, одна и та же тема будет размещена у разных главных брокеров, чтобы избежать «все яйца Куна в одной корзине».

Кластер NameServer служит «центром регистрации». Узлы не имеют состояния и не взаимодействуют друг с другом. Они обновляют информацию о маршрутизации только одновременно с тактовым сигналом кластера Broker, когда информация о Broker будет передаваться, когда Продукт и Потребитель. регулярно общаемся.

Продукт является производителем сообщений. С помощью темы, идентификатора очереди и другой информации в брокере, полученной от сервера имен, он использует алгоритм балансировки нагрузки для поиска соответствующего брокера для связи.

Потребитель — это потребитель сообщений. Он получает очередь, за потребление которой он отвечает, на основе ребалансировки нагрузки, а затем получает сообщения через брокер для потребления.

Наконец (пожалуйста, поставьте лайк, соберите, подпишитесь, пожалуйста~)

Эта статья включена в рубрику Промежуточное программное обеспечение сообщений,заинтересованныйиз Студенты могутпродолжениесосредоточиться наох

Примечания и случаи этой статьи включены Gitee-CaiCaiJavaGithub-CaiCaiJava,Кроме того, есть более продвинутые знания, связанные с Java.,заинтересованныйиз Студенты могутstarredпродолжениесосредоточиться наох~

Если у вас есть какие-либо вопросы, вы можете обсудить их в области комментариев. Если вы считаете, что письмо Цай Цая хорошее, вы можете поставить лайк, подписаться на него и собрать его, чтобы поддержать его ~

Следуйте за Цай Цаем и делитесь дополнительной технической информацией, общедоступный аккаунт: внутренняя кухня Цай Цая.

boy illustration
Неразрушающее увеличение изображений одним щелчком мыши, чтобы сделать их более четкими артефактами искусственного интеллекта, включая руководства по установке и использованию.
boy illustration
Копикодер: этот инструмент отлично работает с Cursor, Bolt и V0! Предоставьте более качественные подсказки для разработки интерфейса (создание навигационного веб-сайта с использованием искусственного интеллекта).
boy illustration
Новый бесплатный RooCline превосходит Cline v3.1? ! Быстрее, умнее и лучше вилка Cline! (Независимое программирование AI, порог 0)
boy illustration
Разработав более 10 проектов с помощью Cursor, я собрал 10 примеров и 60 подсказок.
boy illustration
Я потратил 72 часа на изучение курсорных агентов, и вот неоспоримые факты, которыми я должен поделиться!
boy illustration
Идеальная интеграция Cursor и DeepSeek API
boy illustration
DeepSeek V3 снижает затраты на обучение больших моделей
boy illustration
Артефакт, увеличивающий количество очков: на основе улучшения характеристик препятствия малым целям Yolov8 (SEAM, MultiSEAM).
boy illustration
DeepSeek V3 раскручивался уже три дня. Сегодня я попробовал самопровозглашенную модель «ChatGPT».
boy illustration
Open Devin — инженер-программист искусственного интеллекта с открытым исходным кодом, который меньше программирует и больше создает.
boy illustration
Эксклюзивное оригинальное улучшение YOLOv8: собственная разработка SPPF | SPPF сочетается с воспринимаемой большой сверткой ядра UniRepLK, а свертка с большим ядром + без расширения улучшает восприимчивое поле
boy illustration
Популярное и подробное объяснение DeepSeek-V3: от его появления до преимуществ и сравнения с GPT-4o.
boy illustration
9 основных словесных инструкций по доработке академических работ с помощью ChatGPT, эффективных и практичных, которые стоит собрать
boy illustration
Вызовите deepseek в vscode для реализации программирования с помощью искусственного интеллекта.
boy illustration
Познакомьтесь с принципами сверточных нейронных сетей (CNN) в одной статье (суперподробно)
boy illustration
50,3 тыс. звезд! Immich: автономное решение для резервного копирования фотографий и видео, которое экономит деньги и избавляет от беспокойства.
boy illustration
Cloud Native|Практика: установка Dashbaord для K8s, графика неплохая
boy illustration
Краткий обзор статьи — использование синтетических данных при обучении больших моделей и оптимизации производительности
boy illustration
MiniPerplx: новая поисковая система искусственного интеллекта с открытым исходным кодом, спонсируемая xAI и Vercel.
boy illustration
Конструкция сервиса Synology Drive сочетает проникновение в интрасеть и синхронизацию папок заметок Obsidian в облаке.
boy illustration
Центр конфигурации————Накос
boy illustration
Начинаем с нуля при разработке в облаке Copilot: начать разработку с минимальным использованием кода стало проще
boy illustration
[Серия Docker] Docker создает мультиплатформенные образы: практика архитектуры Arm64
boy illustration
Обновление новых возможностей coze | Я использовал coze для создания апплета помощника по исправлению домашних заданий по математике
boy illustration
Советы по развертыванию Nginx: практическое создание статических веб-сайтов на облачных серверах
boy illustration
Feiniu fnos использует Docker для развертывания личного блокнота Notepad
boy illustration
Сверточная нейронная сеть VGG реализует классификацию изображений Cifar10 — практический опыт Pytorch
boy illustration
Начало работы с EdgeonePages — новым недорогим решением для хостинга веб-сайтов
boy illustration
[Зона легкого облачного игрового сервера] Управление игровыми архивами
boy illustration
Развертывание SpringCloud-проекта на базе Docker и Docker-Compose