классическая сцена
Реализация на стороне Flink
Бизнес-сторона обычно использует вычислительную систему реального времени для ОБЪЕДИНЕНИЯ нескольких источников данных в потоке для создания этой широкой таблицы. Однако на практике это решение сталкивается со многими проблемами, которые в основном можно разделить на следующие две ситуации:
Поскольку разница во времени между бизнес-данными, данными измерений и данными индикаторов относительно велика,,Поэтому для метода потока данных индикатора установлено разумное значение TTL, а данные измерений в кэше не обновляются вовремя;,Это приводит к проблемам с неточными данными в дальнейшем.
。Поддержание большого состояния не только окажет определенное давление на память.,При этом время Checkpoint и Restore станет дольше.,Может вызвать противодавление задачи
。Мы основаны наHudi Payloadиз Механизм слияния,разработал новыйиз Несколько Решение threadjoin:
Это решение предоставляет возможность коррелировать многопотоковые данные на уровне хранения, стремясь решить ряд проблем, возникающих при многопотоковых соединениях в сценариях реального времени.
Временная шкала поддерживается во всех таблицах, которые содержат операции с набором данных (например, добавление, изменение или удаление) в разные моменты времени. Это будет выполняться каждый раз, когда набор данных выполняется в таблице Hudi. Временная шкала таблицы, так что можно запрашивать только данные, успешно отправленные после определенного момента времени, или можно запрашивать только данные до определенного момента времени, что позволяет эффективно избежать необходимости сканирования данных в более широком диапазоне времени. В то же время вы можете эффективно запрашивать только файлы до внесения изменений (например, после того, как Instant отправляет операцию изменения, вы запрашиваете данные только до определенного момента времени, и вы все равно можете запрашивать данные до изменения).
Действие:
Временная шкала — это абстракция, используемая Hudi для управления коммитами. Каждый коммит привязан к фиксированной метке времени и распространяется на временную шкалу. На временной шкале каждый коммит абстрагируется в HoodieInstant. Instant записывает поведение, метку времени и статус коммита.
В приведенном выше примере показана операция обновления таблицы Hudi каждые 5 минут с 10:00 до 10:20. Временная шкала включает в себя фиксацию, очистку и сжатие. В то же время вы также можете наблюдать, что время фиксации записывает время поступления данных (например, 10:20), но на самом деле данные организованы по времени события (время события) с 7:00 до одного раздела каждый час. . Время прибытия и события событий — две основные концепции балансировки задержки и целостности данных.
Позднее поступление данных (нравиться,Время мероприятия 9:00.,существовать>1часы спустяиз10:20приезжать),Данные о событии будут перемещены в раздел Переписка в зависимости от события. график существования существует с помощью,Инкрементальный запрос требует только считывания всей информации в определенный момент (мгновенный запрос). время) commit Новые данные можно получить путем успешного изменения файлов без сканирования всех файлов.
Транзакции в озерах данных теперь считаются ключевой особенностью Lakehouse. Но что на самом деле удалось сделать на данный момент? Какие методы доступны на данный момент? Как они себя ведут в реальном мире? Этим вопросам посвящена данная статья.
Мне посчастливилось работать над различными проектами баз данных — РСУБД (Oracle[1]), хранилищами значений ключей NoSQL (Voldemort[2]), потоковыми базами данных (ksqlDB[3]), хранилищами данных реального времени с закрытым исходным кодом и конечно, Apache Hudi. Можно с уверенностью сказать, что различия в рабочих нагрузках глубоко влияют на механизмы управления параллелизмом, принятые в разных базах данных. В этой статье также будет описано, как мы переосмысливаем механизмы управления параллелизмом для озера данных Apache Hudi.
Во-первых, давайте перейдем непосредственно к делу: базы данных РСУБД предоставляют богатейший набор транзакционных функций и самый широкий спектр механизмов управления параллелизмом [4], различные уровни изоляции, детальные блокировки, обнаружение/предотвращение тупиковых ситуаций и многое другое, как и должно быть. Поддерживает изменения на уровне строк и чтение нескольких таблиц, обеспечивая при этом ключевые ограничения [5] и поддерживая индексы [6].
Хранилища NoSQL предлагают очень слабые гарантии, такие как только конечная согласованность и простая атомарность на уровне строк, в обмен на лучшую масштабируемость для более простых рабочих нагрузок. Традиционные хранилища данных основаны на столбцах и предоставляют более или менее полный набор функций, которые можно найти в СУБД, обеспечивая [7] ограничения на блокировку и ключи, в то время как облачные хранилища данных, похоже, больше ориентированы на архитектуру разделения хранения и вычислений, обеспечивая при этом меньшие возможности. уровень изоляции. Удивительный пример: ключевое ограничение [8] не применяется.
Исторически озера данных рассматривались как пакетные задания чтения/записи файлов в облачном хранилище, и будет интересно посмотреть, как большинство новых попыток расширят эту точку зрения и используют ту или иную форму «оптимистического управления параллелизмом 9]» (OCC). реализовать контроль версий файлов.
Задания OCC используют блокировки на уровне таблицы, чтобы проверить, влияют ли они на перекрывающиеся файлы, и прерывают операцию в случае конфликта. Иногда блокировки представляют собой даже просто блокировки уровня JVM, удерживаемые на одном узле драйвера Apache Spark. Это может быть хорошо для упрощенной координации. пакетных заданий, но он не широко применим к современным рабочим нагрузкам озера данных. Такие подходы созданы с учетом неизменяемых/только добавляемых моделей данных, которые не подходят для поэтапной обработки данных или обновлений/удалений с использованием ключей.
ОКК очень оптимистично настроена на то, что настоящего конфликта никогда не произойдет. Проповеди разработчиков, сравнивающие OCC с полноценными транзакционными возможностями СУБД или традиционного хранилища данных, просто неверны, если цитировать прямо из Википедии: «Если часто возникает конкуренция за ресурсы данных, стоимость многократного перезапуска транзакций может существенно снизить производительность, в В этом случае другие методы управления параллелизмом [10] могут быть более подходящими». Когда конфликты действительно происходят, они могут привести к потере большого количества ресурсов, потому что у вас есть пакетные задания, которые завершаются сбоем через несколько часов при каждой попытке запуска!
Представьте себе реальный сценарий с двумя процессами записи: заданием на запись, которое генерирует новые данные каждые 30 минут, и заданием удаления, которое обеспечивает соблюдение GDPR и занимает 2 часа для завершения удаления. Они, вероятно, будут перекрываться со случайным удалением файлов, и задание на удаление почти гарантированно будет зависать и каждый раз не будет выполнено. Что касается базы данных, сочетание длительных транзакций с оптимизмом может привести к разочарованию, поскольку чем длиннее транзакции, тем выше вероятность их перекрытия.
Так каковы альтернативы? Замок? В Википедии также говорится: «Однако подходы на основе блокировок («пессимистические») также могут обеспечивать более низкую производительность, поскольку блокировки могут значительно ограничивать эффективный параллелизм, даже если взаимоблокировок избежать. Здесь Hudi использует другой подход, который, по нашему мнению, лучше подходит для современных транзакций озера данных, которые часто выполняются в течение длительного времени или даже непрерывны. Рабочие нагрузки озера данных имеют больше характеристик с заданиями потоковой обработки с высокой пропускной способностью, чем стандартное чтение/запись базы данных, и именно это мы и извлекаем. При потоковой обработке события сериализуются в единый упорядоченный журнал, что позволяет избежать узких мест блокировки/параллелизма, и пользователи могут непрерывно обрабатывать миллионы событий в секунду. Hudi реализует протокол управления параллелизмом на уровне файлов на основе журналов на временной шкале Hudi [11], который, в свою очередь, опирается на минимально атомарные записи в облачное хранилище. Создавая журнал событий как основную часть межпроцессной координации, Hudi может предоставить некоторые гибкие модели развертывания, которые обеспечивают более высокий уровень параллелизма, чем чистые методы OCC, которые отслеживают только снимки таблиц.
Самая простая форма управления параллелизмом — отсутствие параллелизма вообще. В таблицах озера данных часто выполняются общие службы для обеспечения эффективности, освобождения места хранения от старых версий и журналов, объединения файлов (кластеризация в Hudi), дельта-слияния (сжатие в Hudi) и т. д. Hudi просто устраняет необходимость в управлении параллелизмом и максимизирует пропускную способность, поддерживая эти табличные сервисы «из коробки» и запуская их в режиме реального времени после каждой записи в таблицу. Планы выполнения идемпотентны, сохраняются на временной шкале и автоматически восстанавливаются после сбоев. Для большинства простых случаев использования это означает, что простого написания достаточно, чтобы получить хорошо управляемую таблицу, не требующую управления параллелизмом.
Наш пример удаления/захвата выше не так прост. В то время как операции приема/записи могут просто обновить N последних разделов в таблице, удаления могут даже охватывать всю таблицу, а смешивание их в одной рабочей нагрузке может существенно повлиять на задержку приема, поэтому Hudi предоставляет службу таблиц, которая работает асинхронно, где большая часть тяжелая работа (например, фактическая перезапись данных столбца с помощью службы сжатия) выполняется асинхронно, исключая любые повторные ненужные попытки, а также используя технологию кластеризации. Таким образом, одна запись может использовать как обычные обновления, так и удаления GDPR, а также сериализовать их в журнал. Учитывая, что Hudi имеет индексы на уровне записей, а запись в журнал avro обходится намного дешевле (по сравнению с записью в паркет, которая может быть в 10 и более раз дороже), задержку приема можно поддерживать, обеспечивая при этом превосходную отслеживаемость. Фактически, мы смогли масштабировать эту модель до масштаба данных 100 ПБ в Uber [12], упорядочив все удаления и обновления в одной исходной теме Apache Kafka. Управление параллелизмом — это не просто блокировки, Hudi может делать это без каких-либо внешних блокировок. этот.
Но не всегда возможно сериализовать удаления в один и тот же поток записи, или требуются удаления на основе sql. При наличии нескольких распределенных процессов некоторая форма блокировки неизбежна, но, как и в реальной базе данных, модель параллелизма Hudi достаточно умна, чтобы отделить то, что фактически записывается в таблицу, от службы таблиц, которая управляет или оптимизирует таблицу Come. Hudi обеспечивает аналогичный оптимистичный контроль параллелизма между несколькими модулями записи, но служба таблиц по-прежнему может выполняться полностью без блокировок и асинхронно. Это означает, что задание удаления может кодировать только удаления, задание приема может регистрировать обновления, а служба сжатия снова применяет обновления/удаления к базовым файлам. Хотя задания удаления и загрузки могут конкурировать друг с другом, как мы упоминали выше, время их выполнения намного меньше, а потери намного меньше, поскольку сжатие выполняет тяжелую работу по записи данных в паркет/столбец.
Подводя итог, можно сказать, что есть много способов улучшить ситуацию на этой основе.
Hudi поддерживает автоматическую очистку неудачно отправленных данных при записи. Apache Hudi представляет механизм маркировки во время записи для эффективного отслеживания файлов данных, записываемых в хранилище. В этой статье мы подробно рассмотрим конструкцию существующих механизмов непосредственной маркировки файлов и объясним их проблемы с производительностью в облачном хранилище при очень больших пакетах операций записи. И демонстрирует, как повысить производительность записи за счет внедрения тегов на основе сервера временной шкалы.
в Худи marker
Это метка, указывающая на наличие соответствующего файла данных в хранилище. Hudi использует ее для автоматической очистки незафиксированных данных в сценариях сбоя и отката. Каждая запись тега состоит из трех частей.
Например, отметка 91245ce3-bb82-4f9f-969e-343364159174-0_140-579-0_20210820173605.parquet.marker.CREATE
Указывает, что соответствующий файл данных 91245ce3-bb82-4f9f-969e-343364159174-0_140-579-0_20210820173605.parquet
и I/O Тип СОЗДАВАТЬ. Перед записью каждого файла данных Худи Клиент записи сначала создает метку в хранилище, которая будет сохраняться и будет явно удалена клиентом записи после успешной отправки. Маркеры полезны для написания клиентов для эффективного выполнения различных операций. Маркеры выполняют две основные функции:
speculative execution
, вы можете иметь несколько раз attempts
Успешно перенес одни и те же данные в разные файлы, но получил их только один раз. attempt
будет передан Spark Представлен процесс программы драйвера. Маркировка помогает эффективно идентифицировать написанные частичные файлы данных.,Содержит и позже успешно записывает файлы данных по сравнению с дубликатами данных.,И существующие записи удаляют эти дубликаты из файлов данных до завершения отправки.Основное описание здесьНа основе сервера временной шкалы из механизма маркировки,Этот механизм оптимизированжитьмагазинотметкаизсоответствующая задержка。Hudi Сервер временной шкалы используется для представления файловой системы и временной шкалы. Как показано на рисунке ниже, новый механизм маркировки на основе сервера временной шкалы делегирует создание меток и другие операции, связанные с отметками, от отдельных исполнителей серверу временной шкалы для централизованной обработки. Сервер временной шкалы сохраняет созданные теги в памяти для соответствующих запросов тегов, а сервер временной шкалы обеспечивает согласованность, периодически сбрасывая теги в памяти в ограниченное количество базовых файлов в хранилище. Таким образом, количество фактических операций с файлами и задержка, связанная с маркировкой, могут быть значительно уменьшены, что повышает производительность записи даже при большом количестве файлов данных.
Чтобы повысить эффективность обработки запросов на создание отметок, мы разработали пакетную обработку запросов на пометку на сервере временной шкалы. Каждый запрос на создание маркера обрабатывается асинхронно на сервере временной шкалы Javalin и ставится в очередь перед обработкой. Для каждого пакетного интервала, скажем, 20 миллисекунд, поток отправки извлекает ожидающие запросы из очереди и отправляет их в рабочий поток для обработки. Каждый рабочий поток обрабатывает запросы на создание тегов и перезаписывает базовый файл, в котором хранятся теги. Одновременно выполняется несколько рабочих потоков, и, учитывая, что перезапись файлов занимает больше времени, чем пакетная обработка, каждый рабочий поток записывает в отдельный файл, который не затрагивается другими потоками, чтобы обеспечить согласованность и корректность. И интервал пакетной обработки, и количество рабочих потоков настраиваются с помощью параметров записи.
Обратите внимание, что рабочий поток всегда проверяет, был ли создан тег, сравнивая имя тега в запросе с копией всех тегов в памяти, хранящейся на сервере временной шкалы. Базовый файл, в котором хранятся теги, читается только при первом запросе тега (отложенная загрузка). Запрошенный ответ возвращается только после того, как новые маркеры будут записаны в файл, чтобы в случае сбоя сервера временной шкалы сервер временной шкалы мог восстановить уже созданные маркеры. Это обеспечивает согласованность между хранилищем и репликами в памяти, а также повышает производительность обработки запросов тегов.
В настоящее время Hudi реализует OCC (оптимистическое управление параллелизмом) на основе временной шкалы, чтобы обеспечить согласованность, целостность и правильность между несколькими операциями записи данных. Однако соответствующее обнаружение конфликта происходит до фиксации метаданных и после завершения записи данных. Если обнаружены какие-либо конфликты, ресурсы кластера тратятся впустую, поскольку вычисления и записи уже завершены. Чтобы решить эту проблему, в этом RFC [17] предлагается механизм раннего обнаружения конфликтов, основанный на существующем механизме маркировки Hudi. Существуют некоторые тонкие различия в рабочих процессах раннего обнаружения конфликтов между разными типами сопровождающих тегов:
Что еще более важно, Hoodie может прекратить запись раньше, а благодаря раннему обнаружению конфликтов ресурсы можно высвободить в кластер и улучшить их использование.
Транзакции озера данных и multi-writers строится сегодня Lakehouse изключевые особенности。прямая цитатаЛейкхаус Управление параллелизма: Мы слишком оптимистичны?[18]
«Hudi реализует протокол управления параллелизмом на уровне файлов на основе журналов на временной шкале Hudi, который, в свою очередь, опирается на минимальное атомарное размещение облачного хранилища. Создавая журнал событий как основную часть межпроцессной координации, Hudi может Предоставьте несколько гибких моделей развертывания, обеспечивающих более высокий уровень параллелизма, чем чистые методы OCC, которые отслеживают только снимки таблиц».
В сценарии с несколькими авторами обнаружение существующего конфликта Hudi происходит после записи данных автором и перед отправкой метаданных. Другими словами, хотя все вычисления и запись данных завершены, модуль записи обнаруживает конфликт только тогда, когда он начинает фиксироваться, что приводит к пустой трате ресурсов. Например: есть два задания на запись: задание 1 запишет 10M данных в таблицу Hudi, включая обновление группы файлов 1. Другое задание2 запишет 100G в таблицу Hudi, а также обновит ту же группу файлов 1. Задание 1 успешно завершено и отправлено в Hudi. Через несколько часов job2 завершил запись файла данных (100G) и начал отправлять метаданные. В этот раз обнаружился конфликт с заданием 1. После сбоя задания 2 его пришлось остановить и запустить заново. Очевидно, что на работу2 тратится много вычислительных ресурсов и времени.
В настоящее время у Худи есть два важных механизма: механизм маркировки и механизм сердцебиения:
На основе механизма маркировки и механизма контрольного сигнала в этом RFC предлагается новый метод обнаружения конфликтов: раннее обнаружение конфликтов. Прежде чем средство записи создаст маркер и начнет записывать файл, Hudi выполнит это новое обнаружение конфликта, пытаясь обнаружить конфликты записи напрямую или получить результаты асинхронной проверки конфликтов (на основе временной шкалы) как можно раньше и прервать запись при возникновении конфликта. , чтобы мы могли как можно скорее освободить ресурсы и улучшить их использование.
Это рабочий процесс раннего обнаружения конфликтов, как показано на рисунке. 1 показано. Как мы видели, когда supportsOptimisticConcurrencyControl
и isEarlyConflictDetectionEnable
Когда оба варианта верны, мы можем использовать функцию раннего обнаружения конфликтов. В противном случае мы пропускаем эту проверку и создаем маркер напрямую.
Традиционное озеро данныхсуществоватьданные При написаниииз не очень хорошо справляется с транзакционной стороной, но по мере того, как все больше и больше критически важных бизнес-процессов переходят на озеро данных,Ситуация также изменилась,Нам нужен механизм для атомарной публикации пакета данных.,То есть сохраняются только действительные данные.,Требуется частичный отказоткатбез поврежденияиметьданныенабор。Запрос одновременноиз Результаты должны быть воспроизводимымииз,Сторона запроса не может видеть какую-либо часть извлеченных данных.,Любая передача данных должна осуществляться достоверно. Hudi предоставляет мощные возможности ACID. Эффективный механизм изотката может обеспечить согласованность данных и избежать создания «бесхозных файлов» или остатков промежуточных файлов данных.
Apache Hudi Полезная нагрузка — это расширяемый механизм обработки данных. С помощью различных полезных нагрузок мы можем реализовать индивидуальные методы записи данных для сложных сценариев, что значительно повышает гибкость обработки данных. Худи Полезная нагрузкасуществоватьписать выполняет дедупликацию, фильтрацию, объединение и другие операции данных при чтении таблицы Hudi из класса инструмента, используя параметры «hoodie.datasource.write.payload.class» указывает полезную нагрузку, которую нам нужно использовать. сорт. В этой статье мы подробно рассмотрим Худи. Механизм полезной нагрузки, а также различия и сценарии использования различных полезных нагрузок и з.
существоватьданныеписатьизкогда,Теперь методы вставки всей строки и крышки всей строки могут соответствовать требованиям всех сценариев.,запись данных также будет иметь некоторые индивидуальные потребности в обработке,Поэтому необходим более гибкий метод и определенная обработка данных.,Hudi предоставляет метод загрузки, который может очень хорошо решить эту проблему.,примернравитьсяможно решить При написанииданныеУдалить дубликаты
вопрос,противОбновить некоторые поля
и т. д.。
Вам необходимо указать параметр при записи в таблицу Hudi hoodie.datasource.write.precombine.field
, это поле также называется Precombine Key,Hudi Полезная нагрузка обрабатывает данные на основе этого указанного поля. Она объединяет каждую часть данных в полезную нагрузку, поэтому сравнение между данными становится сравнением между полезными нагрузками. Вам нужно всего лишь реализовать метод сравнения полезной нагрузки в соответствии с потребностями бизнеса для обработки данных. Все полезные нагрузки Hudi реализуют интерфейс HoodieRecordPayload. Все предустановленные классы полезных нагрузок, реализующие этот интерфейс, перечислены ниже.
На следующем рисунке перечислены методы, которые должен реализовать интерфейс HoodieRecordPayload.,Здесь есть два важных метода: preCombine иcombineAndGetUpdateValue.,Ниже мы проанализируем эти два метода.
Как видно из рисунка ниже,Этот метод сравнивает текущие данные и oldValue,Затем верните запись.
Из описания аннотации метода preCombine мы также можем узнать, что он сначала используется для дедупликации данных, когда в Hudi одновременно записываются несколько фрагментов данных с одним и тем же первичным ключом. место вызова
На самом деле есть еще одно место, где вызывается этот метод, то есть при чтении таблицы MOR будут обрабатываться данные с тем же первичным ключом в Log-файле. Если один и тот же фрагмент данных изменяется несколько раз и записывается в файл журнала таблицы MOR, при чтении также будет выполняться предварительное объединение.
Этот метод сравнивает currentValue (то есть данные в существующем файле паркета) с новыми данными, чтобы определить, нужно ли сохранять новые данные.
Из-за различий в принципах чтения и записи таблицы COW и таблицы MOR вызов joinAndGetUpdateValueиз существующегоCOWиMOR также отличается:
Внутренняя версия Hudi поддерживает сценарий одновременной записи несколькими авторами Flink на основе механизма блокировки файлов и OCC.
Hudi поддерживает встроенную работу в работе. Compationиclean может объединять небольшие файлы и вовремя их очищать, избегая тем самым проблем с небольшими файлами. Конечно, вы также можете отключить встроенный режим через параметры. Сжатие и худи доступны в автономном режиме в Spark/Flink. compactionиclean。
Далее мы представляем сценарий многопотоковой склейки. Snapshot Query Основной процесс LogFile Выполните слияние дедупликации, а затем снова слейте его. BaseFile и После дедупликации LogFile данные в. На рисунке ниже показан весь процесс объединения данных, который можно разбить на следующие этапы. Два процесса:
• Merge LogFile
Текущая логика Hudi заключается в чтении данных в файле журнала и сохранении их в карте. Для каждой записи в файле журнала, если ключ не существует в карте, он напрямую помещается в карту, если ключ уже существует в карте. Карта, требуется операция обновления.
При многопотоковом сращивании, поскольку в LogFile есть данные, записанные разными потоками данных, то есть столбцы каждых данных могут быть разными, поэтому при обновлении необходимо определить, происходят ли две записи с одинаковым ключом из одного и того же ключа. поток. Если да, выполните Обновить, в противном случае выполните сращивание. Как показано на рисунке 3, при чтении записи, первичным ключом которой является ключ1 в файле журнала2, запись, соответствующая ключу1, уже существует в карте, но две записи происходят из разных потоков, поэтому их необходимо соединить, чтобы сформировать новый Запись (key1, b0_new, c0_new, d0_new) на карту.
Текущая логика Hudi по умолчанию заключается в проверке наличия записи с тем же ключом на карте для каждой записи, существующей в BaseFile. Если она существует, перезапишите запись в BaseFile записью в Map. При многопотоковой склейке Запись в Карте не будет полностью перекрывать соответствующую Запись в BaseFile, а может только обновлять значения некоторых столбцов, то есть столбцов, соответствующих Записи в Карте.
Как показано на рисунке ниже, на примере простейшей логики покрытия при чтении BaseFile Первичный ключ в key1 из Record когда, обнаружил key1 существовать Map Уже сохранены в существовании, соответствующем из Record иметь BCD Если есть три столбца значений, обновите BaseFile серединаиз BCD Колонка, получи новое из Запись(key1, b0_new, c0_new, d0_new, e0), примечание E Столбец не был обновлен, поэтому сохраняется исходное значение. е0. Для новых дополнений Key нравиться Key3 Переписка Запишите, вам нужно BCE Добавляются три столбца со значением по По умолчанию сформировать полное из Record。
Реализация принципа из в основном заключается в настройке Payload class добиться того же key Различные исходные данные из-за логики слияния, конец записи будет объединять несколько источников и записывать в пакете. log, сторона чтения также будет вызывать ту же логику для обработки ситуаций между пакетами при слиянии при чтении.
Здесь следует отметить, что из – данные вне порядка (out-of-order). and late events)извопрос。нравиться Если ты этого не сделаешьиметь дело с,существование ниже по течению часто приводит к появлению старых данных. Крышка новых данных.,Или обновление столбца неполное.
В случае неупорядоченных и запоздалых данных мы Hudi сделанный Multiple ordering value Улучшение, гарантирующее, что каждый источник может обновлять только свою часть данных столбца, может быть установлено в соответствии с event time (ordering value) столбец, гарантируя, что только новые данные перезаписывают старые данные. финальная комбинация lock less multiple writers чтобы достичь большего Job Многоисходный параллелизм писать.
Для этой функции была выпущена версия моментального снимка на основе 0.12.0-1-tencentiz.
<dependencies>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-flink1.13-bundle</artifactId>
<version>0.12.0</version>
</dependency>
</dependencies>
Job1
CREATE TABLE sourceA (\n" +
uuid STRING,\n" +
name STRING,\n" +
_ts1 timestamp(3)\n" +
) WITH (\n" +
.....
)
public static String sinkTableDDL1() {
return String.format("create table %s(\n"
+ " uuid STRING,\n"
+ " name STRING,\n"
+ " age int,\n"
+ " _ts1 bigint,\n"
+ " _ts2 bigint,\n"
+ " PRIMARY KEY(uuid) NOT ENFORCED"
+ ")\n"
+ " PARTITIONED BY (_ts1)\n"
+ " with (\n"
+ " 'connector' = 'hudi',\n"
+ " 'path' = '%s', -- Заменить на абсолютный путь\n"
+ " 'table.type' = 'MERGE_ON_READ',\n"
+ " 'write.bucket_assign.tasks' = '5',\n"
+ " 'write.tasks' = '5',\n"
+ " 'write.partition.format' = 'yyyyMMdd',\n"
+ " 'write.partition.timestamp.type' = 'EPOCHMILLISECONDS',\n"
+ " 'hoodie.bucket.index.num.buckets' = '5',\n"
+ " 'changelog.enabled' = 'true',\n"
+ " 'index.type' = 'BUCKET',\n"
+ " 'hoodie.bucket.index.num.buckets' = '5',\n"
+ String.format(" '%s' = '%s',\n", FlinkOptions.PRECOMBINE_FIELD.key(), "_ts1:name;_ts2:age")
+ " 'write.payload.class' = '" + PartialUpdateAvroPayload.class.getName() + "',\n"
+ " 'hoodie.write.log.suffix' = 'job1',\n"
+ " 'hoodie.write.concurrency.mode' = 'optimistic_concurrency_control',\n"
+ " 'hoodie.write.lock.provider' = 'org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider',\n"
+ " 'hoodie.cleaner.policy.failed.writes' = 'LAZY',\n"
+ " 'hoodie.cleaner.policy' = 'KEEP_LATEST_BY_HOURS',\n"
+ " 'hoodie.consistency.check.enabled' = 'false',\n"
+ " 'hoodie.write.lock.early.conflict.detection.enable' = 'true',\n"
+ " 'hoodie.write.lock.early.conflict.detection.strategy' = '"
+ SimpleTransactionDirectMarkerBasedEarlyConflictDetectionStrategy.class.getName() + "',\n"
+ " 'hoodie.keep.min.commits' = '1440',\n"
+ " 'hoodie.keep.max.commits' = '2880',\n"
+ " 'compaction.schedule.enabled'='false',\n"
+ " 'compaction.async.enabled'='false',\n"
+ " 'compaction.trigger.strategy'='num_or_time',\n"
+ " 'compaction.delta_commits' ='5',\n"
+ " 'compaction.delta_seconds' ='180',\n"
+ " 'compaction.max_memory' = '3096',\n"
+ " 'clean.async.enabled' ='false',\n"
+ " 'hive_sync.enable' = 'true',\n"
+ " 'hive_sync.mode' = 'hms',\n"
+ " 'hive_sync.db' = '%s',\n"
+ " 'hive_sync.table' = '%s',\n"
+ " 'hive_sync.metastore.uris' = '%s'\n"
+ ")", sinkAliasTable1, basePath, dbName, targetTable, metastoreUrl);
}
Потоковая запись данных
insert into %s(uuid, name, _ts1) select uuid, name, ts as _ts1 from sourceA
Job2
CREATE TABLE sourceB (\n" +
uuid varchar(20),\n" +
age int,\n" +
_ts2 timestamp(3)\n" +
) WITH (\n" +
.....
)
public static String sinkTableDDL2() {
return String.format("create table %s(\n"
+ " uuid STRING,\n"
+ " name STRING,\n"
+ " age int,\n"
+ " _ts1 bigint,\n"
+ " _ts2 bigint,\n"
+ " PRIMARY KEY(uuid) NOT ENFORCED"
+ ")\n"
+ " PARTITIONED BY (_ts2)\n"
+ " with (\n"
+ " 'connector' = 'hudi',\n"
+ " 'path' = '%s', -- Заменить на абсолютный путь\n"
+ " 'table.type' = 'MERGE_ON_READ',\n"
+ " 'write.bucket_assign.tasks' = '5',\n"
+ " 'write.tasks' = '5',\n"
+ " 'write.partition.format' = 'yyyyMMdd',\n"
+ " 'write.partition.timestamp.type' = 'EPOCHMILLISECONDS',\n"
+ " 'changelog.enabled' = 'true',\n"
+ " 'index.type' = 'BUCKET',\n"
+ " 'hoodie.bucket.index.num.buckets' = '5',\n"
+ String.format(" '%s' = '%s',\n", FlinkOptions.PRECOMBINE_FIELD.key(), "_ts1:name;_ts2:age")
+ " 'write.payload.class' = '" + PartialUpdateAvroPayload.class.getName() + "',\n"
+ " 'hoodie.write.log.suffix' = 'job2',\n"
+ " 'hoodie.write.concurrency.mode' = 'optimistic_concurrency_control',\n"
+ " 'hoodie.write.lock.provider' = 'org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider',\n"
+ " 'hoodie.cleaner.policy.failed.writes' = 'LAZY',\n"
+ " 'hoodie.cleaner.policy' = 'KEEP_LATEST_BY_HOURS',\n"
+ " 'hoodie.consistency.check.enabled' = 'false',\n"
+ " 'hoodie.write.lock.early.conflict.detection.enable' = 'true',\n"
+ " 'hoodie.write.lock.early.conflict.detection.strategy' = '"
+ SimpleTransactionDirectMarkerBasedEarlyConflictDetectionStrategy.class.getName() + "',\n"
+ " 'hoodie.keep.min.commits' = '1440',\n"
+ " 'hoodie.keep.max.commits' = '2880',\n"
+ " 'compaction.schedule.enabled'='true',\n"
+ " 'compaction.async.enabled'='true',\n"
+ " 'compaction.trigger.strategy'='num_or_time',\n"
+ " 'compaction.delta_commits' ='5',\n"
+ " 'compaction.delta_seconds' ='180',\n"
+ " 'compaction.max_memory' = '3096',\n"
+ " 'clean.async.enabled' ='false',\n"
+ " 'hive_sync.enable' = 'true',\n"
+ " 'hive_sync.mode' = 'hms',\n"
+ " 'hive_sync.db' = '%s',\n"
+ " 'hive_sync.table' = '%s',\n"
+ " 'hive_sync.metastore.uris' = '%s'\n"
+ ")", sinkAliasTable2, basePath, dbName, targetTable, metastoreUrl);
}
Запись данных в поток B
insert into %s(uuid, age, _ts2) select uuid, age, ts as _ts2 from sourceB
Имя параметра | Required | значение по умолчанию | Примечание |
---|---|---|---|
path | Required | N/A | целевая путь к таблице |
table.type | Optional | MERGE_ON_READ | Тип таблицы: COPY_ON_WRITE или MERGE_ON_READ. |
write.operation | Optional | upsert | Тип записи: UPSERT или INSERT. |
write.payload.class | Required | PartialUpdateAvroPayload | Укажите данные обработки изPayload, PartialUpdateAvroPayload.class.getName() |
write.partition.format | Optional | N/A | Формат раздела: гггг ММдд разбит по дням, гггг ММдд ЧЧ — по часам. |
write.partition.timestamp.type | Optional | N/A | Тип раздела, используется, когда поле раздела имеет значение bigint (длинное), значение: EPOCHMILLISCONDS. |
write.precombine | Required | ts | _ts1:имя;_ts2:возраст указывает, что _ts1 и _ts2 являются полями сортировки.,После двоеточия необходимо обновить поле из.,Точки с запятой используются для обозначения разных потоков. |
hoodie.write.log.suffix | Required | никто | Суффикс файла журнала используется для различения разных заданий. |
index.type | Required | FLINK_STATE | Установите здесь BUCKET |
hoodie.bucket.index.num.buckets | Required | 256 | Необходимо оценить на основе объема данных |
hoodie.write.concurrency.mode | Required | SINGLE_WRITER | Установите optimistic_concurrency_control. |
hoodie.cleaner.policy.failed.writes | Required | LAZY | Установите режим ЛЕНИВЫЙ |
hoodie.write.lock.early.conflict.detection.enable | Required | true | |
hoodie.write.lock.early.conflict.detection.strategy | Required | SimpleTransactionDirectMarkerBasedEarlyConflictDetectionStrategy.class.getName() |
проиллюстрировать: 1. По данным прекомбината Ключевое сравнение, следует ли обновлять данные, подходящие для входа в озеро в реальном времени, и порядок входа в озеро не в порядке. 2.нравиться Если исходная таблица пользователясередина Если значения поля времени одинаковы и не могут сравниваться,FIFO
из Последовательное сращивание и слияние。
select * from hudi_tauth_test.hudi_partial_01_rt limit 10;
Примечание: версия presto будет обновлена.
В конечном итоге, на основе Hudi Несколько Решение склейки потоков, помимо решения задачи в фоновом режиме, существует хранилище данных реального времени. DWS Реализация уровня, поддерживается одна таблица 3+ Поток данных при одновременном импорте, крышка сотен TB изданные. Кроме того, существование использует Spark При запросе данных широкой таблицы, поскольку данные были дедуплицированы, сжаты и объединены в большую широкую таблицу, объем одного сканирования составляет десятки TB В запросе производительность улучшена по сравнению с прямым использованием многотабличных корреляцийсуществовать 200% Выше существуют некоторые более сложные запросы, которые также имеют 40-140% из Улучшения производительности.
[1]
Oracle: https://www.oracle.com/database/
[2]
Voldemort: https://www.slideshare.net/vinothchandar/voldemort-prototype-to-production-nectar-edits
[3]
ksqlDB: https://www.confluent.io/blog/ksqldb-pull-queries-high-availability/
[4]
механизм: https://dev.mysql.com/doc/refman/5.7/en/innodb-locking-transaction-model.html
[5]
Ключевые ограничения: https://dev.mysql.com/doc/refman/8.0/en/create-table-foreign-keys.html
[6]
индекс: https://dev.mysql.com/doc/refman/8.0/en/create-table-secondary-indexes.html
[7]
Обязательный: https://docs.teradata.com/r/a8IdS6iVHR77Z9RrIkmMGg/wFPZS4jwZgSG21GnOIpEsw
[8]
Правоприменение: https://docs.snowflake.com/en/sql-reference/constraints-overview.html#supported-constraint-types
[9]
Оптимистическое управление параллелизмом: https://en.wikipedia.org/wiki/Optimistic_concurrency_control
[10]
Методы управления параллелизмом: https://en.wikipedia.org/wiki/Concurrency_control
[11]
Хронология: https://hudi.apache.org/docs/timeline
[12]
Uber: https://eng.uber.com/uber-big-data-platform/
[13]
Тег механизма: https://hudi.apache.org/blog/2021/08/18/improving-marker-mechanism/
[14]
Прервать досрочно: https://issues.apache.org/jira/browse/HUDI-1575
[15]
Доказано: https://hudi.apache.org/blog/2021/09/01/building-eb-level-data-lake-using-hudi-at-bytedance/#functionality-support
[16]
только Ключевые ограничения: https://hudi.apache.org/docs/key_generation
[17]
этот RFC: https://cwiki.apache.org/confluence/display/HUDI/RFC+-+22+%3A+Snapshot+Isolation+using+Optimistic+Concurrency+Control+for+multi-writers
[18]
Лейкхаус Управление параллелизма: Мы слишком оптимистичны?: https://hudi.apache.org/blog/2021/12/16/lakehouse-concurrency-control-are-we-too-optimistic/
[19]
Apache Hudi гибкий из механизма Payload: https://developer.aliyun.com/article/909719
[20]
Snapshot Isolation using Optimistic Concurrency Control for multi-writers: https://cwiki.apache.org/confluence/display/HUDI/RFC+-+22+%3A+Snapshot+Isolation+using+Optimistic+Concurrency+Control+for+multi-writers