Длинная статья в 10 000 слов: лучшие практики многопоточного сращивания (большая широкая таблица) на базе Apache Hudi + Flink
Длинная статья в 10 000 слов: лучшие практики многопоточного сращивания (большая широкая таблица) на базе Apache Hudi + Flink

1. Предыстория

классическая сцена

Реализация на стороне Flink

Бизнес-сторона обычно использует вычислительную систему реального времени для ОБЪЕДИНЕНИЯ нескольких источников данных в потоке для создания этой широкой таблицы. Однако на практике это решение сталкивается со многими проблемами, которые в основном можно разделить на следующие две ситуации:

  1. 1. Таблица размеров JOIN
    • • Проблема сценария: связь между данными индикаторов и данными измерений. Объем данных измерений относительно велик, а данные индикаторов относительно велики. QPS Относительно высокий, что приводит к возможным задержкам вывода данных.
    • • Доступ к механизму хранения данных измерений в рамках QPS создает проблему обратного давления задачи.
    • • житьсуществоватьвопрос:Поскольку разница во времени между бизнес-данными, данными измерений и данными индикаторов относительно велика,,Поэтому для метода потока данных индикатора установлено разумное значение TTL, а данные измерений в кэше не обновляются вовремя;,Это приводит к проблемам с неточными данными в дальнейшем.
  2. 2. Несколько потоков JOIN
    • • Задача сценария: корреляция данных по нескольким показателям,Данные разных индикаторов могут иметь аномально большие различия во времени.
    • • Текущее решение: использование оконного ПРИСОЕДИНЯЙТЕСЬ, сохраняйте относительно большое состояние.
    • • житьсуществоватьвопрос:Поддержание большого состояния не только окажет определенное давление на память.,При этом время Checkpoint и Restore станет дольше.,Может вызвать противодавление задачи

Мы основаны наHudi Payloadиз Механизм слияния,разработал новыйиз Несколько Решение threadjoin:

  • • Несколько Потоковые данные полностью объединяются в уровень хранения, а вычислительный механизм отключается, поэтому нет необходимости сохранять состояние и его TTL из настроек.
  • • Данные измерений и данные индикаторов обновляются независимо в виде разных потоков, и в процессе обновления ничего делать не нужно. объединение данных потоков, нисходящий При чтении Снова Merge Несколько потоковые данные, поэтому нет необходимости кэшировать многомерные данные и в то же время можно выполнять существующие Compact проведено, когда Объединение, ускоряющее последующие запросы.

Это решение предоставляет возможность коррелировать многопотоковые данные на уровне хранения, стремясь решить ряд проблем, возникающих при многопотоковых соединениях в сценариях реального времени.

2. Основные компетенции

2.1 Хронология

Временная шкала поддерживается во всех таблицах, которые содержат операции с набором данных (например, добавление, изменение или удаление) в разные моменты времени. Это будет выполняться каждый раз, когда набор данных выполняется в таблице Hudi. Временная шкала таблицы, так что можно запрашивать только данные, успешно отправленные после определенного момента времени, или можно запрашивать только данные до определенного момента времени, что позволяет эффективно избежать необходимости сканирования данных в более широком диапазоне времени. В то же время вы можете эффективно запрашивать только файлы до внесения изменений (например, после того, как Instant отправляет операцию изменения, вы запрашиваете данные только до определенного момента времени, и вы все равно можете запрашивать данные до изменения).

Действие:

  • • COMMITS: представление данных.
  • • ОЧИСТКА: удаление данных.
  • • DELTA_COMMIT:
  • • СЖАТИЕ: объединение небольших файлов.
  • • ROLLBACK:откат
  • • SAVEPOINT: точка сохранения.

Временная шкала — это абстракция, используемая Hudi для управления коммитами. Каждый коммит привязан к фиксированной метке времени и распространяется на временную шкалу. На временной шкале каждый коммит абстрагируется в HoodieInstant. Instant записывает поведение, метку времени и статус коммита.

В приведенном выше примере показана операция обновления таблицы Hudi каждые 5 минут с 10:00 до 10:20. Временная шкала включает в себя фиксацию, очистку и сжатие. В то же время вы также можете наблюдать, что время фиксации записывает время поступления данных (например, 10:20), но на самом деле данные организованы по времени события (время события) с 7:00 до одного раздела каждый час. . Время прибытия и события событий — две основные концепции балансировки задержки и целостности данных.

Позднее поступление данных (нравиться,Время мероприятия 9:00.,существовать>1часы спустяиз10:20приезжать),Данные о событии будут перемещены в раздел Переписка в зависимости от события. график существования существует с помощью,Инкрементальный запрос требует только считывания всей информации в определенный момент (мгновенный запрос). время) commit Новые данные можно получить путем успешного изменения файлов без сканирования всех файлов.

2.2. Параллельный контроль.

2.2.1. Обзор

Транзакции в озерах данных теперь считаются ключевой особенностью Lakehouse. Но что на самом деле удалось сделать на данный момент? Какие методы доступны на данный момент? Как они себя ведут в реальном мире? Этим вопросам посвящена данная статья.

Мне посчастливилось работать над различными проектами баз данных — РСУБД (Oracle[1]), хранилищами значений ключей NoSQL (Voldemort[2]), потоковыми базами данных (ksqlDB[3]), хранилищами данных реального времени с закрытым исходным кодом и конечно, Apache Hudi. Можно с уверенностью сказать, что различия в рабочих нагрузках глубоко влияют на механизмы управления параллелизмом, принятые в разных базах данных. В этой статье также будет описано, как мы переосмысливаем механизмы управления параллелизмом для озера данных Apache Hudi.

Во-первых, давайте перейдем непосредственно к делу: базы данных РСУБД предоставляют богатейший набор транзакционных функций и самый широкий спектр механизмов управления параллелизмом [4], различные уровни изоляции, детальные блокировки, обнаружение/предотвращение тупиковых ситуаций и многое другое, как и должно быть. Поддерживает изменения на уровне строк и чтение нескольких таблиц, обеспечивая при этом ключевые ограничения [5] и поддерживая индексы [6].

Хранилища NoSQL предлагают очень слабые гарантии, такие как только конечная согласованность и простая атомарность на уровне строк, в обмен на лучшую масштабируемость для более простых рабочих нагрузок. Традиционные хранилища данных основаны на столбцах и предоставляют более или менее полный набор функций, которые можно найти в СУБД, обеспечивая [7] ограничения на блокировку и ключи, в то время как облачные хранилища данных, похоже, больше ориентированы на архитектуру разделения хранения и вычислений, обеспечивая при этом меньшие возможности. уровень изоляции. Удивительный пример: ключевое ограничение [8] не применяется.

2.2.2. Подводные камни в управлении параллелизмом озера данных.

Исторически озера данных рассматривались как пакетные задания чтения/записи файлов в облачном хранилище, и будет интересно посмотреть, как большинство новых попыток расширят эту точку зрения и используют ту или иную форму «оптимистического управления параллелизмом 9]» (OCC). реализовать контроль версий файлов.

Задания OCC используют блокировки на уровне таблицы, чтобы проверить, влияют ли они на перекрывающиеся файлы, и прерывают операцию в случае конфликта. Иногда блокировки представляют собой даже просто блокировки уровня JVM, удерживаемые на одном узле драйвера Apache Spark. Это может быть хорошо для упрощенной координации. пакетных заданий, но он не широко применим к современным рабочим нагрузкам озера данных. Такие подходы созданы с учетом неизменяемых/только добавляемых моделей данных, которые не подходят для поэтапной обработки данных или обновлений/удалений с использованием ключей.

ОКК очень оптимистично настроена на то, что настоящего конфликта никогда не произойдет. Проповеди разработчиков, сравнивающие OCC с полноценными транзакционными возможностями СУБД или традиционного хранилища данных, просто неверны, если цитировать прямо из Википедии: «Если часто возникает конкуренция за ресурсы данных, стоимость многократного перезапуска транзакций может существенно снизить производительность, в В этом случае другие методы управления параллелизмом [10] могут быть более подходящими». Когда конфликты действительно происходят, они могут привести к потере большого количества ресурсов, потому что у вас есть пакетные задания, которые завершаются сбоем через несколько часов при каждой попытке запуска!

Представьте себе реальный сценарий с двумя процессами записи: заданием на запись, которое генерирует новые данные каждые 30 минут, и заданием удаления, которое обеспечивает соблюдение GDPR и занимает 2 часа для завершения удаления. Они, вероятно, будут перекрываться со случайным удалением файлов, и задание на удаление почти гарантированно будет зависать и каждый раз не будет выполнено. Что касается базы данных, сочетание длительных транзакций с оптимизмом может привести к разочарованию, поскольку чем длиннее транзакции, тем выше вероятность их перекрытия.

Так каковы альтернативы? Замок? В Википедии также говорится: «Однако подходы на основе блокировок («пессимистические») также могут обеспечивать более низкую производительность, поскольку блокировки могут значительно ограничивать эффективный параллелизм, даже если взаимоблокировок избежать. Здесь Hudi использует другой подход, который, по нашему мнению, лучше подходит для современных транзакций озера данных, которые часто выполняются в течение длительного времени или даже непрерывны. Рабочие нагрузки озера данных имеют больше характеристик с заданиями потоковой обработки с высокой пропускной способностью, чем стандартное чтение/запись базы данных, и именно это мы и извлекаем. При потоковой обработке события сериализуются в единый упорядоченный журнал, что позволяет избежать узких мест блокировки/параллелизма, и пользователи могут непрерывно обрабатывать миллионы событий в секунду. Hudi реализует протокол управления параллелизмом на уровне файлов на основе журналов на временной шкале Hudi [11], который, в свою очередь, опирается на минимально атомарные записи в облачное хранилище. Создавая журнал событий как основную часть межпроцессной координации, Hudi может предоставить некоторые гибкие модели развертывания, которые обеспечивают более высокий уровень параллелизма, чем чистые методы OCC, которые отслеживают только снимки таблиц.

2.2.3 Модель 1: однократная запись, обслуживание таблиц в режиме реального времени.

Самая простая форма управления параллелизмом — отсутствие параллелизма вообще. В таблицах озера данных часто выполняются общие службы для обеспечения эффективности, освобождения места хранения от старых версий и журналов, объединения файлов (кластеризация в Hudi), дельта-слияния (сжатие в Hudi) и т. д. Hudi просто устраняет необходимость в управлении параллелизмом и максимизирует пропускную способность, поддерживая эти табличные сервисы «из коробки» и запуская их в режиме реального времени после каждой записи в таблицу. Планы выполнения идемпотентны, сохраняются на временной шкале и автоматически восстанавливаются после сбоев. Для большинства простых случаев использования это означает, что простого написания достаточно, чтобы получить хорошо управляемую таблицу, не требующую управления параллелизмом.

2.2.4. Модель 2: асинхронная табличная служба с одной записью.

Наш пример удаления/захвата выше не так прост. В то время как операции приема/записи могут просто обновить N последних разделов в таблице, удаления могут даже охватывать всю таблицу, а смешивание их в одной рабочей нагрузке может существенно повлиять на задержку приема, поэтому Hudi предоставляет службу таблиц, которая работает асинхронно, где большая часть тяжелая работа (например, фактическая перезапись данных столбца с помощью службы сжатия) выполняется асинхронно, исключая любые повторные ненужные попытки, а также используя технологию кластеризации. Таким образом, одна запись может использовать как обычные обновления, так и удаления GDPR, а также сериализовать их в журнал. Учитывая, что Hudi имеет индексы на уровне записей, а запись в журнал avro обходится намного дешевле (по сравнению с записью в паркет, которая может быть в 10 и более раз дороже), задержку приема можно поддерживать, обеспечивая при этом превосходную отслеживаемость. Фактически, мы смогли масштабировать эту модель до масштаба данных 100 ПБ в Uber [12], упорядочив все удаления и обновления в одной исходной теме Apache Kafka. Управление параллелизмом — это не просто блокировки, Hudi может делать это без каких-либо внешних блокировок. этот.

2.2.5. Модель 3: множественная запись.

Но не всегда возможно сериализовать удаления в один и тот же поток записи, или требуются удаления на основе sql. При наличии нескольких распределенных процессов некоторая форма блокировки неизбежна, но, как и в реальной базе данных, модель параллелизма Hudi достаточно умна, чтобы отделить то, что фактически записывается в таблицу, от службы таблиц, которая управляет или оптимизирует таблицу Come. Hudi обеспечивает аналогичный оптимистичный контроль параллелизма между несколькими модулями записи, но служба таблиц по-прежнему может выполняться полностью без блокировок и асинхронно. Это означает, что задание удаления может кодировать только удаления, задание приема может регистрировать обновления, а служба сжатия снова применяет обновления/удаления к базовым файлам. Хотя задания удаления и загрузки могут конкурировать друг с другом, как мы упоминали выше, время их выполнения намного меньше, а потери намного меньше, поскольку сжатие выполняет тяжелую работу по записи данных в паркет/столбец.

Подводя итог, можно сказать, что есть много способов улучшить ситуацию на этой основе.

  • • Во-первых, Худи внедрил механизм маркировки[13],Может отслеживать файлы, являющиеся частью активной транзакции.,И механизм, который может отслеживать активность часов и сердцебиение. Это может использоваться непосредственно другими активными серверами транзакций/записи для определения того, что делают другие серверы записи.,нравиться Если обнаружен конфликт,затем прекратите как можно скорее[14],Это быстрее возвращает ресурсы кластера для других задач.
  • • Хотя существуют, для изоляции оптимизма требуются сериализуемые снимки параллелизм очень привлекателен, но это не лучший и не единственный способ справиться с параллелизмом между читателями. Мы планируем использовать CRDT Широкое внедрение концепций потоковой обработки посредством объединения журналов API Достичь полной блокировки человека Управление параллелизм, возможность которого была доказана [15] для озера Данные поддерживают огромный объем непрерывной записи.
  • • Говоря о ключевых ограничениях, Худи на сегодняшний день является единственным уровнем транзакций, который обеспечивает ограничения уникальных ключей[16]изlake.,Но только для таблицы из ключей записи. Мы постараемся распространить эту функциональность на поля, не являющиеся первичными ключами, в более общей форме.,И используйте более новую из параллельных моделей, упомянутых выше.

2.3. Маркерный механизм

2.3.1. Обзор

Hudi поддерживает автоматическую очистку неудачно отправленных данных при записи. Apache Hudi представляет механизм маркировки во время записи для эффективного отслеживания файлов данных, записываемых в хранилище. В этой статье мы подробно рассмотрим конструкцию существующих механизмов непосредственной маркировки файлов и объясним их проблемы с производительностью в облачном хранилище при очень больших пакетах операций записи. И демонстрирует, как повысить производительность записи за счет внедрения тегов на основе сервера временной шкалы.

2.3.2. Зачем вводить механизм «Маркеры»?

в Худи marker Это метка, указывающая на наличие соответствующего файла данных в хранилище. Hudi использует ее для автоматической очистки незафиксированных данных в сценариях сбоя и отката. Каждая запись тега состоит из трех частей.

  • • Имя файла данных
  • • Расширение маркера (.marker).
  • • Создать файл из I/O Операция (CREATE - Вставить, ОБЪЕДИНИТЬ - обновить/удалить или APPEND - один из двух).

Например, отметка 91245ce3-bb82-4f9f-969e-343364159174-0_140-579-0_20210820173605.parquet.marker.CREATE Указывает, что соответствующий файл данных 91245ce3-bb82-4f9f-969e-343364159174-0_140-579-0_20210820173605.parquet и I/O Тип СОЗДАВАТЬ. Перед записью каждого файла данных Худи Клиент записи сначала создает метку в хранилище, которая будет сохраняться и будет явно удалена клиентом записи после успешной отправки. Маркеры полезны для написания клиентов для эффективного выполнения различных операций. Маркеры выполняют две основные функции:

  • Удаление дубликатов/частичных файлов данных:проходить Spark писать Hudi Иногда их будет больше одного Executor Выполнить параллельную запись. один Executor Может выйти из строя, оставив часть файла данных написанной, в этом случае существовать Spark Попробую еще раз Task , когда включено speculative execution , вы можете иметь несколько раз attempts Успешно перенес одни и те же данные в разные файлы, но получил их только один раз. attempt будет передан Spark Представлен процесс программы драйвера. Маркировка помогает эффективно идентифицировать написанные частичные файлы данных.,Содержит и позже успешно записывает файлы данных по сравнению с дубликатами данных.,И существующие записи удаляют эти дубликаты из файлов данных до завершения отправки.
  • откатне удалосьизотправить:При написаниивозможныйсуществовать Провал в середине,Оставьте часть файла данных записи. существуют В этом случае,Отмеченная запись будет сохранена в случае сбоя отправки.Существование сохраняется в следующей операции записи.,Запись клиента сначала отката не удалась из фиксации,Определите файлы данных рисования в этих материалах, отметив их и удалите. Далее мы углубимся в текущий механизм маркировки хаизов.,Объясните проблемы с производительностью,И продемонстрируем новый механизм меток на основе сервера временной шкалы для решения этой проблемы.
2.3.3. Улучшение производительности записи на основе механизма маркировки сервера Timeline.

Основное описание здесьНа основе сервера временной шкалы из механизма маркировки,Этот механизм оптимизированжитьмагазинотметкаизсоответствующая задержка。Hudi Сервер временной шкалы используется для представления файловой системы и временной шкалы. Как показано на рисунке ниже, новый механизм маркировки на основе сервера временной шкалы делегирует создание меток и другие операции, связанные с отметками, от отдельных исполнителей серверу временной шкалы для централизованной обработки. Сервер временной шкалы сохраняет созданные теги в памяти для соответствующих запросов тегов, а сервер временной шкалы обеспечивает согласованность, периодически сбрасывая теги в памяти в ограниченное количество базовых файлов в хранилище. Таким образом, количество фактических операций с файлами и задержка, связанная с маркировкой, могут быть значительно уменьшены, что повышает производительность записи даже при большом количестве файлов данных.

Чтобы повысить эффективность обработки запросов на создание отметок, мы разработали пакетную обработку запросов на пометку на сервере временной шкалы. Каждый запрос на создание маркера обрабатывается асинхронно на сервере временной шкалы Javalin и ставится в очередь перед обработкой. Для каждого пакетного интервала, скажем, 20 миллисекунд, поток отправки извлекает ожидающие запросы из очереди и отправляет их в рабочий поток для обработки. Каждый рабочий поток обрабатывает запросы на создание тегов и перезаписывает базовый файл, в котором хранятся теги. Одновременно выполняется несколько рабочих потоков, и, учитывая, что перезапись файлов занимает больше времени, чем пакетная обработка, каждый рабочий поток записывает в отдельный файл, который не затрагивается другими потоками, чтобы обеспечить согласованность и корректность. И интервал пакетной обработки, и количество рабочих потоков настраиваются с помощью параметров записи.

Обратите внимание, что рабочий поток всегда проверяет, был ли создан тег, сравнивая имя тега в запросе с копией всех тегов в памяти, хранящейся на сервере временной шкалы. Базовый файл, в котором хранятся теги, читается только при первом запросе тега (отложенная загрузка). Запрошенный ответ возвращается только после того, как новые маркеры будут записаны в файл, чтобы в случае сбоя сервера временной шкалы сервер временной шкалы мог восстановить уже созданные маркеры. Это обеспечивает согласованность между хранилищем и репликами в памяти, а также повышает производительность обработки запросов тегов.

2.4. Раннее обнаружение конфликтов.

2.4.1. Обзор

В настоящее время Hudi реализует OCC (оптимистическое управление параллелизмом) на основе временной шкалы, чтобы обеспечить согласованность, целостность и правильность между несколькими операциями записи данных. Однако соответствующее обнаружение конфликта происходит до фиксации метаданных и после завершения записи данных. Если обнаружены какие-либо конфликты, ресурсы кластера тратятся впустую, поскольку вычисления и записи уже завершены. Чтобы решить эту проблему, в этом RFC [17] предлагается механизм раннего обнаружения конфликтов, основанный на существующем механизме маркировки Hudi. Существуют некоторые тонкие различия в рабочих процессах раннего обнаружения конфликтов между разными типами сопровождающих тегов:

  • • Для прямой маркировки худи напрямую перечисляет необходимые файлы разметки.,И существующие авторы выполняют проверки на конфликты перед созданием тегов и началом записи соответствующих файлов данных.
  • • Для тегов на основе сервера временной шкалы, толстовка с капюшоном Просто существующие авторы получают проверку на конфликт тегов перед созданием тегов и началом записи соответствующих файлов данных. Асинхронная и периодическая проверка конфликтов, чтобы конфликты записи обнаруживались как можно раньше. два writer Еще могу написать то же самое file slice изданных до тех пор, пока в ходе следующего раунда проверки не будет обнаружен конфликт.

Что еще более важно, Hoodie может прекратить запись раньше, а благодаря раннему обнаружению конфликтов ресурсы можно высвободить в кластер и улучшить их использование.

2.4.2. Зачем необходимо раннее обнаружение конфликтов?

Транзакции озера данных и multi-writers строится сегодня Lakehouse изключевые особенности。прямая цитатаЛейкхаус Управление параллелизма: Мы слишком оптимистичны?[18]

«Hudi реализует протокол управления параллелизмом на уровне файлов на основе журналов на временной шкале Hudi, который, в свою очередь, опирается на минимальное атомарное размещение облачного хранилища. Создавая журнал событий как основную часть межпроцессной координации, Hudi может Предоставьте несколько гибких моделей развертывания, обеспечивающих более высокий уровень параллелизма, чем чистые методы OCC, которые отслеживают только снимки таблиц».

В сценарии с несколькими авторами обнаружение существующего конфликта Hudi происходит после записи данных автором и перед отправкой метаданных. Другими словами, хотя все вычисления и запись данных завершены, модуль записи обнаруживает конфликт только тогда, когда он начинает фиксироваться, что приводит к пустой трате ресурсов. Например: есть два задания на запись: задание 1 запишет 10M данных в таблицу Hudi, включая обновление группы файлов 1. Другое задание2 запишет 100G в таблицу Hudi, а также обновит ту же группу файлов 1. Задание 1 успешно завершено и отправлено в Hudi. Через несколько часов job2 завершил запись файла данных (100G) и начал отправлять метаданные. В этот раз обнаружился конфликт с заданием 1. После сбоя задания 2 его пришлось остановить и запустить заново. Очевидно, что на работу2 тратится много вычислительных ресурсов и времени.

В настоящее время у Худи есть два важных механизма: механизм маркировки и механизм сердцебиения:

  1. 1. Механизм тегирования может отслеживать все файлы, входящие в состав активного файла.
  2. 2. Механизм Heartbeat может отслеживать всех активных авторов в таблице Hudi.

На основе механизма маркировки и механизма контрольного сигнала в этом RFC предлагается новый метод обнаружения конфликтов: раннее обнаружение конфликтов. Прежде чем средство записи создаст маркер и начнет записывать файл, Hudi выполнит это новое обнаружение конфликта, пытаясь обнаружить конфликты записи напрямую или получить результаты асинхронной проверки конфликтов (на основе временной шкалы) как можно раньше и прервать запись при возникновении конфликта. , чтобы мы могли как можно скорее освободить ресурсы и улучшить их использование.

2.4.3.Реализация

Это рабочий процесс раннего обнаружения конфликтов, как показано на рисунке. 1 показано. Как мы видели, когда supportsOptimisticConcurrencyControl и isEarlyConflictDetectionEnable Когда оба варианта верны, мы можем использовать функцию раннего обнаружения конфликтов. В противном случае мы пропускаем эту проверку и создаем маркер напрямую.

2.5. Запись транзакций (возможность ACID).

Традиционное озеро данныхсуществоватьданные При написаниииз не очень хорошо справляется с транзакционной стороной, но по мере того, как все больше и больше критически важных бизнес-процессов переходят на озеро данных,Ситуация также изменилась,Нам нужен механизм для атомарной публикации пакета данных.,То есть сохраняются только действительные данные.,Требуется частичный отказоткатбез поврежденияиметьданныенабор。Запрос одновременноиз Результаты должны быть воспроизводимымииз,Сторона запроса не может видеть какую-либо часть извлеченных данных.,Любая передача данных должна осуществляться достоверно. Hudi предоставляет мощные возможности ACID. Эффективный механизм изотката может обеспечить согласованность данных и избежать создания «бесхозных файлов» или остатков промежуточных файлов данных.

2.6. Гибкий механизм полезной нагрузки.

2.6.1. Резюме

Apache Hudi Полезная нагрузка — это расширяемый механизм обработки данных. С помощью различных полезных нагрузок мы можем реализовать индивидуальные методы записи данных для сложных сценариев, что значительно повышает гибкость обработки данных. Худи Полезная нагрузкасуществоватьписать выполняет дедупликацию, фильтрацию, объединение и другие операции данных при чтении таблицы Hudi из класса инструмента, используя параметры «hoodie.datasource.write.payload.class» указывает полезную нагрузку, которую нам нужно использовать. сорт. В этой статье мы подробно рассмотрим Худи. Механизм полезной нагрузки, а также различия и сценарии использования различных полезных нагрузок и з.

2.6.2. Зачем нужна полезная нагрузка

существоватьданныеписатьизкогда,Теперь методы вставки всей строки и крышки всей строки могут соответствовать требованиям всех сценариев.,запись данных также будет иметь некоторые индивидуальные потребности в обработке,Поэтому необходим более гибкий метод и определенная обработка данных.,Hudi предоставляет метод загрузки, который может очень хорошо решить эту проблему.,примернравитьсяможно решить При написанииданныеУдалить дубликатывопрос,противОбновить некоторые поляи т. д.。

2.6.3 Механизм действия полезной нагрузки.

Вам необходимо указать параметр при записи в таблицу Hudi hoodie.datasource.write.precombine.field , это поле также называется Precombine Key,Hudi Полезная нагрузка обрабатывает данные на основе этого указанного поля. Она объединяет каждую часть данных в полезную нагрузку, поэтому сравнение между данными становится сравнением между полезными нагрузками. Вам нужно всего лишь реализовать метод сравнения полезной нагрузки в соответствии с потребностями бизнеса для обработки данных. Все полезные нагрузки Hudi реализуют интерфейс HoodieRecordPayload. Все предустановленные классы полезных нагрузок, реализующие этот интерфейс, перечислены ниже.

На следующем рисунке перечислены методы, которые должен реализовать интерфейс HoodieRecordPayload.,Здесь есть два важных метода: preCombine иcombineAndGetUpdateValue.,Ниже мы проанализируем эти два метода.

2.6.3.1 анализ перед объединением

Как видно из рисунка ниже,Этот метод сравнивает текущие данные и oldValue,Затем верните запись.

Из описания аннотации метода preCombine мы также можем узнать, что он сначала используется для дедупликации данных, когда в Hudi одновременно записываются несколько фрагментов данных с одним и тем же первичным ключом. место вызова

На самом деле есть еще одно место, где вызывается этот метод, то есть при чтении таблицы MOR будут обрабатываться данные с тем же первичным ключом в Log-файле. Если один и тот же фрагмент данных изменяется несколько раз и записывается в файл журнала таблицы MOR, при чтении также будет выполняться предварительное объединение.

2.6.3.2 Анализ объединенияAndGetUpdateValue

Этот метод сравнивает currentValue (то есть данные в существующем файле паркета) с новыми данными, чтобы определить, нужно ли сохранять новые данные.

Из-за различий в принципах чтения и записи таблицы COW и таблицы MOR вызов joinAndGetUpdateValueиз существующегоCOWиMOR также отличается:

  • • существоватьCOWПри написанииволяНовые данные записи сохраняются в таблице Hudi изcurrentValue.сравнивать,Возвратить данные, которые необходимо сохранить
  • • существоватьMORПри чтенииволяпройтиpreCombineиметь дело сизДанные в журнале и данные в файле Parquet.сравнивать,Возвратить данные, которые необходимо сохранить

2.7. Поддержка одновременного написания перекрестных задач.

Внутренняя версия Hudi поддерживает сценарий одновременной записи несколькими авторами Flink на основе механизма блокировки файлов и OCC.

2.8. Асинхронное уплотнение и очистка.

Hudi поддерживает встроенную работу в работе. Compationиclean может объединять небольшие файлы и вовремя их очищать, избегая тем самым проблем с небольшими файлами. Конечно, вы также можете отключить встроенный режим через параметры. Сжатие и худи доступны в автономном режиме в Spark/Flink. compactionиclean。

3. Процесс многопоточного сращивания

Далее мы представляем сценарий многопотоковой склейки. Snapshot Query Основной процесс LogFile Выполните слияние дедупликации, а затем снова слейте его. BaseFile и После дедупликации LogFile данные в. На рисунке ниже показан весь процесс объединения данных, который можно разбить на следующие этапы. Два процесса:

• Merge LogFile

Текущая логика Hudi заключается в чтении данных в файле журнала и сохранении их в карте. Для каждой записи в файле журнала, если ключ не существует в карте, он напрямую помещается в карту, если ключ уже существует в карте. Карта, требуется операция обновления.

При многопотоковом сращивании, поскольку в LogFile есть данные, записанные разными потоками данных, то есть столбцы каждых данных могут быть разными, поэтому при обновлении необходимо определить, происходят ли две записи с одинаковым ключом из одного и того же ключа. поток. Если да, выполните Обновить, в противном случае выполните сращивание. Как показано на рисунке 3, при чтении записи, первичным ключом которой является ключ1 в файле журнала2, запись, соответствующая ключу1, уже существует в карте, но две записи происходят из разных потоков, поэтому их необходимо соединить, чтобы сформировать новый Запись (key1, b0_new, c0_new, d0_new) на карту.

  • • Merge BaseFile and LogFile

Текущая логика Hudi по умолчанию заключается в проверке наличия записи с тем же ключом на карте для каждой записи, существующей в BaseFile. Если она существует, перезапишите запись в BaseFile записью в Map. При многопотоковой склейке Запись в Карте не будет полностью перекрывать соответствующую Запись в BaseFile, а может только обновлять значения некоторых столбцов, то есть столбцов, соответствующих Записи в Карте.

Как показано на рисунке ниже, на примере простейшей логики покрытия при чтении BaseFile Первичный ключ в key1 из Record когда, обнаружил key1 существовать Map Уже сохранены в существовании, соответствующем из Record иметь BCD Если есть три столбца значений, обновите BaseFile серединаиз BCD Колонка, получи новое из Запись(key1, b0_new, c0_new, d0_new, e0), примечание E Столбец не был обновлен, поэтому сохраняется исходное значение. е0. Для новых дополнений Key нравиться Key3 Переписка Запишите, вам нужно BCE Добавляются три столбца со значением по По умолчанию сформировать полное из Record。

4. Принципиальная схема реализации

Реализация принципа из в основном заключается в настройке Payload class добиться того же key Различные исходные данные из-за логики слияния, конец записи будет объединять несколько источников и записывать в пакете. log, сторона чтения также будет вызывать ту же логику для обработки ситуаций между пакетами при слиянии при чтении.

Здесь следует отметить, что из – данные вне порядка (out-of-order). and late events)извопрос。нравиться Если ты этого не сделаешьиметь дело с,существование ниже по течению часто приводит к появлению старых данных. Крышка новых данных.,Или обновление столбца неполное.

В случае неупорядоченных и запоздалых данных мы Hudi сделанный Multiple ordering value Улучшение, гарантирующее, что каждый источник может обновлять только свою часть данных столбца, может быть установлено в соответствии с event time (ordering value) столбец, гарантируя, что только новые данные перезаписывают старые данные. финальная комбинация lock less multiple writers чтобы достичь большего Job Многоисходный параллелизм писать.

5.Как использовать

5.1.Зависимости Maven pom

Для этой функции была выпущена версия моментального снимка на основе 0.12.0-1-tencentiz.

Язык кода:javascript
копировать
<dependencies>
  <dependency>
    <groupId>org.apache.hudi</groupId>
    <artifactId>hudi-flink1.13-bundle</artifactId>
    <version>0.12.0</version>
  </dependency>
</dependencies>

5.2. Несколько заданий Flink записывают в одну и ту же целевую таблицу.

Job1

  • • Исходная таблица A
Язык кода:javascript
копировать
CREATE TABLE sourceA (\n" +
  uuid STRING,\n" +
  name STRING,\n" +
  _ts1 timestamp(3)\n" +
) WITH (\n" +
.....
)
  • • целевая таблица
Язык кода:javascript
копировать
    public static String sinkTableDDL1() {
        return String.format("create table %s(\n"
            + "  uuid STRING,\n"
            + "  name STRING,\n"
            + "  age int,\n"
            + "  _ts1 bigint,\n"
            + "  _ts2 bigint,\n"
            + "  PRIMARY KEY(uuid) NOT ENFORCED"
            + ")\n"
            + " PARTITIONED BY (_ts1)\n"
            + " with (\n"
            + "  'connector' = 'hudi',\n"
            + "  'path' = '%s', -- Заменить на абсолютный путь\n"
            + "  'table.type' = 'MERGE_ON_READ',\n"
            + "  'write.bucket_assign.tasks' = '5',\n"
            + "  'write.tasks' = '5',\n"
            + "  'write.partition.format' = 'yyyyMMdd',\n"
            + "  'write.partition.timestamp.type' = 'EPOCHMILLISECONDS',\n"
            + "  'hoodie.bucket.index.num.buckets' = '5',\n"
            + "  'changelog.enabled' = 'true',\n"
            + "  'index.type' = 'BUCKET',\n"
            + "  'hoodie.bucket.index.num.buckets' = '5',\n"
            + String.format("  '%s' = '%s',\n", FlinkOptions.PRECOMBINE_FIELD.key(), "_ts1:name;_ts2:age")
            + "  'write.payload.class' = '" + PartialUpdateAvroPayload.class.getName() + "',\n"
            + "  'hoodie.write.log.suffix' = 'job1',\n"
            + "  'hoodie.write.concurrency.mode' = 'optimistic_concurrency_control',\n"
            + "  'hoodie.write.lock.provider' = 'org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider',\n"
            + "  'hoodie.cleaner.policy.failed.writes' = 'LAZY',\n"
            + "  'hoodie.cleaner.policy' = 'KEEP_LATEST_BY_HOURS',\n"
            + "  'hoodie.consistency.check.enabled' = 'false',\n"
            + "  'hoodie.write.lock.early.conflict.detection.enable' = 'true',\n"
            + "  'hoodie.write.lock.early.conflict.detection.strategy' = '"
            + SimpleTransactionDirectMarkerBasedEarlyConflictDetectionStrategy.class.getName() + "',\n"
            + "  'hoodie.keep.min.commits' = '1440',\n"
            + "  'hoodie.keep.max.commits' = '2880',\n"
            + "  'compaction.schedule.enabled'='false',\n"
            + "  'compaction.async.enabled'='false',\n"
            + "  'compaction.trigger.strategy'='num_or_time',\n"
            + "  'compaction.delta_commits' ='5',\n"
            + "  'compaction.delta_seconds' ='180',\n"
            + "  'compaction.max_memory' = '3096',\n"
            + "  'clean.async.enabled' ='false',\n"
            + "  'hive_sync.enable' = 'true',\n"
            + "  'hive_sync.mode' = 'hms',\n"
            + "  'hive_sync.db' = '%s',\n"
            + "  'hive_sync.table' = '%s',\n"
            + "  'hive_sync.metastore.uris' = '%s'\n"
            + ")", sinkAliasTable1, basePath, dbName, targetTable, metastoreUrl);
    }

Потоковая запись данных

Язык кода:javascript
копировать
insert into %s(uuid, name, _ts1) select uuid, name, ts as _ts1 from sourceA

Job2

  • • Исходная таблица Б.
Язык кода:javascript
копировать
CREATE TABLE sourceB (\n" +
  uuid varchar(20),\n" +
  age int,\n" +
  _ts2 timestamp(3)\n" +
) WITH (\n" +
.....
)
  • • целевая таблица
Язык кода:javascript
копировать
   public static String sinkTableDDL2() {
        return String.format("create table %s(\n"
            + "  uuid STRING,\n"
            + "  name STRING,\n"
            + "  age int,\n"
            + "  _ts1 bigint,\n"
            + "  _ts2 bigint,\n"
            + "  PRIMARY KEY(uuid) NOT ENFORCED"
            + ")\n"
            + " PARTITIONED BY (_ts2)\n"
            + " with (\n"
            + "  'connector' = 'hudi',\n"
            + "  'path' = '%s', -- Заменить на абсолютный путь\n"
            + "  'table.type' = 'MERGE_ON_READ',\n"
            + "  'write.bucket_assign.tasks' = '5',\n"
            + "  'write.tasks' = '5',\n"
            + "  'write.partition.format' = 'yyyyMMdd',\n"
            + "  'write.partition.timestamp.type' = 'EPOCHMILLISECONDS',\n"
            + "  'changelog.enabled' = 'true',\n"
            + "  'index.type' = 'BUCKET',\n"
            + "  'hoodie.bucket.index.num.buckets' = '5',\n"
            + String.format("  '%s' = '%s',\n", FlinkOptions.PRECOMBINE_FIELD.key(), "_ts1:name;_ts2:age")
            + "  'write.payload.class' = '" + PartialUpdateAvroPayload.class.getName() + "',\n"
            + "  'hoodie.write.log.suffix' = 'job2',\n"
            + "  'hoodie.write.concurrency.mode' = 'optimistic_concurrency_control',\n"
            + "  'hoodie.write.lock.provider' = 'org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider',\n"
            + "  'hoodie.cleaner.policy.failed.writes' = 'LAZY',\n"
            + "  'hoodie.cleaner.policy' = 'KEEP_LATEST_BY_HOURS',\n"
            + "  'hoodie.consistency.check.enabled' = 'false',\n"
            + "  'hoodie.write.lock.early.conflict.detection.enable' = 'true',\n"
            + "  'hoodie.write.lock.early.conflict.detection.strategy' = '"
            + SimpleTransactionDirectMarkerBasedEarlyConflictDetectionStrategy.class.getName() + "',\n"
            + "  'hoodie.keep.min.commits' = '1440',\n"
            + "  'hoodie.keep.max.commits' = '2880',\n"
            + "  'compaction.schedule.enabled'='true',\n"
            + "  'compaction.async.enabled'='true',\n"
            + "  'compaction.trigger.strategy'='num_or_time',\n"
            + "  'compaction.delta_commits' ='5',\n"
            + "  'compaction.delta_seconds' ='180',\n"
            + "  'compaction.max_memory' = '3096',\n"
            + "  'clean.async.enabled' ='false',\n"
            + "  'hive_sync.enable' = 'true',\n"
            + "  'hive_sync.mode' = 'hms',\n"
            + "  'hive_sync.db' = '%s',\n"
            + "  'hive_sync.table' = '%s',\n"
            + "  'hive_sync.metastore.uris' = '%s'\n"
            + ")", sinkAliasTable2, basePath, dbName, targetTable, metastoreUrl);
    }

Запись данных в поток B

Язык кода:javascript
копировать
insert into %s(uuid, age, _ts2) select uuid, age, ts as _ts2 from sourceB

5.3. Одно задание флинка и несколько конвейеров записывают в одну и ту же таблицу.

  • • Создайте исходную таблицу A и исходную таблицу B. То же, что и создание таблицы ddl в 5.2.
  • • Aпоток、Запись данных в поток B То же, что и вставка в 5.2. писать

5.4. Описание параметров настройки.

Имя параметра

Required

значение по умолчанию

Примечание

path

Required

N/A

целевая путь к таблице

table.type

Optional

MERGE_ON_READ

Тип таблицы: COPY_ON_WRITE или MERGE_ON_READ.

write.operation

Optional

upsert

Тип записи: UPSERT или INSERT.

write.payload.class

Required

PartialUpdateAvroPayload

Укажите данные обработки изPayload, PartialUpdateAvroPayload.class.getName()

write.partition.format

Optional

N/A

Формат раздела: гггг ММдд разбит по дням, гггг ММдд ЧЧ — по часам.

write.partition.timestamp.type

Optional

N/A

Тип раздела, используется, когда поле раздела имеет значение bigint (длинное), значение: EPOCHMILLISCONDS.

write.precombine

Required

ts

_ts1:имя;_ts2:возраст указывает, что _ts1 и _ts2 являются полями сортировки.,После двоеточия необходимо обновить поле из.,Точки с запятой используются для обозначения разных потоков.

hoodie.write.log.suffix

Required

никто

Суффикс файла журнала используется для различения разных заданий.

index.type

Required

FLINK_STATE

Установите здесь BUCKET

hoodie.bucket.index.num.buckets

Required

256

Необходимо оценить на основе объема данных

hoodie.write.concurrency.mode

Required

SINGLE_WRITER

Установите optimistic_concurrency_control.

hoodie.cleaner.policy.failed.writes

Required

LAZY

Установите режим ЛЕНИВЫЙ

hoodie.write.lock.early.conflict.detection.enable

Required

true

hoodie.write.lock.early.conflict.detection.strategy

Required

SimpleTransactionDirectMarkerBasedEarlyConflictDetectionStrategy.class.getName()

проиллюстрировать: 1. По данным прекомбината Ключевое сравнение, следует ли обновлять данные, подходящие для входа в озеро в реальном времени, и порядок входа в озеро не в порядке. 2.нравиться Если исходная таблица пользователясередина Если значения поля времени одинаковы и не могут сравниваться,FIFOиз Последовательное сращивание и слияние。

5.5. Запрос данных

5.5.1. Использование искрового запроса.
Язык кода:javascript
копировать
select * from hudi_tauth_test.hudi_partial_01_rt limit 10;
5.5.2. Запрос с использованием Presto.

Примечание: версия presto будет обновлена.

6. Эффект дохода

В конечном итоге, на основе Hudi Несколько Решение склейки потоков, помимо решения задачи в фоновом режиме, существует хранилище данных реального времени. DWS Реализация уровня, поддерживается одна таблица 3+ Поток данных при одновременном импорте, крышка сотен TB изданные. Кроме того, существование использует Spark При запросе данных широкой таблицы, поскольку данные были дедуплицированы, сжаты и объединены в большую широкую таблицу, объем одного сканирования составляет десятки TB В запросе производительность улучшена по сравнению с прямым использованием многотабличных корреляцийсуществовать 200% Выше существуют некоторые более сложные запросы, которые также имеют 40-140% из Улучшения производительности.

7. Планирование следующего шага

  • • Дальнейшее улучшение Худи Несколько Решение для объединения потоков просто в использовании и упрощает настройку параметров. Некоторые вставки и обновления столбцов будут выполнены позже. SQL из поддержки синтаксиса и параметра из конвергенции.
  • • Используйте механизм полезной нагрузки для реализации левого соединения Flink, правого соединения, TopN и других функций.
  • • Верните сообществу функцию мультиписателя.

ссылка

  • • Гибкий механизм полезной нагрузки Apache Hudi[19]
  • • Snapshot Isolation using Optimistic Concurrency Control for multi-writers[20]
  • https://mp.weixin.qq.com/s/3nsYTVu9nZCIFaaXP09hiQ
  • • https://segmentfault.com/a/1190000041630798
Справочная ссылка

[1] Oracle: https://www.oracle.com/database/ [2] Voldemort: https://www.slideshare.net/vinothchandar/voldemort-prototype-to-production-nectar-edits [3] ksqlDB: https://www.confluent.io/blog/ksqldb-pull-queries-high-availability/ [4] механизм: https://dev.mysql.com/doc/refman/5.7/en/innodb-locking-transaction-model.html [5] Ключевые ограничения: https://dev.mysql.com/doc/refman/8.0/en/create-table-foreign-keys.html [6] индекс: https://dev.mysql.com/doc/refman/8.0/en/create-table-secondary-indexes.html [7] Обязательный: https://docs.teradata.com/r/a8IdS6iVHR77Z9RrIkmMGg/wFPZS4jwZgSG21GnOIpEsw [8] Правоприменение: https://docs.snowflake.com/en/sql-reference/constraints-overview.html#supported-constraint-types [9] Оптимистическое управление параллелизмом: https://en.wikipedia.org/wiki/Optimistic_concurrency_control [10] Методы управления параллелизмом: https://en.wikipedia.org/wiki/Concurrency_control [11] Хронология: https://hudi.apache.org/docs/timeline [12] Uber: https://eng.uber.com/uber-big-data-platform/ [13] Тег механизма: https://hudi.apache.org/blog/2021/08/18/improving-marker-mechanism/ [14] Прервать досрочно: https://issues.apache.org/jira/browse/HUDI-1575 [15] Доказано: https://hudi.apache.org/blog/2021/09/01/building-eb-level-data-lake-using-hudi-at-bytedance/#functionality-support [16] только Ключевые ограничения: https://hudi.apache.org/docs/key_generation [17] этот RFC: https://cwiki.apache.org/confluence/display/HUDI/RFC+-+22+%3A+Snapshot+Isolation+using+Optimistic+Concurrency+Control+for+multi-writers [18] Лейкхаус Управление параллелизма: Мы слишком оптимистичны?: https://hudi.apache.org/blog/2021/12/16/lakehouse-concurrency-control-are-we-too-optimistic/ [19] Apache Hudi гибкий из механизма Payload: https://developer.aliyun.com/article/909719 [20] Snapshot Isolation using Optimistic Concurrency Control for multi-writers: https://cwiki.apache.org/confluence/display/HUDI/RFC+-+22+%3A+Snapshot+Isolation+using+Optimistic+Concurrency+Control+for+multi-writers

boy illustration
Неразрушающее увеличение изображений одним щелчком мыши, чтобы сделать их более четкими артефактами искусственного интеллекта, включая руководства по установке и использованию.
boy illustration
Копикодер: этот инструмент отлично работает с Cursor, Bolt и V0! Предоставьте более качественные подсказки для разработки интерфейса (создание навигационного веб-сайта с использованием искусственного интеллекта).
boy illustration
Новый бесплатный RooCline превосходит Cline v3.1? ! Быстрее, умнее и лучше вилка Cline! (Независимое программирование AI, порог 0)
boy illustration
Разработав более 10 проектов с помощью Cursor, я собрал 10 примеров и 60 подсказок.
boy illustration
Я потратил 72 часа на изучение курсорных агентов, и вот неоспоримые факты, которыми я должен поделиться!
boy illustration
Идеальная интеграция Cursor и DeepSeek API
boy illustration
DeepSeek V3 снижает затраты на обучение больших моделей
boy illustration
Артефакт, увеличивающий количество очков: на основе улучшения характеристик препятствия малым целям Yolov8 (SEAM, MultiSEAM).
boy illustration
DeepSeek V3 раскручивался уже три дня. Сегодня я попробовал самопровозглашенную модель «ChatGPT».
boy illustration
Open Devin — инженер-программист искусственного интеллекта с открытым исходным кодом, который меньше программирует и больше создает.
boy illustration
Эксклюзивное оригинальное улучшение YOLOv8: собственная разработка SPPF | SPPF сочетается с воспринимаемой большой сверткой ядра UniRepLK, а свертка с большим ядром + без расширения улучшает восприимчивое поле
boy illustration
Популярное и подробное объяснение DeepSeek-V3: от его появления до преимуществ и сравнения с GPT-4o.
boy illustration
9 основных словесных инструкций по доработке академических работ с помощью ChatGPT, эффективных и практичных, которые стоит собрать
boy illustration
Вызовите deepseek в vscode для реализации программирования с помощью искусственного интеллекта.
boy illustration
Познакомьтесь с принципами сверточных нейронных сетей (CNN) в одной статье (суперподробно)
boy illustration
50,3 тыс. звезд! Immich: автономное решение для резервного копирования фотографий и видео, которое экономит деньги и избавляет от беспокойства.
boy illustration
Cloud Native|Практика: установка Dashbaord для K8s, графика неплохая
boy illustration
Краткий обзор статьи — использование синтетических данных при обучении больших моделей и оптимизации производительности
boy illustration
MiniPerplx: новая поисковая система искусственного интеллекта с открытым исходным кодом, спонсируемая xAI и Vercel.
boy illustration
Конструкция сервиса Synology Drive сочетает проникновение в интрасеть и синхронизацию папок заметок Obsidian в облаке.
boy illustration
Центр конфигурации————Накос
boy illustration
Начинаем с нуля при разработке в облаке Copilot: начать разработку с минимальным использованием кода стало проще
boy illustration
[Серия Docker] Docker создает мультиплатформенные образы: практика архитектуры Arm64
boy illustration
Обновление новых возможностей coze | Я использовал coze для создания апплета помощника по исправлению домашних заданий по математике
boy illustration
Советы по развертыванию Nginx: практическое создание статических веб-сайтов на облачных серверах
boy illustration
Feiniu fnos использует Docker для развертывания личного блокнота Notepad
boy illustration
Сверточная нейронная сеть VGG реализует классификацию изображений Cifar10 — практический опыт Pytorch
boy illustration
Начало работы с EdgeonePages — новым недорогим решением для хостинга веб-сайтов
boy illustration
[Зона легкого облачного игрового сервера] Управление игровыми архивами
boy illustration
Развертывание SpringCloud-проекта на базе Docker и Docker-Compose