Сначала, а затем прочитайте, Брат Нэн поможет вам получить более половины ваших навыков Java.
Транзакция Кафкина самом деле представилАтомная многораздельная запись
концепция,Federico Valeri
Подкастер нарисовал следующую блок-схему.,Показывает, как транзакции работают на уровне раздела.
Я брат Нэн, лидер в области изучения и развития Java. Я думаю, вам будет полезно пройти собеседование и получить предложение поступить в нужную вам компанию.
⭐⭐⭐Эта статья включена в《Javaизучать/Передовой/Руководство по собеседованию》:https://github..JavaSouth
Интервьюер: Можете ли вы рассказать мне о делах Кафки?
Транзакции Kafka в основном используются в потоковых приложениях. Потоковая передача? Это звучит очень запутанно, и я не знаю, что это такое.
Транзакция Стриминг при поддержке Кафкииметь дело Обычно процесс выглядит следующим образом. Программа A получает сообщение из темы A и обрабатывает это сообщение. дело сназад,Затем напишите результаты в тему Б.,назад ПродолжениеBПрограмма будетBСообщение темы осуществляется Потребление。То естьПотребление - иметь дело с - Производство
процесс。
Такой процесс предполагает потребление двух сообщений и создание одного сообщения. Как обеспечить транзакционный характер всего процесса и сделать весь процесс успешным или неудачным — вот что делают транзакции Kafka.
Брат Нэн нарисовал блок-схему, чтобы помочь всем понять.
Интервьюер: В чем проблема не использовать транзакции в процессе, о котором вы говорите?
Потоковое вещание дело спроцедурныйПотребление - иметь дело с - Производство
процесс,Если нет гарантии сделки,Могут возникнуть проблемы с повторным потреблением нескольких сообщений.,Это вызовет всевозможные странные проблемы.
Весь процесс оплаты включает в себя множество процессов, особенно в финансовой и платежной отраслях, например, размещение заказов пользователями. -> Проверка инвентаря -> Заказать иметь дело с-> Фактический вычет -> Клиринг и расчеты, эти бизнес-сценарии используют потоковую передачу. дело программа. В бизнес-сценариях, связанных с фондами, защита дел еще важнее! !
Позвольте мне рассказать о сценарии многократного потребления двух сообщений.
Давайте возьмем приведенный выше сценарий в качестве примера:Программа потребляет сообщения A из темы A.,После выполнения иметь дело с сообщением,Затем напишите результаты в тему Б.,назад ПродолжениеBПрограмма будетBСообщение темы осуществляется Потребление。
(1) Повторное потребление, вызванное сбоем программы.
еслиAпрограмма После выполнения иметь дело с сообщением,Напишите результат вBтема。ноЗафиксировать компенсациюразбился, когда,В это время Кафка будет думать, что сообщение А еще не использовано.,иAпрограммарухнулKafkaназначит раздел новому Потребление ВОЗ。
Возникает проблема. Новый потребитель снова воспользуется сообщением A, а это означает, что в тему B записываются два одинаковых сообщения, а сообщение A используется дважды.
(2) Повторное потребление, вызванное программами-зомби.
Если потребительская программа думает, что она не мертва, но после прекращения отправки сигналов Kafka в течение определенного периода времени Kafka думает, что она мертва, такая программа называется программой-зомби.
После того, как программа A прочитает сообщение от Kafka, она временно зависает. Она теряет соединение с Kafka и не может фиксировать смещения. В это время Кафка считает, что он мертв, и распределяет потребление A среди новых потребителей.
Однако после последующего восстановления программы A она продолжит записывать сообщения A в тему B, что по-прежнему приводит к двойному расходованию потребления A.
Возможно, многие люди скажут,Этот процесс имеет проблему повторного потребления,Чтоиметь дело с Проблемы повторного потребления недостаточно, нет необходимости вводить Транзакцию Кафки такой сложный. Но в таких строгих и важных бизнес-сценариях, как финансы и платежи, мы хотим, чтобы даже если во всем процессе произойдет небольшая ошибка, весь процесс был успешным. дело с Все процессы необходимо откатить.
Интервьюер: Транзакция Кафки Есть ли вопросы о том, не может ли Транзакция Кафки иметь дело с?
Конечно на протяжении всей Транзакции В процессе Кафки есть определенные операции, которые нельзя откатить. Кафки не поддерживает иметь дело с., давайте посмотрим.
(1) Процесс транзакции Kafka добавляет внешнюю логику
НапримерAпрограмма ПотреблениеинформацияAпроцесссередина,Отправил уведомление по электронной почте,Вся эта внешняя операция необратима.,Не в бизнесе дело в диапазоне с.
(2) Прочитайте сообщение Kafka и запишите его в базу данных.
На самом деле это также можно рассматривать как внешнее преимущество. дело сLogic, транзакции базы данных отсутствуют в Транзакции Кафкиизиметь дело в диапазоне с.
Интервьюер: Вы когда-нибудь использовали SpringBoot для отправки транзакционных сообщений Kafka?
В проекте SpringBoot мы можем легко использовать Транзакцию. Кафки,При поддержке Транзакция Кафки,мы можем гарантироватьОтправка сообщений и компенсационная отправкатранзакционный,отиизбегайте вышеперечисленногоиз Повторяющаяся проблема с потреблением。
(1)Сначала представьspring-kafka
полагаться
<dependencies>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>xxx</version>
</dependency>
</dependencies>
(2) Настройте менеджер транзакций Kafka и фабрику производителей.
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
configProps.put(org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
configProps.put(org.apache.kafka.clients.producer.ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx-");
DefaultKafkaProducerFactory<String, Object> factory = new DefaultKafkaProducerFactory<>(configProps);
factory.setTransactionIdPrefix("tran-");
return factory;
}
@Bean
public KafkaTransactionManager<String, Object> transactionManager(ProducerFactory<String, Object> producerFactory) {
return new KafkaTransactionManager<>(producerFactory);
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> producerFactory) {
return new KafkaTemplate<>(producerFactory);
}
}
(3)использоватьKafkaTemplate
Отправить транзакционныйинформация
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@EnableKafka
@Service
public class KafkaConsumerService {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Transactional
@KafkaListener(topics = "A")
public void processMessage(String message) {
// иметь дело с Сообщение получено из темы А
String processedMessage = "Processed " + message;
// Воляиметь дело Сообщение после с отправляется в тему B
kafkaTemplate.send("B", processedMessage);
// Зафиксируйте транзакцию, гарантируя, что отправка сообщения и фиксация смещения выполняются одновременно.
}
}
Я Брат Нэн, Нэн — это Нэн. Я нашла ваши интересные комментарии на Get➕Like➕Follow.
Творить непросто, поэтому вы можете ставить лайки, собирать и подписываться, чтобы поддержать его. Ваша поддержка — самая большая мотивация для моего творчества.❤️