В процессе отправки сообщения задействованы два потока – основной Темы и Sender нить。существовать main нитьсерединасоздавать Понятнодек RecordAccumulator。main Поток отправляет сообщение RecordAccumulator,Sender Ветка продолжает идти от RecordAccumulator Извлекайте сообщения и отправляйте их Kafka Broker。
-Имя параметра | -описывать |
---|---|
bootstrap.servers | Список адресов брокеров, необходимых производителям для подключения к кластеру. Например, Hadoop102:9092, Hadoop103:9092, Hadoop104:9092 можно задать один или несколько, разделив их запятыми. Обратите внимание здесь и |
key.serializer ? value.serializer | Указывает тип сериализации ключа и значение отправленного сообщения. Обязательно напишите полное имя класса. |
buffer.memory | Общий размер буфера RecordAccumulator, по умолчанию 32 м. |
batch.size | Максимальный размер пакета данных в буфере, по умолчанию 16 КБ. Соответствующее увеличение этого значения может улучшить пропускную способность, но если это значение установлено слишком большим, задержка передачи данных увеличится. |
linger.ms | Если данные не достигают размера пакета, отправитель ожидает linger.time перед отправкой данных. Единица измерения — мс. Значение по умолчанию — 0 мс, что означает отсутствие задержки. Для производственных сред рекомендуется, чтобы это значение составляло от 5 до 100 мс. |
acks | 0: данным, отправленным производителем, не нужно ждать отправки данных на диск для ответа. 1: Данные отправлены производителем, лидер отвечает после получения данных. -1 (все): данные, отправленные производителем, Leader+ и всеми узлами в очереди isr, отвечают после получения данных. Значение по умолчанию — -1, -1, и все они эквивалентны. |
max.in.flight.requests.per.connection | Максимальное количество раз, когда подтверждение не возвращается. Значение по умолчанию — 5. Чтобы включить идемпотентность, убедитесь, что значение представляет собой число от 1 до 5. |
retries | В случае ошибки при отправке сообщения система отправит сообщение повторно. retries представляет количество повторных попыток. По умолчанию используется максимальное значение int: 2147483647. Если установлена повторная попытка и вы хотите обеспечить порядок сообщений, вам необходимо установить MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1. В противном случае при повторной попытке этого неудачного сообщения другие сообщения могут быть отправлены успешно. |
retry.backoff.ms | Интервал времени между двумя повторными попытками, по умолчанию — 100 мс. |
enable.idempotence | Включить ли идемпотентность, по умолчанию — true, идемпотентность включена. |
compression.type | Метод сжатия всех данных, отправленных производителем. По умолчанию установлено значение none, что означает отсутствие сжатия. Поддерживаемые типы сжатия: нет, gzip, snappy, lz4 и zstd. |
1) Требования: Создать продюсер Кафки, отправлено асинхронно в Kafka Broker
Асинхронный процесс отправки
2) Написание кода
(1) Создать проект Кафка
(2) Зависимости импорта
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency>
</dependencies>
(3) Создайте имя пакета: com.atguigu.kafka.producer.
(4) Напишите код API без функции обратного вызова.
package com.atguigu.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class CustomProducer {
public static void main(String[] args) throws
InterruptedException {
// 1. создавать kafka Объект конфигурации производителя
Properties properties = new Properties();
// 2. Давать kafka Добавьте информацию о конфигурации в объект конфигурации: bootstrap.servers.
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
// key,value Сериализация (обязательно): key.serializer, value.serializer.
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// 3. создавать kafka продюсерский объект
KafkaProducer<String, String> kafkaProducer = new
KafkaProducer<String, String>(properties);
// 4. вызов send метод, отправить сообщение
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new
ProducerRecord<>("first","yy " + i));
}
// 5. закрыть ресурс
kafkaProducer.close();
}
}
тест:
①Откройте потребителя Kafka на Hadoop102.
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
②Выполните код в IDEA и посмотрите, поступают ли сообщения в консоль Hadoop102.
Функция обратного вызова будет вызвана, когда производитель получит подтверждение. Это асинхронный вызов. Этот метод имеет два параметра: информацию о метаданных (RecordMetadata) и информацию об исключении (Exception). Если значение Exception равно нулю, это означает, что сообщение отправлено успешно. Если Exception не имеет значения. Если оно равно нулю, это означает, что отправка сообщения не удалась.
Примечание. Если сообщение не удалось отправить, оно будет повторено автоматически. Нам не нужно вручную повторять попытку в функции обратного вызова.
// 1. создавать kafka Объект конфигурации производителя
Properties properties = new Properties();
// 2. Давать kafka Объект конфигурации добавляет информацию о конфигурации.
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"hadoop102:9092");
// key,value Сериализация (обязательно): key.serializer, value.serializer.
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 3. создавать kafka продюсерский объект
KafkaProducer<String, String> kafkaProducer = new
KafkaProducer<String, String>(properties);
// 4. вызов send метод, отправить сообщение
for (int i = 0; i < 5; i++) {
// Добавить обратный вызов
kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i), new Callback() {
// Методсуществовать Producer получать ack часвызов,дляасинхронныйвызов
@Override
public void onCompletion(RecordMetadata metadata,
Exception exception) {
if (exception == null) {
// Без исключений, выводить информацию в консоль
System.out.println(" тема: " +
metadata.topic() + "->" + "Раздел:" + metadata.partition());
} else {
// Происходит печать исключений
exception.printStackTrace();
}
}
});
// После задержки вы увидите, что данные отправляются в разные разделы.
Thread.sleep(2);
}
// 5. закрыть ресурс
kafkaProducer.close();
Просто вызовите метод get() на основе асинхронной отправки.
// Отправить синхронно
kafkaProducer.send(new ProducerRecord<>("first","kafka" + i)).get();
(1)Содействовать рациональному использованию ресурсов хранения.,КаждыйиндивидуальныйPartitionсуществоватьодининдивидуальныйBrokerхранение на,Огромные объемы данных можно разделить на разделы
Часть данных хранится на нескольких брокерах. Разумный контроль над задачами разделов может обеспечить эффект балансировки нагрузки.
(2)Улучшение параллелизма,производитель может отправлять данные в разделе для единиц; потребители могут использовать данные в разделе для единиц;
1)Разделитель по умолчанию DefaultPartitioner
В IDEA нажмите ctrl +n и выполните глобальный поиск DefaultPartitioner.
1)нуждаться
Например, если мы реализуем реализацию секционирования, и если отправленные данные содержат xxx, они будут отправлены в раздел 0.
Если он не содержит xxx, он будет отправлен в раздел №1.
2)выполнитьшаг
(1) Определите класс для реализации интерфейса Partitioner.
(2) Перепишите метод раздела().
/**
* 1. реализовать интерфейс Partitioner
* 2. выполнить 3 индивидуальныйметод:partition,close,configure
* 3. писать partition Метод, возвращает номер раздела
*/
public class MyPartitioner implements Partitioner {
/**
* Вернуть раздел, соответствующий информации
* @param topic тема
* @param key новости key
* @param keyBytes новости key Сериализованный массив байтов
* @param value новости value
* @param valueBytes новости value Сериализованный массив байтов
* @param cluster Метаданные кластера могут просматривать информацию о разделах.
* @return
*/
@Override
public int partition(String topic, Object key, byte[]
keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// Получить новости
String msgValue = value.toString();
// создавать partition
int partition;
// Определите, содержит ли сообщение atguigu
if (msgValue.contains("xxx")){
partition = 0;
}else {
partition = 1;
}
// Вернуть номер раздела
return partition;
}
// закрыть ресурс
@Override
public void close() {
}
// Метод настройки
@Override
public void configure(Map<String, ?> configs) {
}
}
(3) Используйте метод секционирования и добавьте параметры секционирования в конфигурацию производителя.
// добавить в Пользовательский разделитель
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.yy.kafka.producer.MyPartitioner");
// пакет.размер: размер пакета, по умолчанию 16K
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
// linger.ms: время ожидания, по умолчанию 0
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
// RecordAccumulator: размер буфера, по умолчанию. 32M:buffer.memory
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
// compress.type: сжатие, по умолчанию нет, настраиваемое значение gzip、snappy、lz4 и zstd
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
1)ack Принцип реагирования
// настраивать acks
properties.put(ProducerConfig.ACKS_CONFIG, "all");
// Количество повторов повторные попытки, значение по умолчанию int Максимальное значение, 2147483647
properties.put(ProducerConfig.RETRIES_CONFIG, 3);
1)принцип идемпотентности
2)Как использовать идемпотентность
Открытые параметры enable.idempotence По умолчанию true,false закрытие.
1) Принцип транзакции Кафки
2) Транзакции Kafka имеют в общей сложности следующие 5 API.
// 1 Инициализировать транзакцию
void initTransactions();
// 2 Открыть транзакцию
void beginTransaction() throws ProducerFencedException;
// 3 существовать Зафиксировать потребленное возмещение в рамках транзакции (в основном используется для потребителей)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,String consumerGroupId) throws ProducerFencedException;
// 4 совершить транзакцию
void commitTransaction() throws ProducerFencedException;
// 5 Отменить транзакцию (операция, аналогичная откату транзакции)
void abortTransaction() throws ProducerFencedException;
3) Один производитель использует транзакции, чтобы гарантировать, что сообщения отправляются только один раз.
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class CustomProducerTransactions {
public static void main(String[] args) throws InterruptedException {
// 1. создавать kafka Объект конфигурации производителя
Properties properties = new Properties();
// 2. Давать kafka Объект конфигурации добавляет информацию о конфигурации.
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"hadoop102:9092");
// key,value сериализация
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// настраиватьдела идентификатор (обязательно), транзакция id Назовите все, что вам нравится
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction_id_0");
// 3. создавать kafka продюсерский объект
KafkaProducer<String, String> kafkaProducer = new
KafkaProducer<String, String>(properties);
// Инициализировать транзакцию
kafkaProducer.initTransactions();
// Открыть транзакцию
kafkaProducer.beginTransaction();
try {
// 4. вызов send метод, отправить сообщение
for (int i = 0; i < 5; i++) {
// отправить сообщение
kafkaProducer.send(new ProducerRecord<>("first","atguigu " + i));
}
// int i = 1 / 0;
// совершить транзакцию
kafkaProducer.commitTransaction();
} catch (Exception e) {
// Завершить транзакцию
kafkaProducer.abortTransaction();
} finally {
// 5. закрыть ресурс
kafkaProducer.close();
}
}
}
1) Kafka гарантирует, что данные в одном разделе упорядочиваются до версии 1.x. Условия следующие:
max.in.flight.requests.per.connection=1(Не нужно думать, стоит ли его включать Идемпотентность)。
2) Kafka обеспечивает порядок данных в отдельных разделах в версиях 1.x и более поздних. Условия следующие:
(1) Идемпотентность не включена
max.in.flight.requests.per.connectionнуждатьсянастраиватьдля1。
(2) Включить идемпотентность
max.in.flight.requests.per.connectionнуждатьсянастраиватьменьше или равно5。
Объяснение причины: поскольку после kafka1.x после включения идемпотентности сервер Kafka будет кэшировать метаданные последних 5 запросов, отправленных производителем.
Поэтому, несмотря ни на что, мы можем гарантировать, что данные последних пяти запросов в порядке.