Иллюстрированный обучающий сайт:https://xiaolincoding.com
Каждый находится в ежедневном развитии,Было ли оно обработаноПроблема большого количества невыполненных пакетных сообщенийШерстяная ткань?
Обычно это происходит из-заОшибки кода (например, неправильная обработка логики потребления)、Или скорость производства производителя превышает скорость потребления потребителя (например, большая рекламная кампания), панические покупки и другие действия, приводящие к резкому увеличению количества сообщений.,Или скорость обработки данных потребителями чрезвычайно низкая.),Это может привести к накоплению миллионов или даже десятков миллионов сообщений в производственной среде.
Итак, предположим, в Кафке накопился миллион сообщений, как это решить?
При возникновении проблемы с задержкой сообщений,Нам нужно сначала проверить,Есть ли ошибка?,Например, потребитель неправильно указал смещение.
Потребитель не совершил компенсацию после обработки сообщения, что привело к повторному потреблению или его стагнации. Это создает большое количество сообщений.
Дайте одинКонтрпример псевдокода:
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
process(record);
// Смещение не зафиксировано
}
}
После обработки сообщения правильно зафиксируйте смещение.
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
process(record);
}
//Отправляем смещение
consumer.commitSync();
}
Если это не ошибка, то это может быть задержка сообщений, вызванная низкой скоростью потребителя. Мы можем оптимизировать логику потребительского кода.
картина
Можно использоватьМногопоточность,Может сократить время обработки каждого сообщения (например, сократить ненужные вычисления),Это повышает скорость обработки сообщений.
Предположим, что потребитель имеетДвамашина,Оптимизации потребительского кода предшествует,Обработка 100 сообщений за 1 секунду. После оптимизации кода,Может обрабатывать 500 сообщений в секунду。
За час сообщений может быть обработано: 2*500*3600 = 3600 000
можно найти,Если накопилось более 3 миллионов сообщений,Обработка займет час. Если это производственная среда,НекоторыйЧувствительный или особенныйбизнес,Длительные задержки не допускаются.
Если дело срочное, мы можем Временное экстренное расширение, новая временная тема。
Например, исходная тема имеет только два раздела.,потому чтоПотребительская обработка требует много времени и других операций.,Это приводит к накоплению миллионов сообщений.,В это время требуется срочное и быстрое лечение.
В это время,потребительский кодекс,Мы можем внести некоторые коррективы,Просто больше не занимайтесь другими бизнес-операциями。СкорееСоздать новую временную тему,Переслать сообщение во временную тему,иpartition Раздел увеличен до исходного 10 раз
Затем наш исходный код обработки бизнес-логики потребителя помещается в новое временное сообщение для обработки.
После того, как накопившиеся данные быстро будут израсходованы, необходимо восстановить исходную развернутую архитектуру, удалить временных потребителей и снова использовать исходную потребительскую машину для потребления сообщений.
Что касается проблемы большого количества невыполненных онлайн-сообщений Kafka, я резюмировал следующие моменты: