[Расширенное развитие основных принципов Spring] Spring Kafka: обработка потока данных в реальном времени, ускоряющая рост вашего бизнеса! ️
[Расширенное развитие основных принципов Spring] Spring Kafka: обработка потока данных в реальном времени, ускоряющая рост вашего бизнеса! ️

введение в историю

Когда мы говорим о Spring Kafka, думайте о нем как об очень хорошем почтальоне, но вместо того, чтобы доставлять обычные письма, вы обрабатываете большие объемы интересных и полезных данных. Этот почтальон хорошо взаимодействует с Kafka и работает с данными на высоком уровне абстракции и в простой в использовании форме. Работа этого почтальона — доставлять данные из одного места в другое, точно так же, как мы отправляем посылки. Он знает, как общаться с Кафкой и как устанавливать связи с темами ввода и вывода. Когда кто-то помещает данные во входную тему, почтальон немедленно получает уведомление и быстро извлекает данные. Затем он выполняет различные интересные преобразования и операции обработки данных, как опытный фокусник. Он может конвертировать данные в различные форматы, выполнять операции агрегации, фильтрации, объединения и шунтирования. После обработки данных почтальон помещает их в специальный пакет и маркирует его адресом назначения, который является предметом вывода. Затем он быстро отправляет посылку, чтобы обеспечить своевременную доставку данных. Spring Kafka похож на ящик с инструментами почтальона, предоставляя множество полезных инструментов и функций, облегчающих его работу. Он предоставляет простой и декларативный API, который позволяет нам интуитивно определять логику обработки данных и топологию потоковой обработки.

Затем начинается текст

Введение и предыстория:

Spring Kafka — это библиотека, предоставляемая Spring Framework, которая интегрирует Apache Kafka и используется для создания приложений обработки потоков данных в реальном времени на основе Kafka. Apache Kafka — это высокопроизводительная распределенная платформа потоковой передачи данных, широко используемая для создания масштабируемых конвейеров обработки данных в реальном времени.

Причины, по которым обработка потоков данных в реальном времени имеет решающее значение для вашего бизнеса:

Обработка потоков данных в реальном времени очень важна для современного бизнеса. В условиях быстрого развития Интернета и ускорения цифровой трансформации предприятия сталкиваются с проблемой создания и обработки больших объемов данных. Обработка потока данных в реальном времени может помочь предприятиям собирать, обрабатывать и анализировать данные в режиме реального времени, позволяя предприятиям принимать своевременные решения, предоставлять персонализированные услуги и оптимизировать бизнес-процессы. Обработка потока данных в режиме реального времени также может помочь предприятиям выявить потенциальные возможности и риски и принять быстрые меры.

Основы Spring Kafka:

Узнайте больше об основных концепциях и компонентах Apache Kafka:

Прежде чем приступить к изучению Spring Kafka, очень важно понять основные концепции и компоненты Apache Kafka. Некоторые основные концепции включают в себя:

  • Тема: Категория или тема сообщения.
  • Раздел: Тема разделена на несколько разделов.,Каждый раздел в порядке,И может копироваться на нескольких машинах.
  • Продюсер: отвечает за публикацию сообщений в темах Kafka.
  • Потребитель: подписывайтесь на сообщения из тем Kafka и получайте их.
  • Группа потребителей: группа потребителей совместно изучает одну или несколько тем, и раздел каждой темы назначается потребителю в группе потребителей.
  • компенсировать(Offset):потребитель Можно отслеживать потреблениеизинформацияиз Расположение,смещение для представления.

Знакомство с основными методами использования и интеграции Spring Kafka:

Spring Kafka предоставляет простой, но мощный API для использования Kafka в приложениях Spring. Он обеспечивает следующие основные функции:

  • информация Производство:использовать Spring Kafka из KafkaTemplate класс позволяет легко публиковать сообщения в Kafka тема.
  • информация Потребление:проходитьиспользовать Spring Kafka Предоставить из @KafkaListener Аннотации упрощают создание потребителей сообщений и обработку сообщений от Kafka Тема из сообщения.
  • Обработка ошибок: весна Kafka предоставляет гибкий механизм обработки ошибок, который может обрабатывать различные ошибки в процессе публикации и использования сообщений.
  • Поддержка транзакций: Весна Kafka поддержка и Spring Механизм управления транзакциями интегрирован для реализации публикации и потребления сообщений из транзакционных операций.

Публикация и потребление сообщений:

существовать Spring Kafka Публикуйте сообщения в Kafka темы, которые вы можете использовать KafkaTemplate сорт send() метод. Вы можете отправить сообщение на Kafka。

Хотите потреблять Kafka Сообщения в теме, которую вы можете использовать @KafkaListener комментироватьсоздаватьодининформацияслушатель。проходить Укажите для мониторингаизтемаиинформация Метод лечения,Может существовать инициировать соответствующую логику при получении сообщения.

Язык кода:javascript
копировать
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

public void publishMessage(String topic, String message) {
    kafkaTemplate.send(topic, message);
}

Хотите потреблять Kafka Сообщения в теме, которую вы можете использовать @KafkaListener комментироватьсоздаватьодининформацияслушатель。проходить Укажите для мониторингаизтемаиинформация Метод лечения,Может существовать инициировать соответствующую логику при получении сообщения.

Язык кода:javascript
копировать
@KafkaListener(topics = "myTopic")
public void consumeMessage(String message) {
    // Обработать полученное сообщение
    System.out.println("Received message: " + message);
}

Понимание сериализации и десериализации сообщений:

существовать Kafka Сериализация и десериализация сообщений — очень важные понятия. Когда сообщение отправляется на Kafka час,Их необходимо преобразовать в потоки байтов. Так же,существовать, когда сообщение будет использовано,Их необходимо преобразовать в исходный формат данных.

Spring Kafka предоставляет механизм сериализации и десериализации по умолчанию, который может автоматически конвертировать в зависимости от типа сообщения. Для распространенных типов данных, таких как строки, JSON, байтовые массивы и т. д., Spring Kafka предоставил соответствующие реализации сериализации и десериализации. Кроме того, вы можете настроить сериализаторы и десериализаторы для обработки определенных форматов сообщений.

Например, вы можете использовать StringSerializer и StringDeserializer для сериализации и десериализации строковых сообщений:

Язык кода:javascript
копировать
@Configuration
public class KafkaConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

Управление группами потребителей:

Понятие и роль групп потребителей:

Группа потребителей — это группа потребителей с одинаковым идентификатором группы потребителей, которые совместно потребляют сообщения из одной или нескольких тем Kafka. Роль группы потребителей заключается в обеспечении параллельной обработки и балансировки нагрузки сообщений. Назначая разделы темы разным потребителям в группе потребителей, можно добиться параллельной обработки сообщений, повышая пропускную способность обработки и уменьшая задержку. Группы потребителей также обеспечивают отказоустойчивость; в случае сбоя потребителя другие потребители могут взять на себя управление его разделом и продолжить обработку сообщений.

Внедрение эффективноиз Управление группами потребителей: Вот некоторые ключевые соображения по эффективному управлению группами потребителей:

  1. Выбор идентификатора группы потребителей: выберите уникальный идентификатор для каждой группы потребителей, чтобы гарантировать, что разные группы потребителей не мешают друг другу.
  2. Стратегия выделения разделов. Выберите подходящую стратегию выделения разделов, чтобы обеспечить балансировку нагрузки разделов, назначенных потребителям, и избежать перегрузки или простоя некоторых потребителей.
  3. Динамическое расширение и сокращение: динамически увеличивайте или уменьшайте количество потребителей в зависимости от условий нагрузки и потребностей обработки для достижения эластичного управления группами потребителей.
  4. Мониторинг и проверка работоспособности: отслеживайте рабочее состояние группы потребителей, своевременно обнаруживайте и устраняйте неисправные потребители, а также обеспечивайте стабильную работу группы потребителей.

Конкретные методы ведения бизнеса:

Предположим, существует онлайн-платформа электронной коммерции.,Пользователи могут приобретать товары на существующей платформе. Платформе необходимо обрабатывать заказы пользователей.,и отправьте информацию о заказе на Kafka в теме. Обработка заказов включает в себя такие операции, как проверка заказов, создание счетов и обновление запасов.

существуют В этой сцене,Обработка заказов, параллельная обработка и балансировка нагрузки могут быть достигнуты с помощью групп. Конкретные шаги заключаются в следующем:

  1. Создайте файл с именем «order»из. Kafka Тема, используемая для получения информации о заказах пользователя.
  2. Создайте группу потребителей, например, с именем «группа обработки заказов» из группы потребителей.
  3. Запустите несколько экземпляров потребителей и присоединитесь к группе потребителей «группа обработки заказов». Каждый экземпляр потребителя подписывается на тему «заказ» и независимо принимает сообщения о заказе.
  4. Kafka В соответствии с конфигурацией группы потребителей раздел темы «заказ» будет равномерно распределен между потребителями в группе потребителей. Каждый экземпляр потребителя будет независимо обрабатывать сообщения о заказе, назначенные ему в разделе.
  5. Когда новое сообщение о заказе поступает в тему «заказ», Кафка Сообщение будет назначено потребителю в группе потребителей. Экземпляр потребителя будет обрабатывать сообщение о заказе, выполнять проверку, генерировать счета, обновлять запасы и выполнять другие операции.

Конкретная реализация:

Язык кода:javascript
копировать
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class OrderConsumer {
    private static final String TOPIC = "order";
    private static final String GROUP_ID = "order-processing-group";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) {
        // Создать потребительскую конфигурацию
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // создавать Kafka потребитель
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // Подписаться на тему
        consumer.subscribe(Collections.singletonList(TOPIC));

        // Потребляйте новости
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                String orderMessage = record.value();
                // Выполнять операции по обработке заказов, такие как проверка заказов, создание заказов на отгрузку, обновление запасов и т. д.
                processOrder(orderMessage);
            }
        }
    }

    private static void processOrder(String orderMessage) {
        // Реализация логики обработки заказов
        System.out.println("Processing order: " + orderMessage);
        // TODO: Выполнять обработку заказов и конкретную бизнес-логику
    }
}

Потоковая обработка и топология обработки

Концепции и особенности Kafka Streams:

Kafka Streams — это клиентская библиотека для создания приложений обработки потоков в реальном времени.

Это позволяет разработчикам обрабатывать потоки данных в темах Kafka простым и декларативным способом.

Kafka Streams предоставляет богатые функции, включая преобразование данных, агрегацию данных, оконные операции, соединения и разгрузку и т. д.

Язык кода:javascript
копировать
 // создать построитель топологий
        StreamsBuilder builder = new StreamsBuilder();

        // создать входной поток
        KStream<String, String> inputStream = builder.stream("input-topic");

        // Выполнять операции преобразования и обработки данных
        KStream<String, String> outputStream = inputStream
                .mapValues(value -> value.toUpperCase())
                .filter((key, value) -> value.startsWith("A"));

        // Вывести результаты обработки в тему вывода
        outputStream.to("output-topic");

        // создавать Kafka Streams Пример
        KafkaStreams streams = new KafkaStreams(builder.build(), props);

Он обладает высокой масштабируемостью и отказоустойчивостью и может обрабатывать крупномасштабные потоки данных за счет горизонтального масштабирования.

Библиотека Kafka Streams тесно интегрирована с экосистемой Kafka и может легко интегрироваться с другими компонентами и инструментами Kafka.

Создайте и разверните топологию потоковой обработки с помощью Spring Kafka:

  • Spring Kafka да Spring Framework Предоставить из используется с Kafka Интерактивный модуль.
  • Он обеспечивает высокоуровневую абстракцию и простой в использовании API, разработку и интеграцию приложений потоковой обработки Kafka.
  • использовать Spring Kafka,Можетпроходить Конфигурацияикомментировать Определить топологию обработки потока,Включая темы ввода и вывода, логику преобразования и обработки данных и т. д.
  • Spring Kafka Также предоставляется Spring Boot Интеграция упрощает процесс настройки и развертывания приложения.

упражняться:

Сначала добавьте следующие зависимости Maven в файл pom.xml:

Язык кода:javascript
копировать
<dependencies>
    <!-- Spring Kafka Связанные зависимости -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.8.1</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <version>2.8.1</version>
        <scope>test</scope>
    </dependency>
    <!-- Другие зависимости -->
</dependencies>

Затем создайте приложение потоковой обработки Spring Kafka:

Язык кода:javascript
копировать
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.serialization.Serdes;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;

@SpringBootApplication
@EnableKafka
public class SpringKafkaApp {

    public static void main(String[] args) {
        SpringApplication.run(SpringKafkaApp.class, args);
    }

    // создаватьвходитьивыходтема
    @Bean
    public NewTopic inputTopic() {
        return new NewTopic("input-topic", 1, (short) 1);
    }

    @Bean
    public NewTopic outputTopic() {
        return new NewTopic("output-topic", 1, (short) 1);
    }

    // Определить топологию обработки потока
    @KafkaListener(topics = "input-topic")
    public void processInputMessage(@Payload String message,
                                    @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        // существоватьздесь Выполнять операции преобразования и обработки данных
        String processedMessage = message.toUpperCase();

        // Отправить результаты обработки в выходную тему
        kafkaTemplate().send("output-topic", processedMessage);
    }

    // создавать KafkaTemplate Пример
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    // создавать ProducerFactory Пример
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }
}

проходить @EnableKafka Аннотации включены Spring Kafka。 проходить @Bean Аннотация создает входную тему и выходную тему. NewTopic Пример. использовать @KafkaListener Аннотированный метод действует как прослушиватель сообщений, а имя прослушивателя: "input-topic" вводная тема. существовать processInputMessage в методе,нас Может Выполнять операции преобразования и обработки данных。существовать В этом примере,мы получимизинформация Преобразовать в верхний регистр。 Затем мы используем KafkaTemplate Отправьте результаты обработки в файл с именем "output-topic" Выводная тема. проходить @Bean Аннотация создана KafkaTemplate и ProducerFactory Экземпляр , используемый для отправки сообщений Kafka。

На этом все. Увидимся в следующий раз~

boy illustration
Неразрушающее увеличение изображений одним щелчком мыши, чтобы сделать их более четкими артефактами искусственного интеллекта, включая руководства по установке и использованию.
boy illustration
Копикодер: этот инструмент отлично работает с Cursor, Bolt и V0! Предоставьте более качественные подсказки для разработки интерфейса (создание навигационного веб-сайта с использованием искусственного интеллекта).
boy illustration
Новый бесплатный RooCline превосходит Cline v3.1? ! Быстрее, умнее и лучше вилка Cline! (Независимое программирование AI, порог 0)
boy illustration
Разработав более 10 проектов с помощью Cursor, я собрал 10 примеров и 60 подсказок.
boy illustration
Я потратил 72 часа на изучение курсорных агентов, и вот неоспоримые факты, которыми я должен поделиться!
boy illustration
Идеальная интеграция Cursor и DeepSeek API
boy illustration
DeepSeek V3 снижает затраты на обучение больших моделей
boy illustration
Артефакт, увеличивающий количество очков: на основе улучшения характеристик препятствия малым целям Yolov8 (SEAM, MultiSEAM).
boy illustration
DeepSeek V3 раскручивался уже три дня. Сегодня я попробовал самопровозглашенную модель «ChatGPT».
boy illustration
Open Devin — инженер-программист искусственного интеллекта с открытым исходным кодом, который меньше программирует и больше создает.
boy illustration
Эксклюзивное оригинальное улучшение YOLOv8: собственная разработка SPPF | SPPF сочетается с воспринимаемой большой сверткой ядра UniRepLK, а свертка с большим ядром + без расширения улучшает восприимчивое поле
boy illustration
Популярное и подробное объяснение DeepSeek-V3: от его появления до преимуществ и сравнения с GPT-4o.
boy illustration
9 основных словесных инструкций по доработке академических работ с помощью ChatGPT, эффективных и практичных, которые стоит собрать
boy illustration
Вызовите deepseek в vscode для реализации программирования с помощью искусственного интеллекта.
boy illustration
Познакомьтесь с принципами сверточных нейронных сетей (CNN) в одной статье (суперподробно)
boy illustration
50,3 тыс. звезд! Immich: автономное решение для резервного копирования фотографий и видео, которое экономит деньги и избавляет от беспокойства.
boy illustration
Cloud Native|Практика: установка Dashbaord для K8s, графика неплохая
boy illustration
Краткий обзор статьи — использование синтетических данных при обучении больших моделей и оптимизации производительности
boy illustration
MiniPerplx: новая поисковая система искусственного интеллекта с открытым исходным кодом, спонсируемая xAI и Vercel.
boy illustration
Конструкция сервиса Synology Drive сочетает проникновение в интрасеть и синхронизацию папок заметок Obsidian в облаке.
boy illustration
Центр конфигурации————Накос
boy illustration
Начинаем с нуля при разработке в облаке Copilot: начать разработку с минимальным использованием кода стало проще
boy illustration
[Серия Docker] Docker создает мультиплатформенные образы: практика архитектуры Arm64
boy illustration
Обновление новых возможностей coze | Я использовал coze для создания апплета помощника по исправлению домашних заданий по математике
boy illustration
Советы по развертыванию Nginx: практическое создание статических веб-сайтов на облачных серверах
boy illustration
Feiniu fnos использует Docker для развертывания личного блокнота Notepad
boy illustration
Сверточная нейронная сеть VGG реализует классификацию изображений Cifar10 — практический опыт Pytorch
boy illustration
Начало работы с EdgeonePages — новым недорогим решением для хостинга веб-сайтов
boy illustration
[Зона легкого облачного игрового сервера] Управление игровыми архивами
boy illustration
Развертывание SpringCloud-проекта на базе Docker и Docker-Compose