Первоначально Kafka была разработана LinkedIn с использованием языка Scala как многораздельная и многокопийная распределенная система обмена сообщениями, основанная на координации ZooKeeper. Теперь она передана в дар Apache Foundation.
В настоящее время Kafka позиционируется как платформа распределенной потоковой обработки. Она широко используется благодаря своей высокой пропускной способности, устойчивости, горизонтальной масштабируемости, поддержке потоковой обработки данных и другим функциям. В основном она написана на Scala и Java.
Это распределенная система обмена сообщениями с высокой пропускной способностью, которая может обрабатывать данные потока событий. С помощью Kafka вы можете легко распространить сообщение, которое хотите опубликовать, любому получателю, желающему подписаться на это сообщение. Восходящему производителю нужно только ввести сообщение в Kafka и указать тему. Нижестоящему получателю достаточно подписаться на тему, чтобы получать восходящее сообщение с низкой задержкой и высокой пропускной способностью. Kafka также поддерживает ту же тему, которая используется несколькими нижестоящими потребителями. в то же время потребление и ход обработки данных между разными потребителями не мешают друг другу.
В настоящее время все больше и больше систем распределенной обработки с открытым исходным кодом, таких как Cloudera, Storm, Spark, Flink и т. д., поддерживают интеграцию с Kafka. Причина, по которой Kafka набирает все большую популярность, неотделима от трех основных ролей, которые она «играет». :
Очереди сообщений обычно в основном занимаются: асинхронной обработкой, развязкой сервисов и управлением потоком. Поэтому Kafka, как разновидность очереди сообщений, также решает эти проблемы.
Producer:продюсер сообщений,То есть отправить сообщениеизс другой стороны。Производитель несет ответственностьсоздаватьинформация,Затем доставьте его середину Кафки;
Consumer:информацияпотребитель,也就да接收информацияизс другой стороны。потребительсоединятьприезжать Kafka и получать сообщения, а затем выполнять соответствующую обработку бизнес-логики;
Consumer Group (CG):потребитель组,Зависит отмногоиндивидуальный consumer композиция. Каждый потребитель в группе потребителей отвечает за потребление данных из разных разделов. Раздел может использоваться только потребителями одной группы потребителей и не влияет друг на друга. Все потребители принадлежат к определенной группе потребителей, то есть группа потребителей является логическим абонентом.
Broker:узел сервисного агента。верно В Kafka С точки зрения брокера можно просто рассматривать как независимую Kafka сервисный узел или Kafka Экземпляр службы. В большинстве случаев также возможно Broker рассматривается как один Kafka сервер, при условии, что на этом сервере развернут только один Kafka Пример. один или несколько Broker сформировал Kafka кластер. Вообще говоря, мы больше привыкли использовать строчные буквы. broker для представления узла агента службы.
Controller:Встреча в кластереиметьодининдивидуальный或者многоиндивидуальный брокер, один из которых broker будет избран контролёром (Кафка Controller), который отвечает за управление состоянием всех разделов и реплик во всем кластере.
В Kafka есть две особенно важные концепции — тема и раздел.
Topic:
Его можно понимать как очередь. Производитель и потребитель находятся на обоих концах очереди. Один выводит данные, а другой потребляет. Они оба ориентированы на определенную тему;
Partition:
Чтобы добиться масштабируемости, тема с очень большим объемом данных может быть распределена между несколькими брокерами (т. е. серверами). Тема может быть разделена на несколько разделов, и каждый раздел представляет собой упорядоченную очередь, тогда параллелизм темы определяется; в основном равен количеству разделов.
Сообщения в Kafka классифицируются по темам. Производитель отвечает за отправку сообщений в определенную тему (каждое сообщение, отправляемое в кластер Kafka, должно указывать тему), а потребитель отвечает за подписку на тему и ее использование.
Тема — это логическая концепция, которую можно разделить на несколько разделов. Раздел принадлежит только одной теме. Во многих случаях раздел также называется разделом темы. Разные разделы одной темы содержат разные сообщения. Этот раздел можно рассматривать как добавляемый файл журнала на уровне хранилища. Когда сообщения добавляются в файл журнала раздела, им будет присвоено определенное смещение.
Смещение — это уникальный идентификатор сообщения в разделе. Kafka использует его для обеспечения порядка сообщения внутри раздела. Другими словами, Kafka гарантирует порядок разделов, а не их. упорядочивание тем.
Как показано на рисунке выше:в темеиметь 4 разделах сообщения последовательно добавляются в конец файла журнала каждого раздела. Кафка Разделы могут быть распределены по разным серверам (брокерам), то есть тема может охватывать несколько брокер, чтобы обеспечить лучшую производительность, чем одиночный broker Более мощная производительность.
Прежде чем каждое сообщение будет отправлено брокеру, он выберет, какой конкретный раздел хранить в соответствии с правилами раздела. Если правила разделения установлены соответствующим образом, все сообщения могут быть равномерно распределены по разным разделам. Если тема соответствует только одному файлу, то ввод-вывод машины, на которой находится файл, станет узким местом производительности темы, и секционирование решает эту проблему. При создании темы вы можете установить количество разделов, указав параметры. Конечно, вы также можете изменить количество разделов после создания темы. Горизонтального расширения можно добиться, увеличив количество разделов.
Replica:
Kafka представляет механизм нескольких реплик для разделов, который может улучшить возможности аварийного восстановления за счет увеличения количества реплик.
Разные копии одного и того же раздела хранят одно и то же сообщение (при этом копии не совсем одинаковые). Связь между копиями — «один ведущий и несколько слейвов», при которой за обработку прочитанного и отвечает ведущий экземпляр. запросы на запись и только ведомая копия. Отвечает за синхронизацию сообщений с ведущей репликой. Реплики находятся в разных брокерах. При сбое ведущей реплики из подчиненных реплик переизбирается новая ведущая реплика для предоставления внешних услуг. Kafka реализует автоматическое переключение при сбое с помощью механизма нескольких реплик. При сбое брокера в кластере Kafka служба все равно может быть гарантирована.
Как показано на рисунке выше:Kafka Есть 4 индивидуальный broker,определенныйиндивидуальныйв темеиметь 3 индивидуальный раздел, а также фактор репликации (то есть количество индивидуальных реплик). 3. Таким образом, каждый раздел будет иметь 1 индивидуальный leader скопировать и 2 индивидуальный follower копия. Производители и потребители взаимодействуют только с leader реплики взаимодействуют, при этом follower Во многих случаях реплика отвечает только за синхронизацию сообщений. follower Сообщения в репликах относительно leader Для копий будет определенная задержка.
Потребитель Kafka также имеет определенные возможности аварийного восстановления. Потребитель использует режим извлечения для получения сообщений с сервера и сохраняет конкретное место потребления. Когда потребитель возвращается в Интернет после отключения, он может повторно получить необходимые сообщения для потребления на основе ранее сохраненного места потребления, чтобы они были доступны. не приведет к потере сообщения.
Более:想了解更много关В:большие данные Эксплуатация и обслуживание, связанное с подготовкой системного окружения, Установка базовой среды、Развертывание кластера以及отвечать用组件安装ждать Многоборьеизтехнологияизвопрос。нравиться:Вы можете обратиться ко мне, если у вас есть вопросы по построению среды/развертыванию кластера, расширению памяти/устранению неполадок, миграции данных и т. д.
Путь к инструменту командной строки Kafka: xxx/kafka/bin/Вниз
Может управлять темой, включая создание, удаление, расширение разделов, запрос сведений о теме, просмотр списка тем и т. д.
Командный инструмент: kafka-topics.sh
# создавать Topic:
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic test
# Topic Расширение раздела
kafka-topics.sh --zookeeper localhost:2181 --alter --topic test --partitions 4
# удалить Topic:
kafka-topics.sh --delete --zookeeper localhost:2181 localhost:9092 --topic test
#запрос Topic Подробности
[DEV (v.v) sa_cluster@hybrid03 bin]$ ./kafka-topics.sh --topic event_topic --zookeeper localhost:2181 --describe
Topic:event_topic PartitionCount:10 ReplicationFactor:2 Configs:compression.type=gzip
Topic: event_topic Partition: 0 Leader: 1001 Replicas: 1001,1003 Isr: 1001,1003
Topic: event_topic Partition: 1 Leader: 1003 Replicas: 1003,1002 Isr: 1003,1002
Topic: event_topic Partition: 2 Leader: 1002 Replicas: 1002,1001 Isr: 1002,1001
Topic: event_topic Partition: 3 Leader: 1001 Replicas: 1001,1002 Isr: 1001,1002
Topic: event_topic Partition: 4 Leader: 1003 Replicas: 1003,1001 Isr: 1003,1001
Topic: event_topic Partition: 5 Leader: 1002 Replicas: 1002,1003 Isr: 1002,1003
Topic: event_topic Partition: 6 Leader: 1001 Replicas: 1001,1003 Isr: 1001,1003
Topic: event_topic Partition: 7 Leader: 1003 Replicas: 1003,1002 Isr: 1003,1002
Topic: event_topic Partition: 8 Leader: 1002 Replicas: 1002,1001 Isr: 1002,1001
Topic: event_topic Partition: 9 Leader: 1001 Replicas: 1001,1002 Isr: 1001,1002
#перечислить все Topic
kafka-topics.sh --bootstrap-server xxxxxx:9092 --list --exclude-internal
Балансировка данных после добавления и удаления узлов
После добавления узла данных, хотя брокер был запущен на новом узле, Kafka не будет автоматически балансировать данные, и его необходимо выполнить вручную.
командный инструмент:kafka-reassign-partitions.sh
Более:想了解更много关В:большие данные Эксплуатация и обслуживание, связанное с подготовкой системного окружения, Установка базовой среды、Развертывание кластера以及отвечать用组件安装ждать Многоборьеизтехнологияизвопрос。нравиться:Вы можете обратиться ко мне, если у вас есть вопросы по построению среды/развертыванию кластера, расширению памяти/устранению неполадок, миграции данных и т. д.
Напишите файл конфигурации move-json-file.json и сообщите Kafka, какие темы вы хотите перераспределить:
{
"topics": [{
"topic": "event_topic"
},
{
"topic": "profile_topic"
},
{
"topic": "item_topic"
}
],
"version": 1
}
Выполните команду для генерации информации о распределении:Обратите внимание наизда,В настоящее время перемещение раздела еще не началось,Он просто сообщает вам, когда было выделено и предложено. Сохраните перед назначением,На тот случай, если вы захотите откатить его обратно.
# под --broker-list параметр верноотвечатьизда brokerid
[DEV (v.v) cluster@hybrid03 bin]$ ./kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file ~/mv.json --broker-list "1001,1002" --generate
Current partition replica assignment #когда Информация о предварительном задании
{"version":1,"partitions":[{"topic":"event_topic","partition":2,"replicas":[1002,1001],"log_dirs":["any","any"]},{"topic":"event_topic","partition":8,"replicas":[1002,1001],"log_dirs":["any","any"]},{"topic":"event_topic","partition":3,"replicas":[1001,1002],"log_dirs":["any","any"]},{"topic":"event_topic","partition":6,"replicas":[1001,1003],"log_dirs":["any","any"]},{"topic":"event_topic","partition":9,"replicas":[1001,1002],"log_dirs":["any","any"]},{"topic":"item_topic","partition":0,"replicas":[1001,1003],"log_dirs":["any","any"]},{"topic":"event_topic","partition":0,"replicas":[1001,1003],"log_dirs":["any","any"]},{"topic":"event_topic","partition":5,"replicas":[1002,1003],"log_dirs":["any","any"]},{"topic":"profile_topic","partition":2,"replicas":[1001,1003],"log_dirs":["any","any"]},{"topic":"profile_topic","partition":1,"replicas":[1002,1001],"log_dirs":["any","any"]},{"topic":"event_topic","partition":4,"replicas":[1003,1001],"log_dirs":["any","any"]},{"topic":"event_topic","partition":1,"replicas":[1003,1002],"log_dirs":["any","any"]},{"topic":"event_topic","partition":7,"replicas":[1003,1002],"log_dirs":["any","any"]},{"topic":"profile_topic","partition":0,"replicas":[1003,1002],"log_dirs":["any","any"]}]}
Proposed partition reassignment configuration #После размещения информации
{"version":1,"partitions":[{"topic":"event_topic","partition":7,"replicas":[1002,1001],"log_dirs":["any","any"]},{"topic":"profile_topic","partition":1,"replicas":[1002,1001],"log_dirs":["any","any"]},{"topic":"event_topic","partition":1,"replicas":[1002,1001],"log_dirs":["any","any"]},{"topic":"item_topic","partition":0,"replicas":[1001,1002],"log_dirs":["any","any"]},{"topic":"event_topic","partition":4,"replicas":[1001,1002],"log_dirs":["any","any"]},{"topic":"event_topic","partition":9,"replicas":[1002,1001],"log_dirs":["any","any"]},{"topic":"event_topic","partition":6,"replicas":[1001,1002],"log_dirs":["any","any"]},{"topic":"event_topic","partition":3,"replicas":[1002,1001],"log_dirs":["any","any"]},{"topic":"event_topic","partition":8,"replicas":[1001,1002],"log_dirs":["any","any"]},{"topic":"profile_topic","partition":0,"replicas":[1001,1002],"log_dirs":["any","any"]},{"topic":"event_topic","partition":0,"replicas":[1001,1002],"log_dirs":["any","any"]},{"topic":"event_topic","partition":5,"replicas":[1002,1001],"log_dirs":["any","any"]},{"topic":"profile_topic","partition":2,"replicas":[1001,1002],"log_dirs":["any","any"]},{"topic":"event_topic","partition":2,"replicas":[1001,1002],"log_dirs":["any","any"]}]}
将上面得приезжать期望из Сохранение файла режима переназначениясуществоватьодининдивидуальный json Внутри файла: reassignment-json-file.json, а затем передайте параметры —execute Выполнить задание:
Эту команду также можно использовать в следующих сценариях использования:
Просмотр статуса потребления группы
# group: обозначениеgroup Идентификационное имя
./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group test-group
# Пример:
# TOPIC: группавернноследует из темы
# РАЗДЕЛ: номер раздела, начиная с 0, 0-5 означает наличие 6 отдельных разделов.
# CURRENT-OFFSET: этот потребитель израсходовал из смещения до того, как
# LOG-END-OFFSET: подтверждение фиксации производителя существует из смещения в этом разделе раздела.
# ЛАГ: Разницу между двумя индивидуальными смещениями часто называют отставанием. Если это значение слишком велико, это ненормально.
# ХОСТ: IP-адрес существующего сервера потребителя.
# CLIENT-ID: информация для потребителя
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group
2.удалитьgroup
удалить группу
./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --delete --group test-group
Сброс смещения потребителей
Самая ранняя стратегия: отрегулировать смещение до самого раннего смещения до того, когда
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-earliest –execute
Последняя стратегия: отрегулировать смещение до последнего смещения до того, когда
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-latest --execute
Текущая стратегия: скорректировать смещение до последнего представленного смещения до того, как
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-current --execute
Стратегия Specified-Offset: отрегулируйте смещение в соответствии со смещением отображаемого изображения.
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-offset <offset> --execute
Стратегия Shift-By-N: отрегулируйте смещение до смещения + N до (N может быть отрицательным значением)
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --topic test --reset-offsets --shift-by <offset_N> --execute
Стратегия DateTime: (отрегулируйте смещение до положения, превышающего минимальное смещение в данный момент времени)
Время нужно сократить на 8.
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --topic test --reset-offsets --to-datetime 2019-06-20T20:00:00.000 --execute
Стратегия продолжительности: отрегулируйте смещение до расстояния, обозначающего интервал смещения до того, когда, а затем отрегулируйте смещение до заданного интервала времени от смещения до того, когда. Конкретный формат: PnDTnHnMnS。
в буквах P Начало, затем Зависит от 4 Частично состоит из D、H、M и S означает дни, часы, минуты и секунды соответственно.
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --by-duration PT0H30M0S --execute
# настраивать topic Срок годности(单位 миллисекунда)
### 3600000 миллисекунда = 1 час
./bin/kafka-configs.sh --zookeeper 127.0.0.1:2181 --alter --entity-name topic-devops-elk-log-hechuan-huanbao --entity-type topics --add-config retention.ms=3600000
# Проверять topic Конфигурация
./bin/kafka-configs.sh --zookeeper 127.0.0.1:2181 --describe --entity-name topic-devops-elk-log-hechuan-huanbao --entity-type topics
# подключиться к тестовой теме, а затем создать сообщение, введя +
$ bin/kafka-console-producer.sh --broker-list kafka-host:port --topic test-topic --producer-property
>
# --from-beginning: обозначение Начать получение сообщений, в противном случае он начнет получать сообщения из последнего местоположения.
$ bin/kafka-console-consumer.sh --bootstrap-server kafka-host:port --topic test-topic --group test-group --from-beginning --consumer-property
# продюсер тестов
# Отправлено в тему 1 Десятки миллионов сообщений, размер каждого сообщения 1KB
# он распечатает продюсера тесты Пропускная способность (МБ/с), задержка отправки сообщения и задержка по различным квантилям
$ bin/kafka-producer-perf-test.sh --topic test-topic --num-records 10000000 --throughput -1 --record-size 1024 --producer-props bootstrap.servers=kafka-host:port acks=-1 linger.ms=2000 compression.type=lz4
2175479 records sent, 435095.8 records/sec (424.90 MB/sec), 131.1 ms avg latency, 681.0 ms max latency.
4190124 records sent, 838024.8 records/sec (818.38 MB/sec), 4.4 ms avg latency, 73.0 ms max latency.
10000000 records sent, 737463.126844 records/sec (720.18 MB/sec), 31.81 ms avg latency, 681.00 ms max latency, 4 ms 50th, 126 ms 95th, 604 ms 99th, 672 ms 99.9th.
# Тестирование потребительской эффективности
$ bin/kafka-consumer-perf-test.sh --broker-list kafka-host:port --messages 10000000 --topic test-topic
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2019-06-26 15:24:18:138, 2019-06-26 15:24:23:805, 9765.6202, 1723.2434, 10000000, 1764602.0822, 16, 5651, 1728.1225, 1769598.3012
kafka Единицей чтения и письма является раздел, такой индивидуальный topic 拆分为многоиндивидуальный partition Может улучшить пропускную способность. Но предпосылка здесь другая. partition Должны располагаться на разных дисках (может существовать на одной и той же отдельной машине). Если более индивидуальный partition 位В同одининдивидуальныйдиск,Это означает, что иметь индивидуальный процесс можно только на одном индивидуальном диске из отдельных файлов одновременно для чтения и записи.,Это приводит к тому, что операционная система часто планирует чтение и запись на диск.,То есть нарушается непрерывность чтения и записи диска.
проиллюстрировать:нравиться果你想了解更много关В:большие данные Эксплуатация и обслуживание, связанное с подготовкой системного окружения, Установка базовой среды、Развертывание кластера以及отвечать用组件安装ждать Многоборьеизтехнологияизвопрос。例нравиться:Стройте из окружающей среды/Развертывание кластера,Расширение памяти/устранение неполадок,Ожидание миграции данных поможет вам легко справиться со сложностями управления данными. Вы можете связаться со мной: 15928721005.
В качестве сборщика мусора вместо CMS рекомендуется использовать последнюю версию G1. Минимальная рекомендуемая версия Java — JDK 1.7u51.
Преимущества G1 по сравнению с CMS:
Чтобы значительно повысить производительность записи производителя, файлы необходимо регулярно записывать пакетами.
иметь 2 индивидуальныйпараметр Может Конфигурация:
log.flush.interval.messages = 100000
:
в любое время producer писать 100000 Когда есть фрагмент данных, они сбрасываются на диск.log.flush.interval.ms=1000
:
каждый 1 Секунды, просто обновите диск один разКогда на сервер Kafka записывается большой объем сообщений, создается множество файлов данных, которые занимают большой объем дискового пространства. Если их не очистить вовремя, Kafka может хранить их в течение 7 дней. по умолчанию.