Поскольку появляется все больше бизнес-направлений и систем, потребность в синхронизации данных между системами или предприятиями становится все более частой. Большинство современных бизнес-систем Интернета используют решения для хранения и обработки данных MySQL:
Поэтому компаниям срочно необходимо гибкое и простое в использовании решение для синхронизации и обработки данных между системами, чтобы конкретные бизнес-данные можно было легко передавать между другими предприятиями или компонентами, а также способствовать быстрой бизнес-итерации.
В настоящее время наиболее распространенными решениями для синхронизации системных данных в отрасли являются синхронная двойная запись, асинхронная двойная запись, прослушивание бинлога и т. д., каждое из которых имеет свои преимущества и недостатки. В этой статье объясняется случай синхронизации MySQL с ES.
Самое простое решение — записать данные в ES при записи данных в MySQL, чтобы добиться двойной записи данных.
Добавьте MQ в основу синхронной двойной записи, чтобы добиться асинхронной записи.
На основе второго решения в основном решается проблема бизнес-связанности, поэтому вводится автоматический механизм мониторинга и обработки изменений данных.
При разработке базовых компонентов в основном учитывается отсутствие вторжения в бизнес, отсутствие восприятия бизнес-доступа и низкая связанность системы. Таким образом, третий вариант выбора также учитывает, что это решение имеет недостатки в плане повторного использования и масштабируемости, поэтому на этом основании. , оптимизация завершена.
Все необходимые источники данных — это MySQL, поэтому сначала рассмотрите возможность выбора компонентов для мониторинга изменений данных MySQL в режиме реального времени. Наиболее известным зрелым решением в отрасли является [canal], которое отвечает требованиям функциональной полноты, активности сообщества и стабильности. Таким образом, третье решение оптимизировано на основе канала для обеспечения синхронизации данных в нескольких системах и достижения разделения бизнеса, возможности повторного использования и масштабируемости.
Через единую «службу рассылки сообщений» реализуется соединение с Canal Client, и сообщения распределяются по различным кластерам MQ в едином формате. Единая «служба потребления сообщений» используется для потребления сообщений и обратного вызова компании. Интерфейс Бизнес-системы не нужно уделять внимание потоку данных, необходимо уделять внимание только обработке и сборке данных для конкретных предприятий.
«Служба рассылки сообщений» и «Служба потребления сообщений» реализуют повторное использование функций в процессе потока данных для каждого бизнес-направления. «Службу потребления сообщений» можно распределить по различным кластерам MQ, а конфигурация «Службы потребления сообщений» определяет выходные данные источника данных для достижения функционального расширения.
Подписка на данные и потребление данных разделены посредством MQ. «Служба распространения сообщений о подписке на данные» отвечает за стыковку с Canal Client, анализ сообщений об изменении данных, преобразование их в широко используемые сообщения сообщений формата JSON и распространение их клиенту в соответствии с бизнесом. Различные правила конфигурации MQ и маршрутизация.
Canal в основном основан на инкрементном анализе журналов базы данных MySQL и обеспечивает инкрементную подписку и потребление данных:
Из конструкции высокой доступности службы Canal видно, что при запуске нескольких экземпляров Canal Client гарантируется, что только один экземпляр работает и потребляет сообщения binlog. «Служба распространения сообщений по подписке на данные», на которой размещен клиент Canal, будет развернута на нескольких серверах. Поскольку каждый сервер запускается в разное время при выпуске службы, все активные экземпляры клиента Canal будут запускаться на сервере, который запускается первым, и потреблять ресурсы. сообщения бинлога.
Клиенты Canal, работающие на остальных серверах, находятся в состоянии ожидания и не могут полностью использовать ресурсы каждого сервера. Поэтому мы надеемся, что разные места назначения будут выделены для выполнения разным серверам, но когда сервер выйдет из строя, выполнение будет автоматически передано на другие серверы, чтобы в полной мере использовать каждый сервер и обеспечить производительность потребления сообщений binlog.
С этой целью вводится компонент elasticjob-lite, а функция сегментирования используется для вторичной инкапсуляции для прослушивания событий изменения места назначения, происходящих в сети и в автономном режиме на определенном сервере.
Концепция элементов сегментов задач в ElasticJob позволяет запускать задачи в распределенной среде, и каждый сервер задач запускает только сегменты, назначенные этому серверу. При добавлении или отключении серверов ElasticJob будет отслеживать изменения количества серверов практически в реальном времени.
Если задание разделено на 4 слайса и выполняется на двух серверах, каждому серверу будет назначено 2 слайса:
Когда добавляется новый сервер заданий, ElasticJob распознает новый сервер через изменения во временных узлах в центре регистрации и повторно сегментирует его во время следующего планирования задач. Новый сервер будет нести часть фрагментов заданий:
Когда сервер заданий выйдет из строя во время работы, центр регистрации также обнаружит это через временные узлы и перенесет осколки на выжившие серверы во время следующего запуска для достижения высокой доступности заданий. Задания, которые не были выполнены из-за простоя сервера, могут продолжать выполняться посредством аварийного переключения.
Пользователями этой системы являются все бизнес-направления компании. Как сделать так, чтобы после онлайн-проблем каждый бизнес не влиял друг на друга.
Изоляция бизнес-ресурсов поддерживается как на уровне кластера MQ, так и на уровне очереди; сообщения об изменениях, полученные из канала, распределяются по различным кластерам MQ в соответствии с правилами, а унифицированные правила ключей маршрутизации устанавливаются таким образом, что каждое предприятие может подать заявку на MQ самостоятельно. бизнес во время стыковки, привязанная по требованию, соответствующая кластеру MQ и маршрутизации сообщений.
Путем настройки различных назначений для сопоставления с разными кластерами MQ и ZK можно добиться горизонтального расширения производительности.
После того, как канал получает сообщение из binlog, он разбивает пакетное сообщение на одно сообщение, выполняет расчет правила фрагментации и отправляет его назначенному коммутатору RabbitMQ и ключу маршрутизации, чтобы его можно было привязать к разным очередям в соответствии с различными бизнес-правилами в соответствии с для различных бизнес-сценариев и использования. Служба выполняет обработку потребления сообщений и создает обмен с именем «exchange.canal» с типом Тема, правила ключа маршрутизации: key.canal.{destination}.{database}.{table}.{sharding}, сегментирование по модулю сегментирования хеш-кода, отсортированного по значению pkName, соглашение о правилах именования очередей:queue.canal.{appId}. {bizName}, например:
queue.canal.trade_center.order_search.0 обязательность key.canal.dev-instance.trade_order.order_item.0
queue.canal.trade_center.order_search.0 обязательность key.canal.dev-instance.trade_order.order_extend.0
...
Чтобы отделить потребление сообщений от бизнес-системы, была независимо создана «служба потребления подписки на данные». Используйте сообщение MQ об изменении данных, доставленное из «Службы распространения сообщений о подписке на данные», и выполните обратный вызов указанного бизнес-интерфейса обратного вызова в соответствии с бизнес-конфигурацией. Бизнес-интерфейс обратного вызова отвечает за получение сообщений об изменении данных, сбор информации о документе ES, которую необходимо выполнить, и возврат ее потребительской службе для операций с данными ES.
Существует три типа операций для сообщений, подписанных из binlog: INSERT, UPDATE и DELETE. Здесь добавлена новая инструкция SELECT. Функция заключается в том, что после получения инструкции бизнес-интерфейс обратного вызова повторно получает последние данные из базы данных и. собирает его в документ ЭП, который необходимо выполнить. Информация возвращается сервису-потребителю для операций с данными ЭП.
В основном используется при полной синхронизации, частичной синхронизации, обновлении документа, компенсации сообщений и других сценариях.
Когда новая бизнес-функция подключается к сети, соответствующая очередь будет настроена для привязки соответствующих ключей маршрутизации и подписки на сообщения об изменении данных, требуемые бизнес-сценарием. Чтобы избежать необходимости повторного обновления кода службы потребления и повторной публикации службы каждый раз при обращении к новому бизнесу, необходимо регулярно загружать данные таблицы конфигурации и динамически добавлять функцию прослушивания очереди MQ.
Используйте контейнер SimpleMessageListenerContainer для настройки динамического прослушивания очереди потребления. Создайте экземпляр SimpleMessageListenerContainer для каждого кластера MQ и динамически зарегистрируйте его в контейнере Spring.
Бизнес обычно соответствует индексу ES и одной или нескольким очередям MQ (правила привязки ключей маршрутизации очередей см.: Правила фрагментации сообщений MQ):
Очередь имеет несколько потребителей, использующих ее. Поскольку нет никакой гарантии, что потребитель, который первым прочитает сообщение, завершит операцию первым, порядок может быть нарушен. Потому что разные сообщения отправляются в очередь, а затем несколько потребителей потребляют сообщения из одной очереди. С этой целью можно создать несколько очередей, и каждый потребитель использует только одну очередь. Производитель помещает сообщение в одну и ту же очередь в соответствии с правилами (см.: 3.4.4.2 Правила фрагментации сообщений MQ), так что одно и то же сообщение будет только. быть отправлены в одну и ту же очередь. Потребитель потребляет последовательно.
Службы обычно развертываются в кластерах, и, естественно, каждая очередь имеет несколько потребителей. Чтобы решить эту проблему, в сегмент MQ добавлен elasticjob-lite. Если имеется 2 экземпляра службы и 5 очередей, экземпляр 1 может использовать очереди 1, 2 и 3, а экземпляр 2 — очереди 4 и 5. При зависании одного из инстансов 1 потребление очередей 1, 2 и 3 будет автоматически передано инстансу 2. При перезапуске инстанса 1 потребление очередей 1, 2 и 3 снова будет передано инстансу 1. .
Причина, по которой порядок потребления RabbitMQ не в порядке, обычно заключается в том, что потребление очереди представляет собой многопоточное потребление одной машины или потребитель развернут в кластере. Поскольку разные сообщения отправляются в одну и ту же очередь, несколько потребителей потребляют сообщения из одной и той же очереди. . Например, потребитель A выполняет добавление, потребитель B выполняет модификацию, а потребитель C выполняет удаление. Однако потребитель C выполняется быстрее, чем потребитель B, а потребитель B быстрее, чем потребитель A. В результате порядок выполнения binlog потребления изменяется на ES. беспорядочно то, что должно было быть добавлено, изменено и удалено, стало удаленным, измененным и добавленным.
В связи с этим для RabbitMQ можно создать несколько очередей. Каждый потребитель потребляет сообщения из одной очереди в одном потоке. Когда производитель отправляет сообщение, сообщения с одинаковым номером отправляются в одну и ту же очередь. порядке сообщения с одинаковым номером будут потребляться только одним потребителем последовательно, обеспечивая таким образом последовательность сообщений:
Но как гарантировать, что в режиме кластера очередь выполняет однопоточное потребление только на одной машине, и как выполнить аварийное переключение, если эта машина выйдет из строя. В этом отношении в сегмент MQ добавлен elasticjob-lite. Если имеется 2 экземпляра службы и 5 очередей, мы можем позволить экземпляру 1 использовать очереди 1, 2 и 3, а экземпляр 2 — очереди 4 и 5. При зависании одного из инстансов 1 потребление очередей 1, 2 и 3 будет автоматически передано инстансу 2. При перезапуске инстанса 1 потребление очередей 1, 2 и 3 снова будет передано инстансу 1. .
В бизнес-сценариях, чувствительных к последовательному потреблению сообщений, можно использовать сегментирование очереди для улучшения общего параллелизма. Бизнес-сценарии, которые не чувствительны к использованию последовательности сообщений, также можно настроить для использования кластером в определенной очереди или одновременного использования на одном компьютере. Разумно выбирайте разные решения конфигурации для разных бизнес-сценариев, чтобы повысить общую производительность.
Сообщения об изменениях, полученные через Canal, могут соответствовать только бизнес-сценарию данных добавочной подписки. Однако нам обычно необходимо синхронизировать полный объем исторических данных, прежде чем добавочная подписка на данные сможет иметь смысл. Если идентификатор таблицы бизнес-данных находится в режиме автоматического увеличения, вы можете указать минимальное значение идентификатора и максимальное значение идентификатора, а затем разрезать их, например на 100 фрагментов, чтобы сгенерировать сообщение MQ и отправить его в MQ. После использования сообщений MQ соберите сообщения, сгенерируйте пакеты сообщений, которые имитируют добавочные изменения данных, и синхронизируйте данные, используя исходный метод обратного вызова добавочных сообщений.
Иногда нам необходимо восстановить указанные данные или идентификатор бизнес-таблицы не находится в режиме автоинкремента, и требуется полная синхронизация. Вы можете указать набор списков идентификаторов, которые необходимо синхронизировать через частично синхронизированный интерфейс, генерировать фрагментированные сообщения MQ и отправлять их в MQ. После получения синхронного сообщения MQ служба потребителя собирает сообщение, генерирует сообщение сообщения, которое имитирует добавочные изменения данных, и синхронизирует данные, используя исходный метод обратного вызова добавочного сообщения.
Когда в нашем индексе ES имеются большие пакеты аномалий данных и нам необходимо обновить данные индекса ES, мы можем сгенерировать задачу полной синхронизации, получить список идентификаторов документов указанного индекса ES на страницах, смоделировать генерацию сообщения частичной синхронизации. сообщения и отправлять их в MQ middle. После использования сообщений MQ соберите сообщения, сгенерируйте пакеты сообщений, которые имитируют добавочные изменения данных, и синхронизируйте данные, используя исходный метод обратного вызова добавочных сообщений.
Сохраняйте сообщения об ошибках синхронизации в таблице повторов сообщений и выполняйте компенсацию с помощью задания для облегчения мониторинга. Во время компенсации сообщение сбрасывается на сообщение MQ типа SELECT. После получения сообщения интерфейс обратного бизнес-обратного вызова получит последние данные из базы данных и обновит документ ES.
В настоящее время клиентом, официально рекомендованным ES, является RestHighLevelClient. На основании этого мы провели разработку вторичной упаковки, в основном учитывая масштабируемость и простоту использования.
Для некоторых бизнес-сценариев, требующих изоляции данных, мы предоставляем подключаемый модуль изоляции данных ES. В ES SDK спроектирован интерфейс поискового фильтра, который использует перехватчики для перехвата и фильтрации параметров условий поиска статистических документов, документов поиска и других методов.
/**
* Фильтры поиска
*/
public interface SearchSourceBuilderFilter {
String getFilterName();
void filter(SearchSourceBuilder searchSourceBuilder);
}
Если вы обнаружите, что Canal Client не может получать сообщения binlog в течение длительного времени, вы можете перейти к фону Canal Admin, чтобы проверить журналы в управлении экземплярами. Существует высокая вероятность того, что появится сообщение «не удалось найти имя первого файла журнала в индексном файле двоичного журнала». Это связано с тем, что информация binlog кэшируется в кластере zk, что приводит к неправильному получению данных. Это включает в себя определение позиции binlog. но это неправильно после запуска службы. Причина та же.
решать:
Обновление по запросу ES соответствует оператору набора обновлений... где... реляционной базы данных; эта команда не очень хорошо поддерживается в ES и может вызвать такие проблемы, как: выполнение нетранзакционного режима во время пакетных обновлений (допуск некоторых успешны, а некоторые терпят неудачу), при больших пакетных операциях истечет время ожидания, частые обновления будут сообщать об ошибках (конфликты версий), а автоматические выключатели будут срабатывать, когда сценарии выполняются слишком часто, и т. д. Наше решение также относительно простое. Мы напрямую отказываемся от использования метода updateByQuery в производственной среде и настраиваем его на использование режима сначала запроса данных, соответствующих условиям, а затем их распределения в MQ для отдельных обновлений.
Подписывайтесь на меня и следите за этой серией рубрик, мы продолжим следующую!
Об авторе: Технический эксперт и архитектор государственного предприятия в Шанхае. Имеет опыт исследований и разработок серверной части и архитектуры многих крупных производителей. Он отвечает за модульные, сервис-ориентированные и платформенные исследования и разработки чрезвычайно сложных бизнес-систем. . Он имеет богатый опыт руководства командами, а также глубокие знания по выявлению и обучению талантов.
ссылка: