[Spring Cloud]Введение в компонент Stream
[Spring Cloud]Введение в компонент Stream

SCS внесла большие изменения в 3.x, упразднив такие классы, как @StreamListener, @Input, @Output и т. д., сохранив Binder и Binding и предоставив поддержку пакетного потребления. В соответствии с принципом изучения нового, а не старого, в этой статье будет представлен контент, связанный с SCS 3.x. Поскольку документации по потоку весенних облаков Kafka относительно достаточно, в этой статье она используется в качестве примера для представления SCS.

Binder — это компонент, обеспечивающий интеграцию с внешним промежуточным программным обеспечением сообщений. Он предоставляет два метода для привязки, а именно:bindConsumer иbindProducer, которые используются для создания производителей и потребителей. Привязка — это мост, соединяющий приложения и промежуточное программное обеспечение для сообщений, который используется для потребления и создания сообщений.

Связывающая транзакция

Не пытайтесь повторить попытку и зафиксировать мертвые буквы в транзакции. При повторной попытке транзакция могла быть возвращена. Если вы хотите отправить мертвое письмо о последствиях, вы можете использовать DefaultAfterRollbackProcessor отправлять мертвые письма после отката.

Error Channel

Связующее устройство будет использовать канал ошибок для передачи исключений потребителю. В то же время асинхронный производитель может быть настроен на передачу исключения в канал ошибок при возникновении исключения.

Dead-Letter

По умолчанию очередь недоставленных писем для темы будет находиться в том же разделе, что и исходная запись.

Сообщения в очереди недоставленных писем разрешено восстанавливать, но следует избегать повторных сбоев обработки сообщений, приводящих к множественным зацикливаниям в очереди недоставленных писем.

Для обработки информации в этих очередях недоставленных писем следует использовать специальный обработчик.

Потребитель Потребитель

Как следует из названия, Consumer определяет потребителя, который представляет собой функциональный интерфейс, предоставляющий методы для получения сообщений. Мы можем добиться этого непосредственно в объявлении компонента, используя лямбда-выражение.

Стоит отметить, что Consumer также является универсальным интерфейсом, который связывает типы сообщений через дженерики. Мы будем использовать класс KStream для получения типа сообщения. Он будет соответствовать KStream, определенному при отправке сообщения. Это абстрактный поток записей, состоящий из пар ключ-значение, но записи с тем же ключом не будут перезаписаны.

Язык кода:javascript
копировать
@Bean
public Consumer<KStream<Object, String>> consumer() {
		return input -> input.foreach((key, value) -> {
				do consume;
		});
}

Когда мы заявляем в приложении о возврате Consumer из Бин, тогда это Bean Он автоматически получит доступ к очереди сообщений. Кроме того, нам необходимо использовать spring.cloud.stream.bindings.{beanName}-in-{idx}={topic} Настроить подписку из темы сообщения. По умолчанию тема и beanName То же имя.

spring.cloud.stream.bindings.consumer-in-0 = userBuy

Когда сообщение получено, оно называется Consumer Определение accept Метод потребления сообщений.

Отправить сообщение производителю

SCS и нет ответа на Отправить сообщение делает специальную инкапсуляцию, но рекомендует поддерживать ее через отдельные очереди сообщений client или template Отправить сообщение.

Язык кода:javascript
копировать
		kafkaTemplate.send(message);

Функция перерабатывающего завода

Но иногда нам необходимо обработать данные и отправить их обратно в очередь сообщений. В этом случае будет использоваться функция.

это и Consumer Аналогично, но метод имеет дополнительное возвращаемое значение. Аналогично, это возвращаемое значение необходимо использовать KStream класс, чтобы он мог поддерживать возврат обработанных данных в очередь сообщений.

Язык кода:javascript
копировать
@Bean
public Function<KStream<Object, String>, KStream<Object, Stream>> processor() {
		return input -> input.map((key, value) -> {
			do process;
			return new KeyValue(key, value);
		})
}

spring.cloud.stream.bindings.{beanName}-out-{idx}={topic} Установить экспорт из темы сообщения. По умолчанию тема и beanName То же имя.

Function По сравнению с производителем или потребителем, это больше похоже на обработку сообщения. Этот процесс может выполнять серию операций над сообщением, включая разделение сообщения, фильтрацию сообщений, расчет промежуточных результатов и т. д. Обычное использование — международный обмен сообщениями и уведомления на нескольких платформах.

Интернационализированные сообщения — это локализованные сообщения. Функция Это похоже на функцию переводчика, передающую переведенное сообщение потребителю.

Иногда нам также необходимо отправлять уведомления на несколько платформ одновременно, например, по электронной почте, текстовым сообщениям и т. д. Вообще говоря, почтовые серверы и серверы SMS не имеют жестко закодированных шаблонов сообщений для повышения универсальности. В настоящее время необходим посредник для обработки сообщений и внедрения их в соответствующие шаблоны платформы.

Несколько привязок вывода

О разделении сообщений говорилось выше, Функция Разрешить несколько topic из Отправка сообщения, будет использоваться в возвращаемом значении KStream Array, то конфигурация будет использоваться так, как показано только что. spring.cloud.stream.bindings.{beanName}-out-{idx}={topic},idx Это означает, что из — возвращаемое значение KStream существоватьв массивеизиндекс。

Привязка нескольких входов

Привязка нескольких входовсуществовать редко используется в обычных приложениях и обычно используется для распределенных вычислений. Например, для расчета деления требуются как делитель, так и делимое. Распределенные вычисления также SCS из Одно из великих применений,Слепое пятно знаний,существования Я не буду здесь много рассказывать.

KStream

Выше много раз упоминалось KStream,По сути, это последовательный и растущий набор данных.,Это своего рода поток данных.

KTable

KTable и KStream Похоже, но и KStream Разница в том, что он не позволяет key повторить. Столкнувшись с тем же самым key изdata, предпочтет обновить, а не вставить. KTable По сути, это тоже поток данных, и другие классы реализации также наследуют AbstractStream。 В какой-то момент вы можете увидеть его как KStream. из Последний снимок.

boy illustration
Неразрушающее увеличение изображений одним щелчком мыши, чтобы сделать их более четкими артефактами искусственного интеллекта, включая руководства по установке и использованию.
boy illustration
Копикодер: этот инструмент отлично работает с Cursor, Bolt и V0! Предоставьте более качественные подсказки для разработки интерфейса (создание навигационного веб-сайта с использованием искусственного интеллекта).
boy illustration
Новый бесплатный RooCline превосходит Cline v3.1? ! Быстрее, умнее и лучше вилка Cline! (Независимое программирование AI, порог 0)
boy illustration
Разработав более 10 проектов с помощью Cursor, я собрал 10 примеров и 60 подсказок.
boy illustration
Я потратил 72 часа на изучение курсорных агентов, и вот неоспоримые факты, которыми я должен поделиться!
boy illustration
Идеальная интеграция Cursor и DeepSeek API
boy illustration
DeepSeek V3 снижает затраты на обучение больших моделей
boy illustration
Артефакт, увеличивающий количество очков: на основе улучшения характеристик препятствия малым целям Yolov8 (SEAM, MultiSEAM).
boy illustration
DeepSeek V3 раскручивался уже три дня. Сегодня я попробовал самопровозглашенную модель «ChatGPT».
boy illustration
Open Devin — инженер-программист искусственного интеллекта с открытым исходным кодом, который меньше программирует и больше создает.
boy illustration
Эксклюзивное оригинальное улучшение YOLOv8: собственная разработка SPPF | SPPF сочетается с воспринимаемой большой сверткой ядра UniRepLK, а свертка с большим ядром + без расширения улучшает восприимчивое поле
boy illustration
Популярное и подробное объяснение DeepSeek-V3: от его появления до преимуществ и сравнения с GPT-4o.
boy illustration
9 основных словесных инструкций по доработке академических работ с помощью ChatGPT, эффективных и практичных, которые стоит собрать
boy illustration
Вызовите deepseek в vscode для реализации программирования с помощью искусственного интеллекта.
boy illustration
Познакомьтесь с принципами сверточных нейронных сетей (CNN) в одной статье (суперподробно)
boy illustration
50,3 тыс. звезд! Immich: автономное решение для резервного копирования фотографий и видео, которое экономит деньги и избавляет от беспокойства.
boy illustration
Cloud Native|Практика: установка Dashbaord для K8s, графика неплохая
boy illustration
Краткий обзор статьи — использование синтетических данных при обучении больших моделей и оптимизации производительности
boy illustration
MiniPerplx: новая поисковая система искусственного интеллекта с открытым исходным кодом, спонсируемая xAI и Vercel.
boy illustration
Конструкция сервиса Synology Drive сочетает проникновение в интрасеть и синхронизацию папок заметок Obsidian в облаке.
boy illustration
Центр конфигурации————Накос
boy illustration
Начинаем с нуля при разработке в облаке Copilot: начать разработку с минимальным использованием кода стало проще
boy illustration
[Серия Docker] Docker создает мультиплатформенные образы: практика архитектуры Arm64
boy illustration
Обновление новых возможностей coze | Я использовал coze для создания апплета помощника по исправлению домашних заданий по математике
boy illustration
Советы по развертыванию Nginx: практическое создание статических веб-сайтов на облачных серверах
boy illustration
Feiniu fnos использует Docker для развертывания личного блокнота Notepad
boy illustration
Сверточная нейронная сеть VGG реализует классификацию изображений Cifar10 — практический опыт Pytorch
boy illustration
Начало работы с EdgeonePages — новым недорогим решением для хостинга веб-сайтов
boy illustration
[Зона легкого облачного игрового сервера] Управление игровыми архивами
boy illustration
Развертывание SpringCloud-проекта на базе Docker и Docker-Compose