Когда мы говорим о Spring Kafka, думайте о нем как об очень хорошем почтальоне, но вместо того, чтобы доставлять обычные письма, вы обрабатываете большие объемы интересных и полезных данных. Этот почтальон хорошо взаимодействует с Kafka и работает с данными на высоком уровне абстракции и в простой в использовании форме. Работа этого почтальона — доставлять данные из одного места в другое, точно так же, как мы отправляем посылки. Он знает, как общаться с Кафкой и как устанавливать связи с темами ввода и вывода. Когда кто-то помещает данные во входную тему, почтальон немедленно получает уведомление и быстро извлекает данные. Затем он выполняет различные интересные преобразования и операции обработки данных, как опытный фокусник. Он может конвертировать данные в различные форматы, выполнять операции агрегации, фильтрации, объединения и шунтирования. После обработки данных почтальон помещает их в специальный пакет и маркирует его адресом назначения, который является предметом вывода. Затем он быстро отправляет посылку, чтобы обеспечить своевременную доставку данных. Spring Kafka похож на ящик с инструментами почтальона, предоставляя множество полезных инструментов и функций, облегчающих его работу. Он предоставляет простой и декларативный API, который позволяет нам интуитивно определять логику обработки данных и топологию потоковой обработки.
Затем начинается текст
Spring Kafka — это библиотека, предоставляемая Spring Framework, которая интегрирует Apache Kafka и используется для создания приложений обработки потоков данных в реальном времени на основе Kafka. Apache Kafka — это высокопроизводительная распределенная платформа потоковой передачи данных, широко используемая для создания масштабируемых конвейеров обработки данных в реальном времени.
Обработка потоков данных в реальном времени очень важна для современного бизнеса. В условиях быстрого развития Интернета и ускорения цифровой трансформации предприятия сталкиваются с проблемой создания и обработки больших объемов данных. Обработка потока данных в реальном времени может помочь предприятиям собирать, обрабатывать и анализировать данные в режиме реального времени, позволяя предприятиям принимать своевременные решения, предоставлять персонализированные услуги и оптимизировать бизнес-процессы. Обработка потока данных в режиме реального времени также может помочь предприятиям выявить потенциальные возможности и риски и принять быстрые меры.
Прежде чем приступить к изучению Spring Kafka, очень важно понять основные концепции и компоненты Apache Kafka. Некоторые основные концепции включают в себя:
Знакомство с основными методами использования и интеграции Spring Kafka:
Spring Kafka предоставляет простой, но мощный API для использования Kafka в приложениях Spring. Он обеспечивает следующие основные функции:
KafkaTemplate
класс позволяет легко публиковать сообщения в Kafka тема.@KafkaListener
Аннотации упрощают создание потребителей сообщений и обработку сообщений от Kafka Тема из сообщения.существовать Spring Kafka Публикуйте сообщения в Kafka темы, которые вы можете использовать KafkaTemplate
сорт send()
метод. Вы можете отправить сообщение на Kafka。
Хотите потреблять Kafka Сообщения в теме, которую вы можете использовать @KafkaListener
комментироватьсоздаватьодининформацияслушатель。проходить Укажите для мониторингаизтемаиинформация Метод лечения,Может существовать инициировать соответствующую логику при получении сообщения.
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void publishMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
}
Хотите потреблять Kafka Сообщения в теме, которую вы можете использовать @KafkaListener
комментироватьсоздаватьодининформацияслушатель。проходить Укажите для мониторингаизтемаиинформация Метод лечения,Может существовать инициировать соответствующую логику при получении сообщения.
@KafkaListener(topics = "myTopic")
public void consumeMessage(String message) {
// Обработать полученное сообщение
System.out.println("Received message: " + message);
}
Понимание сериализации и десериализации сообщений:
существовать Kafka Сериализация и десериализация сообщений — очень важные понятия. Когда сообщение отправляется на Kafka час,Их необходимо преобразовать в потоки байтов. Так же,существовать, когда сообщение будет использовано,Их необходимо преобразовать в исходный формат данных.
Spring Kafka предоставляет механизм сериализации и десериализации по умолчанию, который может автоматически конвертировать в зависимости от типа сообщения. Для распространенных типов данных, таких как строки, JSON, байтовые массивы и т. д., Spring Kafka предоставил соответствующие реализации сериализации и десериализации. Кроме того, вы можете настроить сериализаторы и десериализаторы для обработки определенных форматов сообщений.
Например, вы можете использовать StringSerializer и StringDeserializer для сериализации и десериализации строковых сообщений:
@Configuration
public class KafkaConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
Группа потребителей — это группа потребителей с одинаковым идентификатором группы потребителей, которые совместно потребляют сообщения из одной или нескольких тем Kafka. Роль группы потребителей заключается в обеспечении параллельной обработки и балансировки нагрузки сообщений. Назначая разделы темы разным потребителям в группе потребителей, можно добиться параллельной обработки сообщений, повышая пропускную способность обработки и уменьшая задержку. Группы потребителей также обеспечивают отказоустойчивость; в случае сбоя потребителя другие потребители могут взять на себя управление его разделом и продолжить обработку сообщений.
Предположим, существует онлайн-платформа электронной коммерции.,Пользователи могут приобретать товары на существующей платформе. Платформе необходимо обрабатывать заказы пользователей.,и отправьте информацию о заказе на Kafka в теме. Обработка заказов включает в себя такие операции, как проверка заказов, создание счетов и обновление запасов.
существуют В этой сцене,Обработка заказов, параллельная обработка и балансировка нагрузки могут быть достигнуты с помощью групп. Конкретные шаги заключаются в следующем:
Конкретная реализация:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class OrderConsumer {
private static final String TOPIC = "order";
private static final String GROUP_ID = "order-processing-group";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
public static void main(String[] args) {
// Создать потребительскую конфигурацию
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// создавать Kafka потребитель
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// Подписаться на тему
consumer.subscribe(Collections.singletonList(TOPIC));
// Потребляйте новости
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
String orderMessage = record.value();
// Выполнять операции по обработке заказов, такие как проверка заказов, создание заказов на отгрузку, обновление запасов и т. д.
processOrder(orderMessage);
}
}
}
private static void processOrder(String orderMessage) {
// Реализация логики обработки заказов
System.out.println("Processing order: " + orderMessage);
// TODO: Выполнять обработку заказов и конкретную бизнес-логику
}
}
Kafka Streams — это клиентская библиотека для создания приложений обработки потоков в реальном времени.
Это позволяет разработчикам обрабатывать потоки данных в темах Kafka простым и декларативным способом.
Kafka Streams предоставляет богатые функции, включая преобразование данных, агрегацию данных, оконные операции, соединения и разгрузку и т. д.
// создать построитель топологий
StreamsBuilder builder = new StreamsBuilder();
// создать входной поток
KStream<String, String> inputStream = builder.stream("input-topic");
// Выполнять операции преобразования и обработки данных
KStream<String, String> outputStream = inputStream
.mapValues(value -> value.toUpperCase())
.filter((key, value) -> value.startsWith("A"));
// Вывести результаты обработки в тему вывода
outputStream.to("output-topic");
// создавать Kafka Streams Пример
KafkaStreams streams = new KafkaStreams(builder.build(), props);
Он обладает высокой масштабируемостью и отказоустойчивостью и может обрабатывать крупномасштабные потоки данных за счет горизонтального масштабирования.
Библиотека Kafka Streams тесно интегрирована с экосистемой Kafka и может легко интегрироваться с другими компонентами и инструментами Kafka.
<dependencies>
<!-- Spring Kafka Связанные зависимости -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.8.1</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>2.8.1</version>
<scope>test</scope>
</dependency>
<!-- Другие зависимости -->
</dependencies>
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.serialization.Serdes;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
@SpringBootApplication
@EnableKafka
public class SpringKafkaApp {
public static void main(String[] args) {
SpringApplication.run(SpringKafkaApp.class, args);
}
// создаватьвходитьивыходтема
@Bean
public NewTopic inputTopic() {
return new NewTopic("input-topic", 1, (short) 1);
}
@Bean
public NewTopic outputTopic() {
return new NewTopic("output-topic", 1, (short) 1);
}
// Определить топологию обработки потока
@KafkaListener(topics = "input-topic")
public void processInputMessage(@Payload String message,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
// существоватьздесь Выполнять операции преобразования и обработки данных
String processedMessage = message.toUpperCase();
// Отправить результаты обработки в выходную тему
kafkaTemplate().send("output-topic", processedMessage);
}
// создавать KafkaTemplate Пример
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
// создавать ProducerFactory Пример
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
}
проходить
@EnableKafka
Аннотации включены Spring Kafka。 проходить@Bean
Аннотация создает входную тему и выходную тему.NewTopic
Пример. использовать@KafkaListener
Аннотированный метод действует как прослушиватель сообщений, а имя прослушивателя: "input-topic" вводная тема. существоватьprocessInputMessage
в методе,нас Может Выполнять операции преобразования и обработки данных。существовать В этом примере,мы получимизинформация Преобразовать в верхний регистр。 Затем мы используемKafkaTemplate
Отправьте результаты обработки в файл с именем "output-topic" Выводная тема. проходить@Bean
Аннотация созданаKafkaTemplate
иProducerFactory
Экземпляр , используемый для отправки сообщений Kafka。