Подробное объяснение потребительского API Apache Kafka
Подробное объяснение потребительского API Apache Kafka

Подробное объяснение потребительского API Apache Kafka

Apache Kafka Это платформа распределенной потоковой обработки с высокой пропускной способностью и малой задержкой для создания конвейеров данных в реальном времени и потоковых приложений. существовать Kafka , потребитель несет ответственность за Kafka Чтение сообщений из кластера. В этой статье будет подробно продемонстрировано Kafka потребитель API Использование включает в себя настройку, потребление сообщений, обработку ошибок, оптимизацию производительности и т. д.

1. Подготовка среды

Прежде чем начать, убедитесь, что вы установили и настроили кластер Kafka. Если нет, обратитесь к официальной документации Kafka для установки и настройки.

2. Конфигурация проекта Maven

Сначала создайте новый Maven проект и в pom.xml Добавить в файл Kafka Клиентские зависимости:

Язык кода:javascript
копировать
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example</groupId>
    <artifactId>kafka-consumer-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.0.0</version>
        </dependency>
    </dependencies>
</project>
3. Конфигурацияпотребитель

Kafka Для правильной работы потребитель требует ряда параметров конфигурации. Эти параметры можно передать Properties Объект установлен. Вот базовый пример конфигурации:

Язык кода:javascript
копировать
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my-topic"));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}
3.1 Конфигурацияпараметр Подробное объяснение
  • bootstrap.servers:Kafka Список адресов кластера. Возможна конфигурация одного или нескольких Kafka broker。
  • group.id:потребитель Уникальный идентификатор группы。все принадлежат к одной группепотребитель Координационная работа,Обычно потребляют сообщения в темах.
  • key.deserializer и value.deserializer:ключ сообщенияи Десериализатор значений。Kafka Предоставляются различные десериализаторы, такие как StringDeserializerIntegerDeserializer ждать.
  • auto.offset.reset:определениепотребитель Как справиться с ситуацией, когда начальное смещение отсутствует или смещение не существует на сервере。earliest Указывает потребление, начиная с самого раннего сообщения.
4. Потребление сообщений

потребитель подписывается на одну или несколько тем и периодически звонит им poll метод из Kafka Извлекать сообщения из。poll Метод возвращает сообщение, содержащее несколько ConsumerRecords объект.

4.1 Потребление новостей

Следующий код показывает, как получать и обрабатывать сообщения, полученные из Kafka:

Язык кода:javascript
копировать
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}
5. Управление смещением

Kafka Положение каждого потребляемого товара в каждом разделе отслеживается по смещению. Управление Смещением является важным аспектом потребительского применения.

5.1 Автоматически фиксировать смещения

По умолчанию Кафка потребительвстреча Автоматически фиксировать смещения. Может быть установлен с помощью enable.auto.commit Параметры для включения или отключения автофиксации:

Язык кода:javascript
копировать
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
5.2 Фиксация смещений вручную

Ручное фиксирование смещений обеспечивает более точный контроль. Следующий код показывает, как вручную фиксировать смещения:

Язык кода:javascript
копировать
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
    consumer.commitSync();
}
6. Обработка ошибок

в производственной среде,потребитель может столкнуться с различными ошибками,Например, сбой сети, Кафка broker Недоступно и т. д. Обработка этих ошибок является ключом к обеспечению надежного использования сообщений.

Язык кода:javascript
копировать
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
        consumer.commitSync();
    }
} catch (WakeupException e) {
    // Игнорируйте это исключение, если мы завершаем работу
} catch (Exception e) {
    e.printStackTrace();
} finally {
    consumer.close();
}
7. Оптимизация производительности

Чтобы улучшить производительность потребителя, ее можно оптимизировать следующими способами:

7.1 Увеличение интервала опроса

увеличивать poll Период ожидания метода может уменьшить потребность в Kafka количество запросов, тем самым улучшая производительность:

Язык кода:javascript
копировать
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
7.2 Увеличение размера выборки

Можно добавить, добавив fetch.min.bytes и fetch.max.wait.ms Параметры для уменьшения частоты получения сообщений, что повышает производительность:

Язык кода:javascript
копировать
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "50000");  // 50KB
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "1000"); // 1 секунда
8. Полный пример

Ниже приведен полный Kafka потребитель Пример,Содержит все Конфигурация、Потребление сообщенийи Обработка ошибоклогика:

Язык кода:javascript
копировать
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "50000");
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "1000");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my-topic"));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
                consumer.commitSync();
            }
        } catch (WakeupException e) {
            // Игнорируйте это исключение, если мы завершаем работу
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}
9. Эффект операции

При запуске приведенного выше кода потребитель будет сгенерирован из Kafka в кластере my-topic в теме Потребляйте новости. Ключ и значение каждого сообщения будут выведены на консоль. Если Потребление сообщений прошла успешно, консоль распечатает смещение и значение ключа сообщения.

10. Резюме

В этой статье подробно описан Apache

Kafka потребитель API Использование,включать Конфигурация、Потребление сообщений、Управление смещением、Обработка ошибоки Оптимизация производительность. Понимая и применяя на практике это содержание, вы сможете лучше использовать Kafka потребитель для эффективного и надежного потребления данных.

Я надеюсь, что эта статья будет вам полезна. Если у вас есть какие-либо вопросы или предложения, оставьте сообщение для обсуждения.

boy illustration
Неразрушающее увеличение изображений одним щелчком мыши, чтобы сделать их более четкими артефактами искусственного интеллекта, включая руководства по установке и использованию.
boy illustration
Копикодер: этот инструмент отлично работает с Cursor, Bolt и V0! Предоставьте более качественные подсказки для разработки интерфейса (создание навигационного веб-сайта с использованием искусственного интеллекта).
boy illustration
Новый бесплатный RooCline превосходит Cline v3.1? ! Быстрее, умнее и лучше вилка Cline! (Независимое программирование AI, порог 0)
boy illustration
Разработав более 10 проектов с помощью Cursor, я собрал 10 примеров и 60 подсказок.
boy illustration
Я потратил 72 часа на изучение курсорных агентов, и вот неоспоримые факты, которыми я должен поделиться!
boy illustration
Идеальная интеграция Cursor и DeepSeek API
boy illustration
DeepSeek V3 снижает затраты на обучение больших моделей
boy illustration
Артефакт, увеличивающий количество очков: на основе улучшения характеристик препятствия малым целям Yolov8 (SEAM, MultiSEAM).
boy illustration
DeepSeek V3 раскручивался уже три дня. Сегодня я попробовал самопровозглашенную модель «ChatGPT».
boy illustration
Open Devin — инженер-программист искусственного интеллекта с открытым исходным кодом, который меньше программирует и больше создает.
boy illustration
Эксклюзивное оригинальное улучшение YOLOv8: собственная разработка SPPF | SPPF сочетается с воспринимаемой большой сверткой ядра UniRepLK, а свертка с большим ядром + без расширения улучшает восприимчивое поле
boy illustration
Популярное и подробное объяснение DeepSeek-V3: от его появления до преимуществ и сравнения с GPT-4o.
boy illustration
9 основных словесных инструкций по доработке академических работ с помощью ChatGPT, эффективных и практичных, которые стоит собрать
boy illustration
Вызовите deepseek в vscode для реализации программирования с помощью искусственного интеллекта.
boy illustration
Познакомьтесь с принципами сверточных нейронных сетей (CNN) в одной статье (суперподробно)
boy illustration
50,3 тыс. звезд! Immich: автономное решение для резервного копирования фотографий и видео, которое экономит деньги и избавляет от беспокойства.
boy illustration
Cloud Native|Практика: установка Dashbaord для K8s, графика неплохая
boy illustration
Краткий обзор статьи — использование синтетических данных при обучении больших моделей и оптимизации производительности
boy illustration
MiniPerplx: новая поисковая система искусственного интеллекта с открытым исходным кодом, спонсируемая xAI и Vercel.
boy illustration
Конструкция сервиса Synology Drive сочетает проникновение в интрасеть и синхронизацию папок заметок Obsidian в облаке.
boy illustration
Центр конфигурации————Накос
boy illustration
Начинаем с нуля при разработке в облаке Copilot: начать разработку с минимальным использованием кода стало проще
boy illustration
[Серия Docker] Docker создает мультиплатформенные образы: практика архитектуры Arm64
boy illustration
Обновление новых возможностей coze | Я использовал coze для создания апплета помощника по исправлению домашних заданий по математике
boy illustration
Советы по развертыванию Nginx: практическое создание статических веб-сайтов на облачных серверах
boy illustration
Feiniu fnos использует Docker для развертывания личного блокнота Notepad
boy illustration
Сверточная нейронная сеть VGG реализует классификацию изображений Cifar10 — практический опыт Pytorch
boy illustration
Начало работы с EdgeonePages — новым недорогим решением для хостинга веб-сайтов
boy illustration
[Зона легкого облачного игрового сервера] Управление игровыми архивами
boy illustration
Развертывание SpringCloud-проекта на базе Docker и Docker-Compose