[Redis в действии] Почему бы не использовать MQ? Используйте Redis в качестве очереди сообщений! ? Как использовать Redis в качестве очереди сообщений и расширенные принципы базовых принципов
[Redis в действии] Почему бы не использовать MQ? Используйте Redis в качестве очереди сообщений! ? Как использовать Redis в качестве очереди сообщений и расширенные принципы базовых принципов

Недавно один из бизнесов студии объединился с другим бизнесом. Естественно, использовался MQ (Message Queue), поэтому было очевидно, что RabbitMQ будет развернут на сервере.

Мы используем облачную службу, поэтому, конечно, мы развертываем промежуточное программное обеспечение в облачной службе. Служба активирована полностью, но когда приходит время отладки, мы не можем подключиться (говорят, что она напрямую подключена к облачной службе). интранет, но главное, чтобы все коллеги были онлайн) Для локального тестирования)

Я совершенно потерял дар речи. Что мне делать, столкнувшись с этой сценой? Бизнес будет продолжаться, и мы ждем доставки. Вот я и вспомнил стек технологий, который изучил ранее.

Redis также можно использовать в качестве очереди сообщений (но он используется редко, поэтому его нелегко запомнить или о нем никто не знает), поэтому операция зависает. Шаги проще, чем MQ. Давайте посмотрим, как это реализовать. .

Преимущества и недостатки Redis как очереди сообщений:

Использование Redis в качестве очереди сообщений имеет следующие преимущества по сравнению с использованием специализированной системы очередей сообщений (например, RabbitMQ, Kafka и т. д.):

  1. Просто и легко: Redis — это легкая и простая система хранения данных в памяти. Сравнительно специализированная очередь системы сообщений, используя Redis в свою очередь сообщений не требует введения дополнительных компонентов и зависимостей, что позволяет снизить сложность системы.
  2. Быстро: поскольку Redis хранится в памяти, он имеет очень высокую производительность чтения и записи. Это очень выгодно для приложений, которым требуется низкая задержка.
  3. Поддержка нескольких структур данных: Redis предоставляет множество структур данных, таких как списки, публикация/подписка, упорядоченные наборы и т. д. Это делает Redis более гибким при обработке различных типов сообщений и задач.
  4. Сохранение данных: Redis может обеспечить сохранение данных, сохраняя их на диск. Это означает, что даже если Redis будет перезапущен, предыдущие сообщения не будут потеряны.
  5. обширные сценарии Применение: Redis можно использовать не только в качестве Сообщения также могут использоваться для кеша, базы данных, распределенной блокировки и других целей. Если ваше приложение уже использует Redis, используйте Redis поочередно. сообщения могут снизить сложность стека технологий.

Недостатки также очевидны:

  1. Отсутствуют некоторые расширенные функции: по сравнению со специализированным поворотом сообщенийсистема,Redisсуществоватьочередь Функции с точки зрения сообщений могут быть относительно простыми. Например, в нем могут отсутствовать некоторые расширенные функции обмена сообщениями, такие как повторные попытки сообщений, маршрутизация сообщений, постоянные сообщения и т. д.
  2. Надежность и согласованность. Основная цель разработки Redis — обеспечить высокую производительность и низкую задержку, а не строгую согласованность и высокую надежность. В некоторых случаях Redis может потерять сообщения или не предоставить гарантии надежности в случае сбоя.

Сценарии применения:

Подходит для простых небольших и средних проектов. Если функция простая и количество посещений не большое, можно рассматривать. Если к вашему приложению предъявляются строгие требования к надежности и расширенной функциональности, а также необходимо обрабатывать большие объемы сообщений и сложную маршрутизацию сообщений, то использование специализированной системы очередей сообщений может быть более подходящим.

Redis реализует этапы реализации системы очереди сообщений:

Настройте Редис:

Сначала убедитесь, что вы правильно настроили зависимости Redis и Lettuce и создали объект LettuceConnectionFactory.

Язык кода:javascript
копировать
<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
Язык кода:javascript
копировать
  redis:
    host: 
    port: 6379
    password: 
    lettuce:
      pool:
        max-active: 1000
        max-idle: 1000
        min-idle: 0
        time-between-eviction-runs: 10s
        max-wait: 10000

Создайте объект RedisTemplate и установите LettuceConnectionFactory в качестве его фабрики соединений:

Язык кода:javascript
копировать
 @Bean
    public RedisTemplate<String, String> redisTemplate(LettuceConnectionFactory connectionFactory) {
        RedisTemplate<String, String> template = new RedisTemplate<>();
        template.setConnectionFactory(connectionFactory);
        template.setDefaultSerializer(new StringRedisSerializer());
        return template;
    }

Установите сериализатор RedisTemplate. В очереди сообщений вы можете использовать сериализатор по умолчанию StringRedisSerializer, который хранит и передает сообщения в виде строк. Сериализатор по умолчанию можно установить с помощью следующего кода:

Язык кода:javascript
копировать
redisTemplate.setDefaultSerializer(new StringRedisSerializer());

Реализуйте функции публикации сообщений и подписки.

Опубликовать сообщение:

Язык кода:javascript
копировать
redisTemplate.convertAndSend("channel_name", "message_payload");

В приведенном выше коде «channel_name» — это имя канала сообщения, а «message_payload» — это содержимое публикуемого сообщения.

Подпишитесь на новости:

Сначала создайте класс реализации MessageListener для обработки полученных сообщений:

Язык кода:javascript
копировать
public class MessageListenerImpl implements MessageListener {
    @Override
    public void onMessage(Message message, byte[] pattern) {
        // Обрабатывать полученные сообщения
        String channel = new String(message.getChannel());
        String payload = new String(message.getBody());
        // Выполнять пользовательскую логику
    }
}

Создайте объект LettuceMessageListenerAdapter и предоставьте класс реализации MessageListener:

Язык кода:javascript
копировать
LettuceMessageListenerAdapter listenerAdapter = new LettuceMessageListenerAdapter(new MessageListenerImpl());
listenerAdapter.afterPropertiesSet();

Создайте объект RedisMessageListenerContainer и настройте его LettuceConnectionFactory и адаптер прослушивания:

Язык кода:javascript
копировать
RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
listenerContainer.setConnectionFactory(lettuceConnectionFactory);
listenerContainer.addMessageListener(listenerAdapter, new ChannelTopic("Название канала"));
listenerContainer.start();

Выполнив описанные выше шаги, мы создали объект LettuceConnectionFactory для установления соединения с сервером Redis. Затем мы создали класс реализации MessageListener для обработки полученных сообщений. Затем мы создали объект LettuceMessageListenerAdapter и предоставили класс реализации MessageListener. Наконец, мы создаем объект RedisMessageListenerContainer и настраиваем его LettuceConnectionFactory и адаптер прослушивания, а затем запускаем контейнер, чтобы начать прослушивание сообщений на указанном канале.

Преимущество вышеуказанного решения в том, что вы можете четко знать, в какой части прослушиватель отслеживает информацию соответствующего канала. Однако в бизнесе, если для каждого бизнеса и канала соответствующего модуля установлен прослушиватель (мы. предположим, что каждому бизнесу необходимо получить сообщение. Логика выполнения в будущем будет другой) Тогда рабочая нагрузка резко увеличится.

Итак, есть второй способ записи:

Практика и совершенствование

Язык кода:javascript
копировать
/***
 * @title MessageManager
 * @author SUZE
 * @Date 2-17
 **/
@Component
public class ReservedMessageManager {
    private String ListenerId;
    private String UserId;
    private String message;
    private final RedisTemplate<String, String> redisTemplate;

    @Autowired
    public ReservedMessageManager(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
        subscribeToChannel("reserved");
    }
    @Resource
    private SmsServer smsServer;

    public void publishMessage(String channel, reserveMessage message) {
        String  Message=serialize(message);
        redisTemplate.convertAndSend("channel_name", "message_payload");
        redisTemplate.convertAndSend(channel, Message);
    }
    // Событие срабатывает при получении сообщения
    private void handleReserveMessage(String channel, reserveMessage reserveMessage) {
        if (reserveMessage != null) {
            String userId = reserveMessage.getUserId();
            String ListenerId=reserveMessage.getListenerId();
            String message = reserveMessage.getMessage();
            //TODO Обрабатывать полученные сообщениялогика Здесь нам нужно провести проверку сообщения позже. Есть четыре состояния: согласие, отказ и окончание. Состояние мышления отличается от отправки контента
            switch (message){
                //TODO Сообщение должно быть отправлено обеим сторонам. Поэтому мне нужно отправить две копии Копирайтинг для отправки сообщений
                case "wait":

                    smsServer.sendSms(userId,ListenerId,message);
                    break;
                case "agree":

                    smsServer.sendSms(userId,ListenerId,message);
                    break;
                case "refuse":

                    smsServer.sendSms(userId,ListenerId,message);
                    break;
                case "over":
                    //Здесь нам нужно управлять файловой системой

                    //Если вы откажетесь Тогда нам нужно следить за этим
                    smsServer.sendSms(userId,ListenerId,message);
                    break;

            }
            //smsServer.sendSms(userId,ListenerId,message);
            // Другая логика обработки...
        }
    }

    public void subscribeToChannel(String channel) {
        redisTemplate.execute((RedisCallback<Object>) (connection) -> {
            connection.subscribe((message, pattern) -> {
                String channelName = new String(message.getChannel());
                byte[] body = message.getBody();
                // Разобрать полученное сообщение
                switch (channelName){
                    case "reserved":
                        reserveMessage reserveMessage = deserializeMessage(new String(body));
                        handleReserveMessage(channelName, reserveMessage);
                        break;
                    //Есть и другие каналы Например, отказ – это Запретить канал Специально выслушайте причины отказа
                }
            },канал.getBytes());
            вернуть ноль;
        });
    }
    // сериализация
    частный ReserveMessage deserializeMessage (тело строки) {
        ObjectMapper objectMapper = новый ObjectMapper();
        пытаться {
            вернуть objectMapper.readValue(body, reserveMessage.class);
        } catch (IOException e) {
            // Обработка исключений, связанных с сериализацией
            e.printStackTrace();
            return null;
        }
    }

    // сериализация
    public String serialize(reserveMessage reserveMessage) throws SerializationException {
        if (reserveMessage == null) {
            return null;
        }
        try {
            ObjectMapper objectMapper = new ObjectMapper();
            return objectMapper.writeValueAsString(reserveMessage);
        } catch (JsonProcessingException e) {
            throw new SerializationException("Error serializing object", e);
        }
    }

}
Объяснение кода

  1. subscribeToChannelМетод принимаетchannelпараметр,Используется для указания имени канала для подписки.
  2. redisTemplate.executeметод выполненияRedisдействовать,и пройти вRedisCallbackфункция обратного вызова。
  3. Функция обратного вызова реализована в виде лямбда-выражения,принятьconnectionпараметр,Представляет подключение к Redis.
  4. в функции обратного вызова,вызовconnection.subscribeСпособ подписки на канал。Должен Метод принимаетфункция обратного вызоваделатьдляпараметр,используется для Обрабатывать полученные сообщения。
  5. В функции обратного вызова сообщения,Сначала изmessageПолучите имя канала и тело сообщения из объекта.。
  6. использоватьnew String(message.getChannel())Преобразовать имя канала в строковый тип,и хранитьсуществоватьchannelNameв переменной。
  7. использоватьmessage.getBody()Получите представление массива байтов тела сообщения.,и хранитьсуществоватьbodyв переменной。
  8. существоватьswitchв предложении,В зависимости от имени канала выполняется различная обработка. В этом примере,Обрабатываются только «зарезервированные» каналы.
  9. Обработка «зарезервированных» каналов,вызовdeserializeMessageметод возвращает тело сообщениясериализациядляreserveMessageобъект,и сохранить егосуществоватьимядляreserveMessageчастьв переменной。
  10. вызовhandleReserveMessageметод,Преобразуйте имя канала и обратноесериализацияпозжеreserveMessageобъектделатьдляпараметр Процесс。
  11. handleReserveMessageметодиспользуется для Обработка полученных сохраненных сообщенийлогика。Он проверяет тип сообщения,и выполнять различные операции в зависимости от типа. По типу сообщения,этовызовsmsServer.sendSmsметодназначенномуuserIdиlistenerIdотправка текстового сообщения。
Я централизовал систему обработки сообщений, а это значит, что эта система мониторинга может отслеживать все виды бизнеса зарезервированного канала. Я перечислил четыре типа ожидания, согласия, отказа и так далее. Но если это более крупная бизнес-система, тот же канал. может столкнуться с большим количеством возможных ветвей. Если вы будете следовать первому набору решений и вам потребуется реализовать прослушиватель для каждого конкретного бизнеса, рабочая нагрузка будет огромной (возможно, таким образом связанность будет ниже).
Но если я централизованно обрабатываю сообщения таким образом, то система отправки СМС будет заниматься только отправкой СМС-сообщений, а система хх — только соответствующей работой, что может сильно снизить степень связанности в работе.
Тогда все должны были заметить мои два метода, отвечающие за сериализацию и десериализацию. Это связано с тем, что бизнесу необходимо инкапсулировать объект в класс, поэтому решение здесь — настроить решение для сериализации на процессоре информационного центра (если вы сделаете это лучше. , вы действительно можете извлечь этот сериализатор и инкапсулировать его в абстрактный метод и использовать дженерики для определения возвращаемых результатов и параметров, чтобы все ссылочные типы можно было сериализовать)

Возникшие проблемы:

Кстати, столкнулся с такой ошибкой в ​​середине

Ошибка: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: невозможно создать экземпляр TopOne.MessageSystem.entity.reserveMessage (не существует создателей, таких как конструктор по умолчанию): невозможно десериализовать из значения объекта (нет делегата или свойства). -основанный Создатель)

Причины и анализ:

В классе ReserveMessage отсутствует конструктор по умолчанию, что не позволяет библиотеке Джексона создавать экземпляры этого класса. В сообщении об ошибке упоминается следующее: «Невозможно создать экземпляр TopOne.MessageSystem.entity.reserveMessage (не существует создателей, таких как конструктор по умолчанию)». Чтобы Джексон мог правильно десериализовать объекты, в класс ReserveMessage необходимо добавить конструктор по умолчанию. Конструктор по умолчанию — это конструктор без параметров, который не требует никаких параметров для создания объекта. В вашем классе ReserveMessage

Это модифицированный класс инкапсуляции:

Язык кода:javascript
копировать
@Data
public class reserveMessage {
    private String UserId;
    private String ListenerId;
    private String message;


    public reserveMessage() {
        // конструктор по умолчанию
    }
    public reserveMessage(String userId, String ListenerId,String message) {
        this.UserId = userId;
        this.ListenerId = ListenerId;
        this.message=message;
    }


}

Тестирование в реальном бизнесе

опубликовать сервис

Абонентское обслуживание (услуга прослушивания)

Результаты испытаний

успех

Печать здесь заменяет рассылку СМС в оригинальном бизнесе, что можно расценивать как успех.

На этом все. Спасибо за просмотр.

boy illustration
Неразрушающее увеличение изображений одним щелчком мыши, чтобы сделать их более четкими артефактами искусственного интеллекта, включая руководства по установке и использованию.
boy illustration
Копикодер: этот инструмент отлично работает с Cursor, Bolt и V0! Предоставьте более качественные подсказки для разработки интерфейса (создание навигационного веб-сайта с использованием искусственного интеллекта).
boy illustration
Новый бесплатный RooCline превосходит Cline v3.1? ! Быстрее, умнее и лучше вилка Cline! (Независимое программирование AI, порог 0)
boy illustration
Разработав более 10 проектов с помощью Cursor, я собрал 10 примеров и 60 подсказок.
boy illustration
Я потратил 72 часа на изучение курсорных агентов, и вот неоспоримые факты, которыми я должен поделиться!
boy illustration
Идеальная интеграция Cursor и DeepSeek API
boy illustration
DeepSeek V3 снижает затраты на обучение больших моделей
boy illustration
Артефакт, увеличивающий количество очков: на основе улучшения характеристик препятствия малым целям Yolov8 (SEAM, MultiSEAM).
boy illustration
DeepSeek V3 раскручивался уже три дня. Сегодня я попробовал самопровозглашенную модель «ChatGPT».
boy illustration
Open Devin — инженер-программист искусственного интеллекта с открытым исходным кодом, который меньше программирует и больше создает.
boy illustration
Эксклюзивное оригинальное улучшение YOLOv8: собственная разработка SPPF | SPPF сочетается с воспринимаемой большой сверткой ядра UniRepLK, а свертка с большим ядром + без расширения улучшает восприимчивое поле
boy illustration
Популярное и подробное объяснение DeepSeek-V3: от его появления до преимуществ и сравнения с GPT-4o.
boy illustration
9 основных словесных инструкций по доработке академических работ с помощью ChatGPT, эффективных и практичных, которые стоит собрать
boy illustration
Вызовите deepseek в vscode для реализации программирования с помощью искусственного интеллекта.
boy illustration
Познакомьтесь с принципами сверточных нейронных сетей (CNN) в одной статье (суперподробно)
boy illustration
50,3 тыс. звезд! Immich: автономное решение для резервного копирования фотографий и видео, которое экономит деньги и избавляет от беспокойства.
boy illustration
Cloud Native|Практика: установка Dashbaord для K8s, графика неплохая
boy illustration
Краткий обзор статьи — использование синтетических данных при обучении больших моделей и оптимизации производительности
boy illustration
MiniPerplx: новая поисковая система искусственного интеллекта с открытым исходным кодом, спонсируемая xAI и Vercel.
boy illustration
Конструкция сервиса Synology Drive сочетает проникновение в интрасеть и синхронизацию папок заметок Obsidian в облаке.
boy illustration
Центр конфигурации————Накос
boy illustration
Начинаем с нуля при разработке в облаке Copilot: начать разработку с минимальным использованием кода стало проще
boy illustration
[Серия Docker] Docker создает мультиплатформенные образы: практика архитектуры Arm64
boy illustration
Обновление новых возможностей coze | Я использовал coze для создания апплета помощника по исправлению домашних заданий по математике
boy illustration
Советы по развертыванию Nginx: практическое создание статических веб-сайтов на облачных серверах
boy illustration
Feiniu fnos использует Docker для развертывания личного блокнота Notepad
boy illustration
Сверточная нейронная сеть VGG реализует классификацию изображений Cifar10 — практический опыт Pytorch
boy illustration
Начало работы с EdgeonePages — новым недорогим решением для хостинга веб-сайтов
boy illustration
[Зона легкого облачного игрового сервера] Управление игровыми архивами
boy illustration
Развертывание SpringCloud-проекта на базе Docker и Docker-Compose