Обеспечение порядка сообщений в Kafka: стратегия и настройка
Обеспечение порядка сообщений в Kafka: стратегия и настройка

1. Обзор

В этой статье мы рассмотрим проблемы и решения, связанные с упорядочиванием сообщений в Apache Kafka. В распределенных системах обработка сообщений в правильном порядке имеет решающее значение для поддержания целостности и согласованности данных. Хотя Kafka предоставляет механизмы для поддержания порядка сообщений, реализация этого в распределенной среде имеет свои сложности.

2. Порядок внутри раздела и его проблемы

Kafka поддерживает порядок внутри одного раздела, присваивая каждому сообщению уникальное смещение. Это гарантирует последовательное добавление сообщений внутри раздела. Однако когда мы масштабируем и используем несколько разделов, поддержание глобального порядка становится сложным. Разные разделы получают сообщения с разной скоростью, что усложняет строгое упорядочение между разделами.

2.1 Выбор времени производителей и потребителей

Давайте поговорим о том, как Kafka обрабатывает порядок сообщений. Существуют некоторые различия между порядком, в котором производитель отправляет сообщения, и порядком, в котором их получает потребитель. Придерживаясь одного раздела, мы можем обрабатывать сообщения в том порядке, в котором они поступают к брокеру. Однако этот порядок может не совпадать с тем порядком, в котором мы их изначально отправили. Эта путаница может возникнуть из-за задержек в сети или из-за повторной отправки сообщений. Для обеспечения согласованности мы можем реализовать производителей с подтверждениями и повторными попытками. Таким образом, мы гарантируем, что сообщения не только поступят в Kafka, но и придут в правильном порядке.

2.2 Проблемы с несколькими разделами

Такое распределение по разделам, хотя и полезно для масштабируемости и отказоустойчивости, но усложняет реализацию глобального порядка сообщений. Например, мы отправляем два сообщения, M1 и M2, последовательно. Kafka получает их так же, как мы их отправили, но помещает в разные разделы. Проблема здесь в том, что то, что M1 отправляется первым, не означает, что оно будет обработано раньше M2. Это может быть затруднительно в ситуациях, когда порядок обработки имеет решающее значение, например при финансовых транзакциях.

2.3 Последовательность сообщений одного раздела

Мы создали тему с именем «single_partition_topic», имеющую один раздел, и тему с именем «multi_partition_topic», имеющую 5 разделов. Вот пример темы с одним разделом, которому производитель отправляет сообщения:

UserEvent является реализованным Comparable Интерфейс POJO класс, помогает прессовать globalSequenceNumber(внешний серийный номер)Сортировка классов сообщений。потому чтопродюсертолькосуществоватьотправлять POJO Объект сообщения, мы реализовали собственный Jackson Сериализатор и десериализатор.

Раздел 0 получает все пользовательские события, причем идентификаторы событий отображаются в следующем порядке:

существовать Kafka , каждая группа потребителей действует как независимая организация. Если два потребителя принадлежат к разным группам потребителей, они оба получат все сообщения по теме. Это потому, что Кафка рассматривает каждую группу потребителей как отдельного подписчика.

Если два потребителя принадлежат к одной группе потребителей и подписаны на тему с несколькими разделами,Кафка обеспечит Каждый потребитель читает данные из уникального набора разделов.。Это сделано для того, чтобы обеспечить одновременную обработку сообщений.。

Кафка гарантирует, что существование находится в пределах потребительской группы.,Никакие два потребителя не читают одно и то же сообщение,Поэтому каждое сообщение существует обрабатывается только один раз для каждой группы.

Следующий код является примером того, как один и тот же потребитель получает сообщения из той же темы:

существуют В этом случае,Результаты, которые мы получаем, показывают, что потребитель потребляет сообщения в том же порядке.,Вот последовательные идентификаторы событий из выходных данных:

2.4 Последовательность многораздельных сообщений

Для тем с несколькими разделами конфигурация потребителей и производителей одинакова. Единственная разница заключается в теме и разделе, в который отправляется сообщение. Производитель отправляет сообщения в тему «multi_partition_topic»:

Потребитель потребляет сообщения одной и той же темы:

В выходных данных производителя перечислены идентификаторы событий и соответствующие им разделы следующим образом:

Для потребителей выходные данные покажут, что потребители потребляют сообщения в разном порядке. Идентификаторы событий в выходных данных следующие:

3.1 Использование одного раздела

мы можемсуществовать Kafka Используйте один раздел в 'single_partition_topic' Как показано в примере, это обеспечивает порядок сообщений. Однако у этого подхода есть свои недостатки:

  • Предел пропускной способности: представьте, что мы существуем в оживленной пиццерии. Если у нас есть только один шеф-повар (продюсер) и один официант (потребитель) (Раздел), работающие за одним столом, они существуют и могут обслуживать только такое количество пиццы, прежде чем все начнет накапливаться. существовать Kafka В мире, когда мы имеем дело с большим количеством сообщений, придерживаться одного Раздела похоже на сценарий с одной таблицей. В сценариях существования с большим объемом одна-единственная секция становится узким местом, а скорость обработки сообщений ограничена, поскольку только один производитель и один потребитель могут работать в одном существе одновременно.
  • Уменьшите параллелизм: в приведенном выше примере, если у нас есть несколько поваров (продюсер) и официантов (потребителей), работающих за несколькими столами (Раздел), то количество выполненных заказов увеличится. Кафка Преимуществасуществоватьчерез несколько Раздел Параллельная обработка。только один Раздел,Это преимущество теряется,что приводит к последовательной обработке,и еще больше ограничивает поток сообщений.

По сути,Один раздел гарантирует упорядочение за счет снижения пропускной способности.

3.2 Внешняя сортировка и буферизация временного окна

существуют в этом методе,производитель помечает каждое сообщение глобальным порядковым номером. Несколько экземпляров потребителей одновременно потребляют сообщения из разных разделов.,и используйте эти порядковые номера, чтобы изменить порядок сообщений,для обеспечения глобального порядка.

существуют реальные сценарии с несколькими продюсерами,Мы будем управлять глобальной последовательностью через общий ресурс, доступный всем процессам-производителям, например, последовательность базы данных или распределенный счетчик.。Это гарантирует, что серийный номерсуществовать Единственное среди всех сообщенийиупорядоченный,Независимо от того, какой производитель их присылает:

существовать Потребительская сторона,Группируем сообщения по временным окнам,Затем обработайте их по порядку. Мы существуем, сообщения, поступающие в течение определенного периода времени, объединяем их в группы.,По истечении срока действия окна,Обрабатываем партию. Это обеспечивает упорядоченную обработку в течение этого периода времени.,Даже если у них разное время прибытия в пределах окна. Потребитель буферизует сообщения на основе порядковых номеров и меняет их порядок перед обработкой. Нам необходимо убедиться, что сообщения обрабатываются в правильном порядке.,с этой целью,У потребителей должен быть буферный период,существуют Опросите сообщение несколько раз перед обработкой буферизованного сообщения.,И этот буферный период достаточно длинный,Чтобы устранить потенциальные проблемы с сортировкой сообщений:

Идентификатор каждого события существует в выходных данных вместе с соответствующим ему разделом, как показано ниже:

Потребительские выходные данные с глобальным порядковым номером и идентификатором события:

3.3 Рекомендации по внешней сортировке и буферизации

существуют в этом методе,Каждый экземпляр потребителя буферизует сообщения.,и обрабатывать их по порядку в зависимости от их серийных номеров. Однако,Есть некоторые соображения:

  • Размер буфера. Размер буфера можно увеличить в зависимости от количества входящих сообщений. существования отдает приоритет реализациям, которые строго сортируются по порядковому номеру,Мы можем увидеть значительный рост буферов,Особенно если есть задержка доставки сообщения. Например,Если мы обрабатываем 100 элементов в минуту, мы будем обрабатывать 200 элементов в минуту.,Буфер неожиданно вырастет. поэтому,Мы должны эффективно управлять размером буфера,И подготовьте стратегию на случай, если существование превысит ожидаемые пределы.
  • Задержка: когда мы буферизуем сообщения,Фактически мы заставляем их ждать некоторое время перед обработкой (вводя задержку). с одной стороны,С другой стороны, это помогает нам оставаться организованными;,Это замедляет весь процесс. Главное — найти правильный баланс между поддержанием порядка и минимизацией задержек.
  • Сбой: в случае сбоя потребителя мы можем потерять буферизованные сообщения. Чтобы предотвратить это, нам может потребоваться периодически сохранять состояние буферизации.
  • Поздние сообщения: Сообщения, поступающие после окна обработки существования, будут не по порядку. В зависимости от варианта использования нам могут потребоваться политики для обработки или удаления таких сообщений.
  • Управление состоянием. Если обработка включает в себя операции с отслеживанием состояния, нам потребуются механизмы для управления и сохранения состояния в разных окнах.
  • Использование ресурсов: существующие буферы требуют памяти для хранения большого количества сообщений. Нам нужно убедиться, что у нас достаточно ресурсов, чтобы справиться с этим.,Особенно, если сообщение существует и находится в буфере длительное время.

3.4 Идемпотентные производители

Kafka Функциональность идемпотентного производителя предназначена для доставки сообщений только один раз, что предотвращает любое дублирование. Это существованиепродюсер может иметь решающее значение в ситуациях, когда повторная попытка отправки сообщений может произойти из-за сетевых ошибок или других временных сбоев. Основная цель идемпотентности — предотвратить дублирование сообщений, но она косвенно влияет на порядок сообщений. Кафка Используйте две вещи для достижения идемпотентности: производители ID (PID) и служит серийным номером идемпотентного ключа, уникального в контексте конкретного Раздела.

  • Серийный номер: Кафка Каждому сообщению, отправленному продюсером, присваивается порядковый номер. Эти порядковые номера уникальны в каждом разделе, что гарантирует, что производитель отправляет сообщения в определенном порядке существования. Kafka При приеме существование в пределах одного Раздела пишется в том же порядке. Серийный номер гарантирует заказ в пределах одного раздела. Однако, когда существование создает сообщения для нескольких Разделов, не существует гарантии глобального порядка между Разделами. Например, если продюсер отправит сообщение M1、M2 и M3 Отправлять в разделы отдельно P1、P2 и P3,Тогда каждое сообщение существует и получает уникальный порядковый номер в своем Разделе. Однако,Это не гарантирует относительный порядок потребления этих Разделов.
  • Идентификатор производителя (PID). Когда идемпотентность включена, брокер назначает каждому производителю уникальный идентификатор производителя (PID). Этот PID в сочетании с порядковым номером позволяет Kafka идентифицировать и отбрасывать любые повторяющиеся сообщения из-за повторных попыток производителя.

Kafka Порядок сообщений гарантируется записью сообщений в раздел в производственном порядке благодаря порядковым номерам и PID и идемпотентные функции для предотвращения дублирования. Чтобы включить идемпотентного производителя, нам нужна конфигурация существующегопродюсера. “enable.idempotence” Свойство настроено на true:

4. Ключевые конфигурации производителей и потребителей

Существует несколько ключевых конфигураций производителя и потребителя Kafka, которые могут повлиять на порядок сообщений и пропускную способность.

4.1#### 4.1 Конфигурация производителя
  • MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION: Если мы отправим большое количество сообщений, Кафка Этот параметр помогает определить, сколько сообщений мы можем отправить, не дожидаясь подтверждения о прочтении. Если мы установим это значение выше, чем 1 Без включения идемпотентности мы можем нарушить порядок сообщений, если нам понадобится их повторно отправить. Однако если мы включим идемпотентность, Кафка Даже если мы отправляем много сообщений одновременно, порядок сообщений сохраняется. Если нам нужен очень строгий порядок, например, чтобы гарантировать, что каждое существующее сообщение будет прочитано до отправки следующего сообщения, мы должны установить это значение равным 1. Если бы мы хотели поставить скорость выше идеального порядка, мы могли бы установить 5, но это может привести к проблемам с упорядочиванием.
  • BATCH_SIZE_CONFIG и LINGER_MS_CONFIG: Kafka Управляет размером пакета по умолчанию (в байтах) с целью группировки записей из одного раздела в меньшее количество запросов для повышения производительности. Если мы установим этот лимит слишком низким, мы отправим много маленьких групп, что может замедлить нашу работу. Но если мы установим слишком высокое значение, это может оказаться не лучшим использованием памяти. Кафка Вы можете подождать некоторое время перед отправкой группы, если она еще не заполнена. Это время ожидания определяется выражением LINGER_MS_CONFIG контроль. Если больше сообщений поступает достаточно быстро, чтобы заполнить установленный нами лимит, они отправляются немедленно, в противном случае Kafka Не буду больше ждать - это будетсуществоватькогда время вышлоотправлятьвсе, что у нас есть。это каксуществоватьскоростьибаланс между эффективностью,Убедитесь, что мы отправляем достаточно сообщений одновременно,без ненужных задержек.
Язык кода:properties
копировать
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
props.put(ProducerConfig.LINGER_MS_CONFIG, "5");
4.2 Конфигурация потребителя
  • MAX_POLL_RECORDS_CONFIG: это Kafka Ограничение количества записей, которые потребитель может получить за один запрос данных. Если мы установим это число большим, мы сможем поглощать большие объемы данных одновременно, увеличивая нашу пропускную способность. но есть загвоздка - Чем больше мы приобретаем, тем труднее может быть поддерживать все в порядке. Итак, нам нужно найти ту золотую середину, где мы эффективны, но не перегружены.
  • FETCH_MIN_BYTES_CONFIG: Если мы установим это число очень большим, Кафка будет ждать, пока у него не будет достаточно данных, чтобы удовлетворить наше минимальное количество байтов, прежде чем отправлять его. Это может означать меньшее количество поездок (или приобретений), что хорошо для эффективности. Но если мы спешим и хотим быстро получить данные, мы можем установить это число меньше, чтобы Kafka будет быстрееотправлятьвсе, что у него есть。Например,Если наше потребительское приложение требует ресурсов или требует строгого порядка сообщений,Особенно в случае многопоточности,Меньшие партии могут быть более выгодными.
  • FETCH_MAX_WAIT_MS_CONFIG: Это определит, чего ждут наши потребители. Kafka Соберите достаточно данных, чтобы удовлетворить наши FETCH_MIN_BYTES_CONFIG время。Если мы установим это время очень высоким,Наши потребители готовы ждать дольше,Можно получить больше данных одновременно. Но если мы будем действовать в спешке,Мы ставим его ниже,Таким образом, наши потребители будут получать данные быстрее, даже если у них не так много данных. Это компромисс между ожиданием большей прибыли и быстрыми действиями.
Язык кода:properties
копировать
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1");
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");

5. Заключение

существоватьв этой статье,Мы углубились в глубину Kafka Сложность упорядочивания сообщений в . Мы изучаем проблемы и предлагаем решения. Будь то один раздел, внешняя сортировка и буферизация временных окон или идемпотентные производители, Kafka Предоставляет индивидуальные решения для удовлетворения потребностей в сортировке сообщений.

boy illustration
Неразрушающее увеличение изображений одним щелчком мыши, чтобы сделать их более четкими артефактами искусственного интеллекта, включая руководства по установке и использованию.
boy illustration
Копикодер: этот инструмент отлично работает с Cursor, Bolt и V0! Предоставьте более качественные подсказки для разработки интерфейса (создание навигационного веб-сайта с использованием искусственного интеллекта).
boy illustration
Новый бесплатный RooCline превосходит Cline v3.1? ! Быстрее, умнее и лучше вилка Cline! (Независимое программирование AI, порог 0)
boy illustration
Разработав более 10 проектов с помощью Cursor, я собрал 10 примеров и 60 подсказок.
boy illustration
Я потратил 72 часа на изучение курсорных агентов, и вот неоспоримые факты, которыми я должен поделиться!
boy illustration
Идеальная интеграция Cursor и DeepSeek API
boy illustration
DeepSeek V3 снижает затраты на обучение больших моделей
boy illustration
Артефакт, увеличивающий количество очков: на основе улучшения характеристик препятствия малым целям Yolov8 (SEAM, MultiSEAM).
boy illustration
DeepSeek V3 раскручивался уже три дня. Сегодня я попробовал самопровозглашенную модель «ChatGPT».
boy illustration
Open Devin — инженер-программист искусственного интеллекта с открытым исходным кодом, который меньше программирует и больше создает.
boy illustration
Эксклюзивное оригинальное улучшение YOLOv8: собственная разработка SPPF | SPPF сочетается с воспринимаемой большой сверткой ядра UniRepLK, а свертка с большим ядром + без расширения улучшает восприимчивое поле
boy illustration
Популярное и подробное объяснение DeepSeek-V3: от его появления до преимуществ и сравнения с GPT-4o.
boy illustration
9 основных словесных инструкций по доработке академических работ с помощью ChatGPT, эффективных и практичных, которые стоит собрать
boy illustration
Вызовите deepseek в vscode для реализации программирования с помощью искусственного интеллекта.
boy illustration
Познакомьтесь с принципами сверточных нейронных сетей (CNN) в одной статье (суперподробно)
boy illustration
50,3 тыс. звезд! Immich: автономное решение для резервного копирования фотографий и видео, которое экономит деньги и избавляет от беспокойства.
boy illustration
Cloud Native|Практика: установка Dashbaord для K8s, графика неплохая
boy illustration
Краткий обзор статьи — использование синтетических данных при обучении больших моделей и оптимизации производительности
boy illustration
MiniPerplx: новая поисковая система искусственного интеллекта с открытым исходным кодом, спонсируемая xAI и Vercel.
boy illustration
Конструкция сервиса Synology Drive сочетает проникновение в интрасеть и синхронизацию папок заметок Obsidian в облаке.
boy illustration
Центр конфигурации————Накос
boy illustration
Начинаем с нуля при разработке в облаке Copilot: начать разработку с минимальным использованием кода стало проще
boy illustration
[Серия Docker] Docker создает мультиплатформенные образы: практика архитектуры Arm64
boy illustration
Обновление новых возможностей coze | Я использовал coze для создания апплета помощника по исправлению домашних заданий по математике
boy illustration
Советы по развертыванию Nginx: практическое создание статических веб-сайтов на облачных серверах
boy illustration
Feiniu fnos использует Docker для развертывания личного блокнота Notepad
boy illustration
Сверточная нейронная сеть VGG реализует классификацию изображений Cifar10 — практический опыт Pytorch
boy illustration
Начало работы с EdgeonePages — новым недорогим решением для хостинга веб-сайтов
boy illustration
[Зона легкого облачного игрового сервера] Управление игровыми архивами
boy illustration
Развертывание SpringCloud-проекта на базе Docker и Docker-Compose