На платформах электронной коммерции правило 80/20 особенно очевидно: 20% продавцов с высокой стоимостью часто обеспечивают более 80% продаж. Эти продавцы обычно имеют большое количество заказов, продуктов, предложений и других потребностей управления, что вызывает у них острую потребность в функциях пакетной обработки. Пакетные операции могут помочь этим продавцам эффективно обрабатывать информацию о продуктах, управлять запасами и заказами, что значительно повышает эффективность работы.
Благодаря пакетным операциям продавцы могут модифицировать несколько продуктов за короткий период времени, например, единообразно корректировать цены, корректировать стратегии продвижения и т. д., чтобы быстро реагировать на изменения рынка и оптимизировать взаимодействие с пользователем. Кроме того, пакетные операции также снижают риск ручных ошибок, обеспечивают согласованность данных и позволяют продавцам больше сосредоточиться на стратегическом планировании и управлении взаимоотношениями с клиентами. Короче говоря, для этих торговцев пакетные операции являются не только ключевым инструментом повышения эффективности управления, но и важной гарантией роста бизнеса.
В серверной части продавца Dewu все пакетные операции продавцов выполняются в системе пакетной обработки (центр пакетной обработки). Продавцы могут выполнять пакетные операции, выполняя пакетный импорт или пакетный экспорт на странице функций. Обработанные файлы будут отображаться в центре загрузок.
Кроме того, центр пакетной обработки также выполняет задачи пакетных операций в нескольких областях, таких как серверная часть транзакций, обслуживание клиентов, Huijin и магазины. До сих пор центр пакетной обработки обслуживал тысячи пакетных задач в десяти доменах, обрабатывая десятки тысяч связанных задач и сотни миллионов связанных данных каждый день.
Поскольку объем получаемых объектов продолжает увеличиваться, системы пакетной обработки также постоянно развиваются. Проще говоря, системы пакетной обработки прошли несколько этапов развития: от децентрализации к объединению, централизации и изоляции. Далее, с точки зрения разработчика пакетной обработки Сяо Вана, мы представляем эти три конструкции систем пакетной обработки и обсуждаем их соответствующие характеристики и применимые сценарии.
Предположим, Сяо Ван получает запрос на пакетную операцию, требующий проведения пакетных торгов на серверной стороне продавца. Требования были очень простыми, и Сяо Ван выполнил основной процесс всего за два с половиной дня.
После того, как бизнес вышел в Интернет, отзывы продавцов были очень хорошими, и продукт немедленно запросил пакетное изменение ставок. Поэтому Сяо Ван скопировал тыкву и написал другой процесс.
Эти два процесса почти одинаковы, но Сяо Ван, одержимый программированием, сказал, что не может их принять. После анализа Сяо Ван обнаружил, что каким бы ни был процесс импорта, некоторые шаги всегда были исправлены, поэтому он решил повторно использовать код.
После повторного использования кода между торгами и модификациями различаются только проверка формата и бизнес-логика. Остальные загрузки файлов, анализ контента, сохранение и загрузка результатов используют один и тот же узел. Поскольку различия между различными предприятиями в основном касаются обработки данных, Сяо Ван решил напрямую открыть его как точку расширения. В различных бизнес-сценариях достаточно реализовать собственные расширения обработки данных, чтобы обеспечить беспрепятственный доступ к процессу пакетной обработки. Схематическая диаграмма расширения бизнеса выглядит следующим образом:
Во время конкретной реализации Сяо Ван выбирает точки расширения с помощью бизнес-идентичности в коде и может установить для этой цели соответствующий фабричный класс.
@Component
public class BpcProcessHandlerFactory {
@Autowired
private ApplicationContext applicationContext;
private static ConcurrentHashMap<String, BpcProcessDefine> templateMap = new ConcurrentHashMap<>();
@PostConstruct
private void init() {
Map<String, ImportService> importServiceMap = applicationContext.getBeansOfType(ImportService.class);
for (ImportService importService : importServiceMap.values()) {
initImportService(importService);
}
}
private void initImportService(ImportService importService) {
// ...
}
public BpcProcessHandler getBpcProcessHandler(String templateCode) {
if (StringUtils.isBlank(templateCode)) {
return null;
}
if(!templateMap.containsKey(templateCode)) {
return null;
}
return templateMap.get(templateCode).newProcessHandler();
}
}
Для обработки импортированных задач упрощенный поток кода выглядит следующим образом:
@Service
public class BpcProcessService {
@Autowired
private BpcProcessHandlerFactory bpcProcessHandlerFactory;
public String doBpcProcess(BpcProcessReq req) throws BpcProcessException {
// Получите баллы расширения
BpcProcessHandler bpcProcessHandler = bpcProcessHandlerFactory.getBpcProcessHandler(req.getTaskTemplateCode());
if (bpcProcessHandler == null) {
throw new Bpc ProcessException("Определение шаблона не найдено");
}
// 1. Создание задач
createTask();
// 2. Загрузка файла && Сохранение файла
downloadFromOss();
// 3. Анализ данных
int loopCnt = 0;
int maxLoopCnt = bpcProcessHandler.getMaxLoopCnt();
while(loopCnt++ < maxLoopCnt) {
// Обработка точки расширения вызова
bpcProcessHandler.process();
// задача обновления
updateTaskProcess();
}
// задача обновления
updateTaskStatus();
return taskId;
}
}
Завершив точку расширения процесса, Сяо Ван подумал про себя, что теперь он может быть спокоен. Если в будущем появятся новые сценарии импорта, вам нужно будет только реализовать собственную логику проверки и логику обработки.
Но хорошие времена длились недолго. По мере того, как размер торговцев рос, Сяо Ван обнаружил, что к ним подключается все больше и больше предприятий; сначала это были торги, затем товары, а затем другие оптовые услуги по возврату и оплате услуг. справиться с этим в одиночку. Теперь мы можем только позволить каждому бизнесу развивать свой собственный бизнес в системе пакетной обработки. Привычки кодирования у всех разные, и все больше и больше Jars подключаются к системе пакетной обработки, и система превратилась в мешанину.
Как можно изменить эту ситуацию?
В централизованной архитектуре: все процессы бизнес-обработки являются общими, и разные предприятия дополняют логику каждого бизнеса, реализуя свои собственные точки расширения. Это приводит к наиболее очевидной проблеме: границы системы размыты, а деловая связь затруднена.
Можно ли записать эту точку расширения извне?
Сяо Вану внезапно пришла в голову идея: с SPI все будет в порядке. Механизм SPI Java может помочь нам реализовать каждый бизнес, поэтому системе пакетной обработки достаточно абстрагировать набор основных процессов импорта/экспорта на основе SPI. Поскольку каждая компания должна иметь возможность точно находить SPI, необходимо добавить определенные возможности бизнес-конфигурации.
По сравнению с централизованной архитектурой конфигурационное решение более масштабируемо, но оно также неизбежно имеет недостаток: разработчикам необходимо создавать конфигурации.
Конфигурация пакетной обработки должна содержать как минимум следующее содержимое:
Среди них стоимость обслуживания такой информации, как отношения сопоставления полей и SPI, относительно высока. Чтобы снизить рабочую нагрузку разработчиков, Сяо Ван также поддерживает плагин IDEA. Используется для загрузки конфигураций одним щелчком мыши.
Разработчики серверной части могут сообщать о своих собственных конфигурациях одним щелчком мыши по аннотациям, что значительно снижает нагрузку по загрузке бизнес-конфигураций.
Синхронное выполнение — общая обработка конфигурации
После создания конфигурации вы можете использовать обобщенный вызов dubbo для выполнения реализации каждого SPI:
@Override
public String invoke(ServiceDefinition serviceDefinition, Object inputParam) {
GenericService genericService = DubboConfig.buildService(serviceDefinition.getInterfaceName(), serviceDefinition.getTimeout());
//Обработка преобразования списка параметров, преобразование ключа параметра запроса во внутренние параметры
String[] parameterTypes = new String[] {serviceDefinition.getRequestType().getClassName()};
Object[] args = new Object[] {inputParam};
long startTime = System.currentTimeMillis();
Object result;
try {
log.info("invoke service={}#{} with request={}", serviceDefinition.getInterfaceName(), serviceDefinition.getMethod(), JSON.toJSONString(args));
result = genericService.$invoke(serviceDefinition.getMethod(), parameterTypes, args);
long endTime = System.currentTimeMillis();
digestLog(serviceDefinition, true, endTime - startTime);
log.info("invoke service={}#{} with result={}", serviceDefinition.getInterfaceName(), serviceDefinition.getMethod(), JSON.toJSONString(result));
} catch (Exception ex) {
long endTime = System.currentTimeMillis();
digestLog(serviceDefinition, false, endTime - startTime);
log.info("failed to dubbo invoke:" + serviceDefinition.getInterfaceName() + "#" +serviceDefinition.getMethod() + " with error " + ex.getMessage());
throw new DependencyException(ErrorCodeEnum.DEFAULT_DEPENDENCY_ERROR.getCode(), ex.getMessage(), ex);
}
if (result == null) {
throw new DependencyException(ErrorCodeEnum.DEFAULT_BIZ_ERROR.getCode(), "the result is null");
}
Map resultMap = JSON.parseObject(JSON.toJSONString(result), Map.class);
processError(resultMap);
Object data = resultMap.get("data");
return JSON.toJSONString(data);
}
Упрощенный вариант процесса выполнения выглядит следующим образом:
@Service
public class BpcProcessService {
@Autowired
private BpcProcessHandlerFactory bpcProcessHandlerFactory;
public String doBpcProcess(BpcProcessReq req) throws BpcProcessException {
// 1. Получить конфигурацию
TaskTemplate template = getTemplate();
// 2. Создание задач
Task task = createTask();
// 3. Загрузка файла && Сохранение файла
downloadFromOss();
// 4. Анализ данных
int loopCnt = 0;
int maxLoopCnt = template.getMaxLoopCnt();
while(loopCnt++ < maxLoopCnt) {
// Обработка вызова SPI
invoke(template, task)
// задача обновления
updateTaskProcess();
}
// задача обновления
updateTaskStatus();
return taskId;
}
}
Видно, что настроенная стратегия выполнения аналогична стратегии выполнения предыдущего расширения процесса. Основное изменение заключается в переключении с вызова локальной точки расширения на вызов настроенного SPI.
Планирование выполнения и корректировка бизнес-целей
После завершения настройки Сяо Ван вздохнул с облегчением. Система наконец-то стала чистой. Бизнес принадлежит бизнесу, а процесс принадлежит процессу. Они не мешают друг другу. Однако не все шло гладко, и вскоре система пакетной обработки выпустила дым. Проще говоря, этот дым вызван переполнением памяти, вызванным тем, что система пакетной обработки обрабатывает большое количество задач одновременно.
В ответ на этот дым Сяо Ван тщательно проанализировал системные данные и обнаружил, что бизнес торгового центра загрузки имеет свои собственные бизнес-характеристики:
Ниже приведены некоторые диаграммы источников данных, проанализированные Сяо Ваном:
Эти характеристики проявляются в системах пакетной обработки как:
Разве это не просто риск, вызванный ресурсами? Сяо Ван подумал, что это небольшой случай, поэтому он просто добавил текущий предел для его решения, а затем добавил текущий предел к задаче создания. В результате ситуация не только не улучшилась после выхода в Интернет, но и многие важные задачи импорта были «случайно прекращены» из-за текущего ограничения. После анализа Сяо Ван наконец нашел причину. В бизнес-центре загрузки торговцев ограничение тока не может соответствовать требованиям защиты ресурсов. На самом деле это определяется внутренней архитектурой самой системы, поскольку в большинстве случаев пакетная обработка — это система с низким использованием ЦП и высоким использованием памяти. Если текущий поток отправки задач ограничен, с одной стороны, легко случайно повредить основную задачу заказа/предложения, а с другой стороны, влияние трудоемких задач игнорируется. Как показано ниже:
Если вы не можете ограничить поток, вам придется сделать это самостоятельно. Пока у вас все под рукой, вы можете решить, когда задача будет выполнена. Поэтому Сяо Ван планировал сменить свою личность с пассивного исполнения на активное планирование. Другими словами, он переключается с синхронного процесса на асинхронный процесс планирования, а система сама занимается распределением ресурсов и изолирует бизнес. Сяо Ван быстро нарисовал свой основной процесс.
Процесс очень прост. При создании задачи она больше не выполняется напрямую, а ожидает системного планирования перед выполнением. Однако Сяо Ван снова столкнулся с трудностями в плане планирования и изоляции. Что им делать?
изоляция бизнеса
Изоляция в основном делится на две категории: физическую изоляцию и логическую изоляцию.
Физическая изоляция: разные машины выполняют планирование разных служб.
Логическая изоляция. Полная бизнес-изоляция за счет использования разных пулов потоков.
Сначала все просто, а потом сложно. Сяо Ван решил сначала использовать простой метод, чтобы изолировать бизнес. Метод пула потоков уже может разделить задачи, которые могут привести к потерям капитала, и задачи, которые не вызовут капитальных потерь. Что касается планирования, Сяо Ван перечислил распространенные в отрасли методы планирования приоритетов.
Планирование задач
1. Очередь приоритетов. Используйте очередь ожидания пула потоков для завершения планирования приоритетов.
Преимущества: Код прост и удобен в обслуживании. Вам нужно только поддерживать приоритетную очередь.
Недостатки: необходимо добавить дополнительное состояние для представления ожидания планирования. Существует проблема нехватки ресурсов и определенный риск для стабильности, поскольку отсутствует контроль над очередью ожидания пула потоков.
2. Стратегия старения. Используйте стратегию старения для динамического улучшения приоритета задач.
Преимущества: в значительной степени позволяет избежать проблемы нехватки ресурсов, имеет высокий приоритет масштабируемости, сильные возможности управления задачами и контроля, а также меньшее вмешательство в конечный автомат.
Недостатки: необходимо учитывать проблемы параллелизма, а код сложен.
3. Многоуровневая очередь: используйте многоуровневую очередь для выполнения приоритета задачи.
Преимущества: это в значительной степени позволяет избежать проблемы голодания, код относительно краток, возможности управления задачами сильны, а конечный автомат меняется меньше.
Недостатки: Приоритет задачи имеет плохую масштабируемость. При добавлении нового приоритета необходимо изменить код планирования. При отсутствии качественных задач пропускная способность системы снижается.
Объединив вышеупомянутые решения, Сяо Ван наконец применил метод многоуровневой очереди + изоляции потоков для планирования задач. В конкретной реализации планирования запланированные задачи используются для запуска процесса.
Кроме того, чтобы поддержать временное увеличение пропускной способности системы в сценариях с большими рабочими нагрузками, Сяо Ван также добавил возможность сегментирования. Принимая параметры сегментирования, каждая машина получает только свои собственные сегменты. Упрощенная версия кода выглядит следующим образом:
@Service
@Slf4j
public class TaskScheduleServiceImpl implements TaskScheduleService {
@Override
@LogAnnotation
public void schedule(int shared, int all) {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
// Потерянное выполнение пула потоков
List<Long> highTaskIds = taskInstanceRepository.queryUnstartedTaskIdByPriority(TaskPriorityEnum.HIGH, all * arkConfig.highSize);
highTaskIds = highTaskIds.stream().filter((id) -> id % all == shared).collect(Collectors.toList());
log.info("Приоритетные задачи планирования, высококачественные задачи, которые необходимо выполнить Ids = {}", highTaskIds);
process(highTaskIds, (id) -> taskThreadPool.executeHigh(() -> process(id)));
// Потерянное выполнение пула потоков
List<Long> mediumTaskIds = taskInstanceRepository.queryUnstartedTaskIdByPriority(TaskPriorityEnum.MEDIUM, all * arkConfig.mediumSize);
mediumTaskIds = mediumTaskIds.stream().filter((id) -> id % all == shared).collect(Collectors.toList());
log.info("Приоритетные задачи планирования, задачи среднего и высокого качества, которые необходимо выполнить Ids = {}", mediumTaskIds);
process(mediumTaskIds, (id) -> taskThreadPool.executeMedium(() -> process(id)));
// Потерянное выполнение пула потоков
List<Long> lowTaskIds = taskInstanceRepository.queryUnstartedTaskIdByPriority(TaskPriorityEnum.LOW, all * arkConfig.lowSize);
lowTaskIds = lowTaskIds.stream().filter((id) -> id % all == shared).collect(Collectors.toList());
log.info("Приоритетные задачи планирования, задачи низкого качества, подлежащие выполнению Ids = {}", lowTaskIds);
process(lowTaskIds, (id) -> taskThreadPool.executeLow(() -> process(id)));
log.info("Приоритетная задача планирования, выполнение завершено, cost = {}", stopWatch.getTime());
}
private void process(List<Long> idList, Consumer<Long> consumer) {
if (CollectionUtils.isEmpty(idList)) {
return;
}
for (Long id : idList) {
consumer.accept(id);
}
}
private void process(Long id) {
// Логика обработки задач. . .
}
}
Сделав это, Сяо Ван внезапно вспомнил, что тестовую среду еще нужно покрасить.
Поэтому в график была добавлена маршрутизация красильной среды.
На этот раз проблема стабильности системы наконец-то полностью решена. В будущем, когда в системе возникнут риски пропускной способности, ей нужно будет только динамически регулировать количество отзывов.
Выполнив все вышеперечисленное, Сяо Ван включил мониторинг APM и обнаружил, что использование памяти системы по-прежнему очень велико. Очевидно, что сложные бизнес-процессы вынесены наружу, но почему производительность по-прежнему средняя?
Сяо Ван задумался о недостатках нынешней пакетной обработки:
Кроме того, есть еще одна проблема, которая его больше всего беспокоит: много деловых запросов. Хотя многие предприятия подключены к его системе, они часто не могут найти причину ошибки в случае сбоя выполнения, что требует сотрудничества Сяо Вана в устранении неполадок. Как решить эти проблемы?
Сяо Ван решил вернуться к своей изначальной природе и своим корням. Центр пакетной обработки был создан для решения проблемы пакетного импорта и экспорта для торговцев. Его основная цель — помочь бизнес-платформе снизить затраты на анализ файлов, генерацию файлов, загрузку файлов и отображение страниц.
Обязательно ли эти проблемы требуют наличия системы для их поддержки? Анализ и генерация файлов фактически выполняются с помощью EasyExcel SDK, загрузка файлов осуществляется с помощью Oss SDK, а также есть функция отображения страниц, которая представляет собой очень легкую логику. Другими словами, вы можете делать все первые шаги в бизнес-системе. Создайте плагин пакетной обработки, чтобы реализовать большинство возможностей центра пакетной обработки. Система пакетной обработки используется только для демонстрации. Сяо Вану пришла в голову новая идея: поместить логику в SDK пакетной обработки, и для пакетной обработки потребуется поддерживать только одну или две машины для передачи логики отображения.
Общий архитектурный проект показан на рисунке ниже:
Основано на идее локализации: центр пакетной обработки подобен центральному узлу, а каждая бизнес-система служит его конечным узлом и должна только регулярно сообщать информацию, связанную с задачами. Система пакетной обработки отвечает только за отображение страниц и полностью отделена от бизнеса.
Локализация дает следующие очевидные преимущества:
Конечно, серебряной пули не существует. Локализация также неизбежно приносит некоторые недостатки:
После локализации,Центру пакетной обработки не требуется поддерживать бизнес-логику、Нет необходимости в Планировании задачи, степень детализации задач является наилучшей. Сяо Ван наконец смог хорошо выспаться.
Выше мы рассматриваем точку зрения обычного разработчика системы пакетной обработки.,Возвращены к трем этапам разработки пакетной системы продаж (в процессе локализации). эти три этапа,Он отражает процесс эволюции от толстого к тонкому, от объединения бизнеса к изоляции бизнеса. От локального к платформе к локальному,В мире существует довольно общая тенденция,Ощущение, что если мы надолго разлучимся, то должны объединиться, а если надолго объединимся, то должны разойтись. Среди этих трех методов нет абсолютных преимуществ или недостатков.,Вместо этого он постепенно развивается по мере изменения потребностей бизнеса.
На начальном этапе система имеет меньше функций, обычно только одну или две простые функции импорта и экспорта. В настоящее время использование расширения процесса является самым легким и гибким вариантом, который может быстро удовлетворить основные потребности торговцев. По мере роста объема бизнеса изоляция между системами становится все более важной. В это время введение регистрации конфигурации становится необходимой мерой для обеспечения автономности и стабильности между различными модулями.
После дальнейшей разработки первоначальная реализация трансформации платформы обычно использует метод синхронного вызова, но последующие требования к стабильности способствуют внедрению асинхронного планирования. Однако на более поздних этапах даже асинхронное планирование может столкнуться с недостаточной пропускной способностью системы. Таким образом, режим сообщения о статусе локального выполнения бизнес-систем постепенно стал лучшим выбором, который может эффективно улучшить скорость ответа и возможности обработки системы.
Поскольку бизнес продолжает развиваться, системы пакетной обработки торговцев неизбежно будут обновляться и совершенствоваться, чтобы адаптироваться к новым потребностям и задачам. В проектировании систем не существует универсального решения. Все итерации проектирования на самом деле являются отражением способности разработчиков встречать проблемы и решать их.
*Текст/Жигуй
Эта статья является оригинальной работой Dewu Technology. Перепечатка без разрешения Dewu Technology строго запрещена, в противном случае будет наложена юридическая ответственность в соответствии с законом.