SpringBoot + Disruptor реализует чрезвычайно быструю и высокопараллельную обработку, поддерживая 6 миллионов заказов в секунду без какого-либо давления!
SpringBoot + Disruptor реализует чрезвычайно быструю и высокопараллельную обработку, поддерживая 6 миллионов заказов в секунду без какого-либо давления!

Привет всем, я брат Лэй.

1. Предыстория

На работе я столкнулся с проектом, который использовал Disruptor в качестве очереди сообщений. Вы правильно прочитали, это не Kafka и не RabbitMQ. Самое большое преимущество Disruptor в том, что он быстрый и имеет открытый исходный код.

2. Введение в Disruptor

1、 Disruptor — это высокопроизводительная очередь, разработанная LMAX, британской компанией, занимающейся торговлей иностранной валютой. Первоначальной целью исследований и разработок было решение проблемы задержки очереди памяти (в тесте производительности выяснилось, что она находится в очереди). того же порядка, что и операция ввода-вывода). Система, разработанная на основе Disruptor, может поддерживать однопоточную обработку. Имея 6 миллионов операций в секунду, она привлекла внимание отрасли после выступления на QCon в 2010 году; 2、 Disruptor — это платформа Java с открытым исходным кодом, предназначенная для достижения максимально возможной пропускной способности (TPS) и минимально возможной задержки при решении проблемы производитель-потребитель (PCP); 3、 С функциональной точки зрения Disruptor реализует функцию «очереди», и это ограниченная очередь, поэтому сценарий его применения, естественно, является сценарием применения модели «производитель-потребитель»; 4、 Disruptor является ключевым компонентом онлайн-торговой платформы LMAX. Платформа LMAX использует эту структуру для обработки заказов со скоростью 6 миллионов TPS. Помимо финансовой сферы, Disruptor может использоваться и в других общих приложениях. Улучшения производительности; 5、 Фактически, Disruptor — это не столько платформа, сколько идея дизайна. Для программ с такими элементами, как «параллелизм, буферы, модели производитель-потребитель и обработка транзакций», Disruptor предлагает существенные решения для повышения производительности (TPS); 6、 Домашняя страница Disruptor на GitHub: https://github.com/LMAX-Exchange/disruptor;

3. Основная концепция Disruptor

Давайте начнем с понимания основных концепций Disruptor, чтобы понять, как он работает. Представленная ниже концептуальная модель является одновременно объектом предметной области и основным объектом, сопоставленным с реализацией кода.

Кроме того, если вы планируете пройти собеседование и сменить работу в ближайшем будущем, рекомендуется ответить на вопросы онлайн на ddkk.com, охватывая более 10 000 вопросов для собеседования по Java, охватывая почти все основные вопросы технических собеседований и наиболее полные 500 наборов. существующих на рынке стеков технологий, представляющих собой высококачественные продукты. Серия учебных пособий, доступных бесплатно.

1. Ring Buffer

Как следует из названия, это кольцевой буфер. RingBuffer раньше был основным объектом Disruptor, но, начиная с версии 3.0, его обязанности были упрощены и теперь он отвечает только за хранение и обновление данных (событий), которыми обмениваются через Disruptor. В некоторых более сложных сценариях приложений кольцевой буфер можно полностью заменить пользовательской реализацией.

2. Sequence Disruptor

Обмениваемые через него данные (события) нумеруются и управляются посредством последовательного возрастания порядковых номеров, причем обработка данных (событий) всегда производится инкрементально по порядковому номеру. Последовательность используется для отслеживания хода обработки определенного обработчика событий (RingBuffer/Consumer). Хотя AtomicLong также можно использовать для определения прогресса, определение Sequence как ответственного за эту проблему имеет еще одну цель: предотвратить проблему ложного совместного использования кэша ЦП (Flase Sharing) между различными последовательностями. (Примечание: это один из ключевых моментов Disruptor для достижения высокой производительности. В Интернете уже есть много представлений о проблеме псевдосовместного использования, поэтому я не буду здесь вдаваться в подробности).

3. Sequencer

Sequencer — это истинное сердце Disruptor. Этот интерфейс имеет два класса реализации: SingleProducerSequencer и MultiProducerSequencer, которые определяют алгоритмы параллелизма для быстрой и правильной передачи данных между производителями и потребителями.

4. Sequence Barrier

Используется для хранения ссылок на основную опубликованную последовательность RingBuffer и последовательности других потребителей, от которых зависит потребитель. Барьер последовательности также определяет логику, которая определяет, есть ли у потребителя дополнительные события для обработки.

5. Wait Strategy

Определите стратегию того, как Потребитель ждет следующего события. (Примечание: Disruptor определяет множество различных стратегий, обеспечивающих разную производительность для разных сценариев)

Кроме того, если вы планируете пройти собеседование и сменить работу в ближайшем будущем, рекомендуется ответить на вопросы онлайн на ddkk.com, охватывая более 10 000 вопросов для собеседования по Java, охватывая почти все основные вопросы технических собеседований и наиболее полные 500 наборов. существующих на рынке стеков технологий, представляющих собой высококачественные продукты. Серия учебных пособий, доступных бесплатно.

6. Event

В семантике Disruptor данные, которыми обмениваются производители и потребители, называются событиями. Это не конкретный тип, определенный Disruptor, а определяемый и указанный пользователем Disruptor.

7. EventProcessor

EventProcessor хранит последовательность конкретного потребителя (Consumer) и предоставляет цикл событий (Event Loop) для вызова реализации обработки событий.

8. EventHandler

Интерфейс обработки событий, определенный Disruptor, реализуется пользователями и используется для обработки событий. Это реальная реализация Consumer.

9. Producer

То есть производитель обычно ссылается на пользовательский код, который вызывает Disruptor для публикации событий. Disruptor не определяет конкретный интерфейс или тип.

4. Кейс-демо

Выполнив следующие 8 шагов, вы можете получить Disruptor Get home:

1. Добавьте зависимость pom.xml

Язык кода:javascript
копировать
<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.4</version>
</dependency>

2. Модель тела сообщения

Язык кода:javascript
копировать
/**
 * тело сообщения
 */
@Data
public class MessageModel {
     
       
    private String message;
}

3. Создайте EventFactory

Язык кода:javascript
копировать
public class HelloEventFactory implements EventFactory<MessageModel> {
     
       
    @Override
    public MessageModel newInstance() {
     
       
        return new MessageModel();
    }
}

4. Создайте EventHandler-потребитель

Язык кода:javascript
копировать
@Slf4j
public class HelloEventHandler implements EventHandler<MessageModel> {
     
       
    @Override
    public void onEvent(MessageModel event, long sequence, boolean endOfBatch) {
     
       
        try {
     
       
            //Остановимся здесь на 1000 мсда, чтобы подтвердить, что сообщение потребления да асинхронно
            Thread.sleep(1000);
            log.info("Начинается обработка сообщения потребителя");
            if (event != null) {
     
       
                log.info("потребитель Информация о потреблении да: {}", event);
            }
        } catch (Exception e) {
     
       
            log.info("потребителю не удалось обработать сообщение");
        }
        log.info("Обработка сообщения потребителя завершена");
    }
}

5. Создайте BeanManager

Язык кода:javascript
копировать
/**
 * Получить экземпляр объекта
 */
@Component
public class BeanManager implements ApplicationContextAware {
     
       

    private static ApplicationContext applicationContext = null;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
     
       
        this.applicationContext = applicationContext;
    }

    public static ApplicationContext getApplicationContext() {
     
        return applicationContext; }

    public static Object getBean(String name) {
     
       
        return applicationContext.getBean(name);
    }

    public static <T> T getBean(Class<T> clazz) {
     
       
        return applicationContext.getBean(clazz);
    }
}

6. Создайте MQManager

Кроме того, если вы планируете пройти собеседование и сменить работу в ближайшем будущем, рекомендуется ответить на вопросы онлайн на ddkk.com, охватывая более 10 000 вопросов для собеседования по Java, охватывая почти все основные вопросы технических собеседований и наиболее полные 500 наборов. существующих на рынке стеков технологий, представляющих собой высококачественные продукты. Серия учебных пособий, доступных бесплатно.

Язык кода:javascript
копировать
@Configuration
public class MQManager {
     
       

    @Bean("messageModel")
    public RingBuffer<MessageModel> messageModelRingBuffer() {
     
       
        //Определяем пул потоков, используемый для обработки событий, Disruptor запускает обработку потребителя через поток, предоставляемый java.util.concurrent.ExecutorSerivce.
        ExecutorService executor = Executors.newFixedThreadPool(2);

        //Указываем фабрику событий
        HelloEventFactory factory = new HelloEventFactory();

        //Укажите размер байта кольцевого буфера, который должен быть равен 2 в N-й степени (он может преобразовать операцию по модулю в битовую операцию для повышения эффективности), в противном случае это повлияет на эффективность
        int bufferSize = 1024 * 256;

        //Однопоточный режим для повышения производительности
        Disruptor<MessageModel> disruptor = new Disruptor<>(factory, bufferSize, executor,
                ProducerType.SINGLE, new BlockingWaitStrategy());

        //Установка события бизнес-процессора ---потребитель
        disruptor.handleEventsWith(new HelloEventHandler());

        // Начать ветку дезинтегратора
        disruptor.start();

        //Получаем кольцо кольцевого буфера, используемое для получения события, созданного производителем
        RingBuffer<MessageModel> ringBuffer = disruptor.getRingBuffer();

        return ringBuffer;
    }

7. Создайте Mqservice и создайте класс-производитель реализации.

Язык кода:javascript
копировать
public interface DisruptorMqService {
     
       

    /**
     * информация
     * @param message
     */
    void sayHelloMq(String message);
}

@Slf4j
@Component
@Service
public class DisruptorMqServiceImpl implements DisruptorMqService {
     
       

    @Autowired
    private RingBuffer<MessageModel> messageModelRingBuffer;
    @Override
    public void sayHelloMq(String message) {
     
       
        log.info("record the message: {}",message);
        //Получаем индекс следующего слота Event
        long sequence = messageModelRingBuffer.next();
        try {
     
       
            //Заполняем событие данными
            MessageModel event = messageModelRingBuffer.get(sequence);
            event.setMessage(message);
            log.info("ПрошлоеInformationCheredДобавить Информация:{}", event);
        } catch (Exception e) {
     
       
            log.error("failed to add event to messageModelRingBuffer for : e = {},{}",e,e.getMessage());
        } finally {
     
       
            // Публикуем событие, активируем наблюдателя для потребления и передаем последовательность пользователю изменения
            //Обратите внимание, что последний метод публикации должен быть помещен наконец, чтобы гарантировать, что он должен быть вызван; если последовательность запроса не отправлена, он заблокирует последующие операции публикации или других производителей.
            messageModelRingBuffer.publish(sequence);
        }
    }
}

8. Создание тестовых классов и методов

Язык кода:javascript
копировать
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = DemoApplication.class)
public class DemoApplicationTests {
     
       

    @Autowired
    private DisruptorMqService disruptorMqService;
    /**
     * Disruptor используется внутри проекта для выполнения информацииочереди.
     * @throws Exception
     */
    @Test
    public void sayHelloMqTest() throws Exception{
     
       
        disruptorMqService.sayHelloMq("информацияприезжать,Hello world!");
        log.info("Информацияочередь отправлена");
        //Остановимся здесь на 2000 мсда, чтобы убедиться, что да обрабатывает информацию асинхронно
        Thread.sleep(2000);
    }
}

Результаты тестового запуска

Язык кода:javascript
копировать
2020-04-05 14:31:18.543  INFO 7274 --- [           main] c.e.u.d.d.s.Impl.DisruptorMqServiceImpl  : record the message: информацияприезжать,Hello world!
2020-04-05 14:31:18.545  INFO 7274 --- [           main] c.e.u.d.d.s.Impl.DisruptorMqServiceImpl  : Прошлоеинформацияочередь Добавитьинформация:MessageModel(message=информацияприезжать,Hello world!)
2020-04-05 14:31:18.545  INFO 7274 --- [           main] c.e.utils.demo.DemoApplicationTests      : информация очередьбыла отправлена
2020-04-05 14:31:19.547  INFO 7274 --- [pool-1-thread-1] c.e.u.d.disrupMq.mq.HelloEventHandler    : потребительиметь дело синформацияначинать
2020-04-05 14:31:19.547  INFO 7274 --- [pool-1-thread-1] c.e.u.d.disrupMq.mq.HelloEventHandler    : потребительинформация о потреблениида:MessageModel(message=информацияприезжать,Hello world!)
2020-04-05 14:31:19.547  INFO 7274 --- [pool-1-thread-1] c.e.u.d.disrupMq.mq.HelloEventHandler    : потребительиметь дело синформация Заканчивать

5. Резюме

На самом деле генератор -> потребитель Шаблон очень распространен, и описанных выше эффектов можно легко достичь с помощью некоторых очередей сообщений. Разница в том, что Disruptor Он реализован в виде очереди в памяти и не блокируется. Это также Disruptor Причина, почему это эффективно.

boy illustration
Неразрушающее увеличение изображений одним щелчком мыши, чтобы сделать их более четкими артефактами искусственного интеллекта, включая руководства по установке и использованию.
boy illustration
Копикодер: этот инструмент отлично работает с Cursor, Bolt и V0! Предоставьте более качественные подсказки для разработки интерфейса (создание навигационного веб-сайта с использованием искусственного интеллекта).
boy illustration
Новый бесплатный RooCline превосходит Cline v3.1? ! Быстрее, умнее и лучше вилка Cline! (Независимое программирование AI, порог 0)
boy illustration
Разработав более 10 проектов с помощью Cursor, я собрал 10 примеров и 60 подсказок.
boy illustration
Я потратил 72 часа на изучение курсорных агентов, и вот неоспоримые факты, которыми я должен поделиться!
boy illustration
Идеальная интеграция Cursor и DeepSeek API
boy illustration
DeepSeek V3 снижает затраты на обучение больших моделей
boy illustration
Артефакт, увеличивающий количество очков: на основе улучшения характеристик препятствия малым целям Yolov8 (SEAM, MultiSEAM).
boy illustration
DeepSeek V3 раскручивался уже три дня. Сегодня я попробовал самопровозглашенную модель «ChatGPT».
boy illustration
Open Devin — инженер-программист искусственного интеллекта с открытым исходным кодом, который меньше программирует и больше создает.
boy illustration
Эксклюзивное оригинальное улучшение YOLOv8: собственная разработка SPPF | SPPF сочетается с воспринимаемой большой сверткой ядра UniRepLK, а свертка с большим ядром + без расширения улучшает восприимчивое поле
boy illustration
Популярное и подробное объяснение DeepSeek-V3: от его появления до преимуществ и сравнения с GPT-4o.
boy illustration
9 основных словесных инструкций по доработке академических работ с помощью ChatGPT, эффективных и практичных, которые стоит собрать
boy illustration
Вызовите deepseek в vscode для реализации программирования с помощью искусственного интеллекта.
boy illustration
Познакомьтесь с принципами сверточных нейронных сетей (CNN) в одной статье (суперподробно)
boy illustration
50,3 тыс. звезд! Immich: автономное решение для резервного копирования фотографий и видео, которое экономит деньги и избавляет от беспокойства.
boy illustration
Cloud Native|Практика: установка Dashbaord для K8s, графика неплохая
boy illustration
Краткий обзор статьи — использование синтетических данных при обучении больших моделей и оптимизации производительности
boy illustration
MiniPerplx: новая поисковая система искусственного интеллекта с открытым исходным кодом, спонсируемая xAI и Vercel.
boy illustration
Конструкция сервиса Synology Drive сочетает проникновение в интрасеть и синхронизацию папок заметок Obsidian в облаке.
boy illustration
Центр конфигурации————Накос
boy illustration
Начинаем с нуля при разработке в облаке Copilot: начать разработку с минимальным использованием кода стало проще
boy illustration
[Серия Docker] Docker создает мультиплатформенные образы: практика архитектуры Arm64
boy illustration
Обновление новых возможностей coze | Я использовал coze для создания апплета помощника по исправлению домашних заданий по математике
boy illustration
Советы по развертыванию Nginx: практическое создание статических веб-сайтов на облачных серверах
boy illustration
Feiniu fnos использует Docker для развертывания личного блокнота Notepad
boy illustration
Сверточная нейронная сеть VGG реализует классификацию изображений Cifar10 — практический опыт Pytorch
boy illustration
Начало работы с EdgeonePages — новым недорогим решением для хостинга веб-сайтов
boy illustration
[Зона легкого облачного игрового сервера] Управление игровыми архивами
boy illustration
Развертывание SpringCloud-проекта на базе Docker и Docker-Compose