Лучшие практики для входа в озеро в режиме реального времени из нескольких баз данных и нескольких таблиц на основе Apache Hudi
Лучшие практики для входа в озеро в режиме реального времени из нескольких баз данных и нескольких таблиц на основе Apache Hudi

1. Предисловие

CDC(Change Data Захват) В общих чертах все технологии, которые могут фиксировать данные об изменениях, можно назвать CDC, но определение CDC в этой статье ограничивается сбором данных об изменениях в базе данных в режиме реального времени неинвазивным способом. Например: собирайте данные об изменениях, анализируя журнал Binlog базы данных MySQL, а не через SQL. В исходной таблице запроса фиксируются измененные данные. Худи Являясь одной из самых популярных инфраструктур технологий озер данных, Для создания озер потоковых данных с конвейерами инкрементной обработки данных. Его основные возможности включают быстрое обновление и удаление данных на уровне строк в объектном хранилище, инкрементальные запросы (инкрементальные запросы). queries,Time путешествия), управление небольшими файлами и оптимизация запросов (кластеризация, сжатие, встроенные метаданные), ACID и поддержка одновременной записи. Hudi не является сервером. Он не хранит данные и не обеспечивает вычислительную мощность. Данные хранятся в S3 (также поддерживаются другие объектные хранилища и HDFS), а Hudi определяет формат, в котором данные хранятся в S3 (Parquet, Avro,...), Как организовать данные так, чтобы их можно было принимать в режиме реального времени, поддерживая обновление, удаление, ACID и другие функции. Hudi обеспечивает запись данных через вычислительные механизмы Spark и Flink. Вычислительная мощность, но также обеспечивает возможность интеграции с механизмом OLAP, чтобы механизм OLAP мог запрашивать таблицы Hudi. С точки зрения использования Hudi представляет собой JAR-пакет. Просто возьмите с собой этот JAR-пакет при работе с Flink. Амазонка EMR Искра, Флинк, Престо включены , Trino изначально интегрирует Hudi, Более того, среда выполнения EMR более чем в 2 раза повышает производительность на движках Spark и Presto по сравнению с средой с открытым исходным кодом. В сценарии с несколькими базами данных и несколькими таблицами (например: сотни уровней таблиц базы данных), когда нам нужно преобразовать базу данных(mysql,postgres,sqlserver,oracle,mongodbждать)Данные вCDCна минутном уровне(1minute+)Задержка письмаHudi,И постройте иерархию хранилища данных с помощью инкрементных запросов.,При выполнении эффективных запросов и анализа данных в режиме реального времени. Нам предстоит решить три задачи,Первый,Как использовать унифицированный код для записи данных CDC в сотни таблиц базы данных параллельно с Hudi,Сократите затраты на разработку и обслуживание. второй,Как синхронизировать изменения исходной схемы с таблицами Hudi. третий,использоватьHudiНапример, инкрементальный запрос строит иерархию хранилища данных.ODS->DWD->DWS(Все слоиHudiповерхность),Как реализовать инкрементную агрегацию на уровне DWS. Решения, рекомендуемые в этой статье: Использование Флинка CDC DataStream API (не SQL) сначала записывает данные CDC в Kafka, а не напрямую через Flink. Основные причины записи SQL в таблицы Hudi заключаются в следующем. Во-первых, в сценариях с несколькими таблицами базы данных и различными схемами использование SQL приведет к созданию нескольких потоков синхронизации CDC на исходной стороне, оказывая давление на исходную сторону и влияя на производительность синхронизации. Во-вторых, без MSK в качестве уровня развязки и буферизации данных между восходящим и нисходящим потоком данных CDC, нисходящее многотерминальное потребление и обратное отслеживание данных становятся более трудными. После записи данных CDC в MSK рекомендуется использовать Spark. Structured Streaming DataFrame API или Флинк StatementSet Инкапсулирует логику записи таблиц с несколькими базами данных, но если вам нужно, чтобы изменения схемы на стороне источника автоматически синхронизировались с таблицей Hudi, используйте Spark. Structured Streaming DataFrame Реализация API проще. Флинка требует доработки на базе HoodieFlinkStreamer. Инкрементный ETL Hudi можно использовать через Flink в сценариях, где уровень DWS требует агрегации данных. Streaming Read рассматривает Hudi как неограниченный поток и использует вычислительный механизм Flink для выполнения агрегированного расчета данных в реальном времени и записи их в таблицу Hudi.

2. Проектирование и анализ архитектуры.

2.1 Данные CDC записываются в MSK в режиме реального времени.

Цифры 1 и 2 на рисунке предназначены для отправки данных из базы данных в MSK (сервис Kafka, размещенный на Amazon) в режиме реального времени через CDC. flink-cdc-connectors[1] — популярный инструмент CDC с открытым исходным кодом. Он имеет встроенный движок debezium[2] и поддерживает несколько источников данных. Для MySQL он поддерживает пакетный параллелизм (фаза полной синхронизации), безблокировку и контрольную точку (может восстанавливаться из положения сбоя без повторного чтения и). удобен для больших столов). Поддерживает Flink SQL API и DataStream API. Здесь следует отметить, что если вы используете SQL API, для каждой таблицы в библиотеке будет создана отдельная ссылка, и независимый поток будет выполнять дамп binlog. Если существует много таблиц, которые необходимо синхронизировать, это создаст большую нагрузку на исходную сторону. В сценариях, когда необходимо синхронизировать большое количество таблиц по всей базе данных, следует использовать API DataStream для написания кода, создающего только один дамп бинлога для синхронизации всех необходимых таблиц базы данных. Другой сценарий — синхронизировать только данные подбаз данных и таблиц. Например, если пользовательская таблица разделена на базы данных и таблицы, схема таблицы будет одинаковой. В этом случае SQL API Flink CDC поддерживает регулярное сопоставление нескольких таблиц базы данных. В этом случае использование синхронизации SQL API все равно создаст только поток дампа binlog. Следует отметить, что Flink CDC может напрямую передавать данные в Hudi без необходимости использования MSK. Однако, учитывая разделение восходящего и нисходящего потоков, обратное отслеживание данных, конечное потребление нескольких предприятий, а также управление и обслуживание нескольких таблиц, это по-прежнему рекомендуется. что данные CDC сначала поступают в MSK. Затем нисходящий поток получает данные от MSK и записывает их в Hudi.

2.2 Сравнение инструментов CDC

Под номером 3 на рисунке, помимо соединителей flink-cdc, DMS (Amazon Database Migration Services) — это служба миграции данных, размещенная на Amazon, обеспечивающая CDC нескольких источников данных (mysql, oracle, sqlserver, postgres, mongodb, documentdb, и т. д.) Поддерживает визуальную настройку задач CDC, работу, управление и мониторинг. Таким образом, вы можете выбрать DMS в качестве инструмента анализа CDC. DMS поддерживает MSK или самостоятельно созданный Kafka в качестве цели доставки данных, поэтому CDC можно синхронизировать с MSK в режиме реального времени через DMS для быстрой визуализации управления конфигурацией. Конечно, помимо DMS, существует множество инструментов CDC с открытым исходным кодом, которые также могут выполнять работу по синхронизации CDC, но соответствующие сервисы необходимо создавать на EC2. На рисунке ниже перечислены элементы сравнения инструментов CDC для справки.

2.3 Структурированная потоковая передача Spark записывает изменения Hudi и Schema параллельно в несколько таблиц базы данных.

Номер 4 на рисунке: после того, как данные CDC достигают MSK, данные о потреблении могут быть записаны в таблицу Hudi через вычислительный механизм Spark/Flink. Мы называем этот уровень уровнем ODS. И Spark, и Flink могут обеспечить передачу данных на уровень ODS. Нам нужно решить, какой из них использовать. Вот несколько относительно важных моментов. Прежде всего, для движка Spark мы должны использовать структурированную потоковую передачу Spark для использования MSK и записи в Hudi. Поскольку API DataFrame можно использовать для записи в Hudi, использовать тему CDC в Spark легко и в соответствии с метаинформацией. поле в каждом фрагменте данных (имя базы данных, имя таблицы и т. д.) для записи разных таблиц Hudi в рамках одного задания, инкапсулируя логику параллельной записи нескольких таблиц, а одно задание может реализовать логику синхронизации нескольких таблиц во всей базе данных . Скриншот примера кода выглядит следующим образом. Полный код можно получить на Github[3]

Мы знаем, что данные CDC содержат информацию I (вставка), U (обновление) и D (удаление). Различные инструменты CDC имеют разные форматы данных, но смысл, который необходимо выразить, один и тот же. При использовании Spark для записи в Hudi мы в основном фокусируемся на информации U и D. Данные с информацией U указывают, что данные представляют собой операцию обновления. Для Hudi просто установите первичный ключ исходной таблицы на RecordKey Hudi и установите значение. precombineKey в соответствии со сценарием спроса. Вот объяснение precombineKey. Это означает, что когда данные необходимо обновить (recordKey тот же), больший precombineKey из двух данных выбирается по умолчанию и сохраняется в Hudi. Фактически, Hudi имеет очень гибкий механизм полезной нагрузки. Вы можете выбирать различные реализации полезной нагрузки с помощью параметра hoodie.datasource.write.payload.class, например реализацию частичного обновления (частичное обновление) OverwriteNonDefaultsWithLatestAvroPayload, или вы можете настроить полезную нагрузку. Класс реализации. Его ядро. Вам нужно обновить данные на основе полей, указанных в precombineKey. Таким образом, для приемника данных CDC Hudi нам необходимо обеспечить порядок восходящих сообщений. Пока в нашей таблице есть поле, которое может определить, какие данные являются самыми последними, это поле часто проектируется как время обновления данных, изменение_времени в. Метка времени MySQL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP . Если аналогичного поля нет, рекомендуется определить спецификацию проекта и добавить это поле. В противном случае данные должны храниться в порядке (что приведет к большему ухудшению архитектуры и производительности). Обновления в Hudi могут быть неверными. Для данных с информацией D это означает, что данные были удалены в источнике. Hudi предоставляет возможность удаления. Если часть данных содержит поле _hoodie_is_deleted и значение true, Hudi автоматически удалит эту часть. данных легко реализовать в коде структурированной потоковой передачи Spark. Вам нужно только добавить поле в реализацию операции сопоставления и установить значение поля равным true, если данные содержат информацию D.

2.4 Таблица Flink StatementSet из нескольких баз данных CDC параллельно записывает Hudi

для Использование Механизм Флинка потребляет данные CDC в MSK и помещает их в таблицу Hudi уровня ODS. Если вы хотите синхронизировать несколько таблиц во всей базе данных в одном задании, Flink. StatementSet реализуется через Kafka CDC. Исходная таблица: выберите библиотечную таблицу Sink to Hudi на основе метаинформации. Но здесь следует отметить, что благодаря интеграции Flink и Hudi таблица сначала создается в SQL, а затем выполняется оператор Insert для записи в таблицу, если есть сотни таблиц, которые необходимо синхронизировать. , инкапсуляция автоматизированной логики может облегчить проблему. По мере нашей работы вы найдете способ записи в Hudi с помощью SQL, хотя и для записи одной таблицы. Это очень удобно в использовании. Вам нужно только написать SQL без программирования. Однако это также накладывает некоторые ограничения. Поскольку таблица сначала создается с помощью SQL при записи в Hudi, схема определяется при ее создании. Изменения исходной схемы. Трудно автоматически изменить схему нижестоящей таблицы Hudi через SQL. Хотя Flink не представлен на официальном сайте Худи. DataStream Пример записи API в Hudi, но Flink может писать в Hudi через HoodieFlinkStreamer как DataStream Реализацию API можно найти в исходном коде Hudi [4]. Поэтому, если вы хотите добиться более гибкой и простой синхронизации нескольких таблиц и автоматического изменения схемы, вам необходимо обратиться к коду HoodieFlinkStreamer для использования DataStream. Напишите Hudi через API. Для информации I, U, D дебезий Флинка ,maxwell,canal format будет напрямую анализировать сообщение Другими словами, Flink напрямую анализирует операции I, U и D во внутреннюю структуру данных Flink RowData и напрямую помещает их в таблицу Hudi. Нам также необходимо установить RecordKey и precombineKey в SQL. ну, полезную нагрузку можно установить. Различные классы реализации класса.

2.5 Режим потокового чтения Flink считывает Hudi для реализации агрегации уровня ODS

На рисунке он обозначен цифрой 5. После того, как данные передаются на уровень ODS через Spark/Flink, нам может потребоваться создать слои DWD и DWS для дальнейшей обработки данных (DWD и DWS не обязательны. В зависимости от вашего сценария). вы можете напрямую позволить механизму OLAP запрашивать таблицу Hudi уровня ODS). Мы надеемся использовать возможность инкрементальных запросов Hudi только для запроса измененных данных для последующего ETL DWD и DWS, что может ускорить построение и снизить потребление ресурсов. Для механизма Spark, если вы выполняете только операции сопоставления, фильтрации и другие связанные типы операций с данными в слое DWD, вы можете использовать инкрементные запросы. Однако, если слой DWD создан с помощью операции соединения, его невозможно реализовать с помощью операции соединения. инкрементальные запросы. Вы можете сканировать только всю таблицу (или раздел). При построении уровня DWS, если операции типа агрегации не дедуплицированы, а операции типа окна представляют собой только операции SUM, AVG, MIN, MAX и другие типы, они могут быть реализованы путем слияния с целевой таблицей после инкрементного запроса. Напротив, можно реализовать сканирование только всей таблицы (или раздела). Чтобы механизм Flink мог создавать DWD и DWS, поскольку Flink поддерживает потоковое чтение таблиц Hudi, просто установите read.streaming.enabled=true, Changelog.enabled=true и другие связанные параметры потокового чтения в SQL. После настройки Flink рассматривает таблицу Hudi как таблицу неограниченного потока журнала изменений. Независимо от того, как выполняется ETL, Flink сам сохраняет информацию о состоянии, и вся ссылка ETL передается в потоковом режиме.

2.6 Механизм OLAP запрашивает таблицу Hudi

На картинке цифра 6. EMR Hive/Presto/Trino Все таблицы Hudi можно запрашивать, но следует отметить, что разные движки имеют разную поддержку запросов. См. официальный сайт [5]. По поводу автоматической смены Схемы, во-первых, сам Худи поддерживает Схему. Evolution[6], мы хотим автоматически синхронизировать изменения исходной схемы с таблицей Hudi. Из приведенного выше описания вы можете знать, что если вы используете движок Spark, вы можете использовать DataFrame. API обрабатывает данные и динамически генерирует DataFrame через from_json, поэтому может быть удобнее автоматически добавлять столбцы. Если использование Движок Флинка уже объяснил выше, что он хочет автоматически реализовывать изменения Схемы через HoodieFlinkStreamer и DataStream. Метод API реализует написание Hudi и интегрирует логику изменения схемы.

3. Демонстрация синхронизации всей базы данных EMR CDC

RDS будет выбран при следующей демонстрационной операции. MySQL как источник данных, Flink CDC DataStream API Синхронизируйте все таблицы в библиотеке с Kafka, используйте механизм Spark для использования данных binlog в Kafka и записывайте несколько таблиц на уровень ODS. Hudi, Использование Флинка движок со стримингом В режиме чтения выполняется построение таблицы Hudi для слоев DWD и DWS.

3.1 Экологическая информация

Язык кода:javascript
копировать
EMR 6.6.0 
Hudi 0.10.0 
Spark 3.2.0 
Flink 1.14.2  
Presto 0.267
MySQL 5.7.34

3.2 Создать исходную таблицу

Создайте библиотеку test_db и три таблицы user, Product и user_order в MySQL, вставьте образцы данных, а затем CDC сначала загрузит существующие данные в таблицу, затем добавит новые данные в источник и измените структуру таблицы, чтобы добавить новые поля. и убедитесь, что изменения схемы автоматически синхронизируются с таблицей Hudi.

Язык кода:javascript
копировать
-- create databases
create database if not exists test_db default character set utf8mb4 collate utf8mb4_general_ci;
use test_db;

-- create  user table
drop table if exists user;
create table if not exists user
(
    id           int auto_increment primary key,
    name         varchar(155)                        null,
    device_model varchar(155)                        null,
    email        varchar(50)                         null,
    phone        varchar(50)                         null,
    create_time  timestamp default CURRENT_TIMESTAMP not null,
    modify_time  timestamp default CURRENT_TIMESTAMP null on update CURRENT_TIMESTAMP
)charset = utf8mb4;

-- insert data
insert into user(name,device_model,email,phone) values
('customer-01','dm-01','abc01@email.com','188776xxxxx'),
('customer-02','dm-02','abc02@email.com','166776xxxxx');

-- create product table
drop table if exists product;
create table if not exists product
(
    pid          int not null primary key,
    pname        varchar(155)                        null,
    pprice       decimal(10,2)                           ,
    create_time  timestamp default CURRENT_TIMESTAMP not null,
    modify_time  timestamp default CURRENT_TIMESTAMP null on update CURRENT_TIMESTAMP
)charset = utf8mb4;

-- insert data
insert into product(pid,pname,pprice) values
('1','prodcut-001',125.12),
('2','prodcut-002',225.31);

-- create order table
drop table if exists user_order;
create table if not exists user_order
(
    id           int auto_increment primary key,
    oid          varchar(155)                        not null,
    uid          int                                         ,
    pid          int                                         ,
    onum         int                                         ,
    create_time  timestamp default CURRENT_TIMESTAMP not null,
    modify_time  timestamp default CURRENT_TIMESTAMP null on update CURRENT_TIMESTAMP
)charset = utf8mb4;

-- insert data
insert into user_order(oid,uid,pid,onum) values 
('o10001',1,1,100),
('o10002',1,2,30),
('o10001',2,1,22),
('o10002',2,2,16);

-- select data
select * from user;
select * from product;
select * from user_order;

3.3 Flink CDC отправляет данные в Kafka

Используйте DataStream API для написания программы синхронизации CDC. Пример кодаGithub[7]

Язык кода:javascript
копировать
# Создать тему
kafka-topics.sh --create --zookeeper ${zk}  --replication-factor 2 --partitions 8  --topic cdc_topic
# Загрузите код, скомпилируйте и упакуйте.
mvn clean package  -Dscope.type=provided  -DskipTests
# Вы также можете использовать упакованный пакет, войти в главный узел EMR и выполнить команду
wget https://dxs9dnjebzm6y.cloudfront.net/tmp/emr-flink-cdc-1.0-SNAPSHOT.jar
# disalbe check-leaked-classloader
sudo sed -i -e '$a\classloader.check-leaked-classloader: false' /etc/flink/conf/flink-conf.yaml
# Начать мигать cdc Отправить данные в Кафку
sudo flink run -m yarn-cluster \
-yjm 1024 -ytm 2048 -d \
-ys 4 -p 8 \
-c  com.aws.analytics.MySQLCDC  \
/home/hadoop/emr-flink-cdc-1.0-SNAPSHOT.jar \
-b xxxxx.amazonaws.com:9092 \
-t cdc_topic_001 \
-c s3://xxxxx/flink/checkpoint/ \
-l 30 -h xxxxx.rds.amazonaws.com:3306 -u admin \
-P admin123456 \
-d test_db -T test_db.* \
-p 4 \
-e 5400-5408
# Соответствующие параметры описаны следующим образом.
MySQLCDC 1.0
Usage: MySQLCDC [options]

  -c, --checkpointDir <value>
                           checkpoint dir
  -l, --checkpointInterval <value>
                           checkpoint interval: default 60 seconds
  -b, --brokerList <value>
                           kafka broker list,sep comma
  -t, --sinkTopic <value>  kafka topic
  -h, --host <value>       mysql hostname, eg. localhost:3306
  -u, --username <value>   mysql username
  -P, --pwd <value>        mysql password
  -d, --dbList <value>     cdc database list: db1,db2,..,dbn
  -T, --tbList <value>     cdc table list: db1.*,db2.*,db3.tb*...,dbn.*
  -p, --parallel <value>   cdc source parallel
  -s, --position <value>   cdc start position: initial or latest,default: initial
  -e, --serverId <value>   cdc server id
  
# Потребляйте Кафку topic Данные наблюдений
./kafka_2.12-2.6.2/bin/kafka-console-consumer.sh --bootstrap-server $brok --topic cdc_topic_001 --from-beginning |jq .

3.4 Spark использует данные CDC и синхронизирует всю базу данных

Язык кода:javascript
копировать
# Пример кода синхронизации всей библиотеки  https://github.com/yhyyz/emr-hudi-example/blob/main/src/main/scala/com/aws/analytics/Debezium2Hudi.scala

# Загрузите код, скомпилируйте и упакуйте.
mvn clean package  -Dscope.type=provided  -DskipTests
# Вы также можете использовать упакованный пакет, войти в главный узел EMR и выполнить команду
wget https://dxs9dnjebzm6y.cloudfront.net/tmp/emr-hudi-example-1.0-SNAPSHOT-jar-with-dependencies.jar 

# Выполните следующую команду, чтобы отправить задание, установите -s в команде HMS, синхронизация таблицы Hudi с Glue Catalog
spark-submit  --master yarn \
--deploy-mode client \
--driver-memory 1g \
--executor-memory 1g \
--executor-cores 2 \
--num-executors  2 \
--conf "spark.dynamicAllocation.enabled=false" \
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
--conf "spark.sql.hive.convertMetastoreParquet=false" \
--jars  /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar \
--class com.aws.analytics.Debezium2Hudi /home/hadoop/emr-hudi-example-1.0-SNAPSHOT-jar-with-dependencies.jar \
-e prod -b xxxxx.amazonaws.com:9092 \
-t cdc_topic_001 -p emr-cdc-group-02 -s true \
-o earliest \
-i 60 -y cow -p 10 \
-c s3://xxxxx/spark-checkpoint/emr-hudi-cdc-005/ \
-g s3://xxxxx/emr-hudi-cdc-005/ \
-r jdbc:hive2://localhost:10000  \
-n hadoop -w upsert  \
-s hms \
--concurrent false \
-m "{\"tableInfo\":[{\"database\":\"test_db\",\"table\":\"user\",\"recordKey\":\"id\",\"precombineKey\":\"modify_time\",\"partitionTimeColumn\":\"create_time\",\"hudiPartitionField\":\"year_month\"},
{\"database\":\"test_db\",\"table\":\"user_order\",\"recordKey\":\"id\",\"precombineKey\":\"modify_time\",\"partitionTimeColumn\":\"create_time\",\"hudiPartitionField\":\"year_month\"},{\"database\":\"test_db\",\"table\":\"product\",\"recordKey\":\"pid\",\"precombineKey\":\"modify_time\",\"partitionTimeColumn\":\"create_time\",\"hudiPartitionField\":\"year_month\"}]}"

# Соответствующие параметры объясняются следующим образом:
Debezium2Hudi 1.0
Usage: spark ss Debezium2Hudi [options]

  -e, --env <value>        env: dev or prod
  -b, --brokerList <value>
                           kafka broker list,sep comma
  -t, --sourceTopic <value>
                           kafka topic
  -p, --consumeGroup <value>
                           kafka consumer group
  -s, --syncHive <value>   whether sync hive,default:false
  -o, --startPos <value>   kafka start pos latest or earliest,default latest
  -m, --tableInfoJson <value>
                           table info json str
  -i, --trigger <value>    default 300 second,streaming trigger interval
  -c, --checkpointDir <value>
                           hdfs dir which used to save checkpoint
  -g, --hudiEventBasePath <value>
                           hudi event table hdfs base path
  -y, --tableType <value>  hudi table type MOR or COW. default COW
  -t, --morCompact <value>
                           mor inline compact,default:true
  -m, --inlineMax <value>  inline max compact,default:20
  -r, --syncJDBCUrl <value>
                           hive server2 jdbc, eg. jdbc:hive2://localhost:10000
  -n, --syncJDBCUsername <value>
                           hive server2 jdbc username, default: hive
  -p, --partitionNum <value>
                           repartition num,default 16
  -w, --hudiWriteOperation <value>
                           hudi write operation,default insert
  -u, --concurrent <value>
                           write multiple hudi table concurrent,default false
  -s, --syncMode <value>   sync mode,default jdbc, glue catalog set dms
  -z, --syncMetastore <value>
                           hive metastore uri,default thrift://localhost:9083
                           
# Как вы можете видеть на рисунке ниже, стол синхронизирован с Glue. Catalog , данные записаны в S3
Язык кода:javascript
копировать
-- Добавьте столбец в таблицу пользователей MySQL и вставьте новый фрагмент данных. Запросите таблицу hudi, и вы увидите, что новые столбцы и данные были автоматически синхронизированы с пользовательской таблицей. Обратите внимание, что следующий SQL-код выполняется на стороне MySQL.
alter table user add column age int
insert into user(name,device_model,email,phone,age) values
('customer-03','dm-03','abc03@email.com','199776xxxxx',18);

3.5 Flink Streaming Чтение агрегации в реальном времени

Язык кода:javascript
копировать
# Обратите внимание на последний параметр, -t Это /etc/hive/conf/hive-site.xml? Добавьте его в путь к классам, чтобы при синхронизации таблицы выполнения Hudi с Glue ее можно было добавить и загрузить в эту конфигурацию. Ключ к конфигурации: hive.metastore.client.factory.class = com.amazonaws.glue.catalog.metastore.AWS GlueDataCatalogHiveClientFactory, чтобы можно было загрузить реализацию каталога с использованием Glue. Если при запуске кластера EMR выбран параметр «Клей» Metastore, файл /etc/hive/conf/hive-site.xml AWSGlueDataCatalogHiveClientFactory настроен. Если вы запустите EMR, не выбрав «Клей» Metastore также необходимо синхронизировать данные с Glue, который необходимо добавить вручную.

# Обратите внимание, чтобы заменить его на свой S3. Bucket
checkpoints=s3://xxxxx/flink/checkpoints/datagen/

flink-yarn-session -jm 1024 -tm 4096 -s 2  \
-D state.backend=rocksdb \
-D state.checkpoint-storage=filesystem \
-D state.checkpoints.dir=${checkpoints} \
-D execution.checkpointing.interval=5000 \
-D state.checkpoints.num-retained=5 \
-D execution.checkpointing.mode=EXACTLY_ONCE \
-D execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION \
-D state.backend.incremental=true \
-D execution.checkpointing.max-concurrent-checkpoints=1 \
-D rest.flamegraph.enabled=true \
-d \
-t /etc/hive/conf/hive-site.xml 

# Запустить Флинк sql client
/usr/lib/flink/bin/sql-client.sh embedded -j /usr/lib/hudi/hudi-flink-bundle.jar shell
-- таблица пользователей, включить потоковую передачу read, changelog.enalbe=true
set sql-client.execution.result-mode=tableau;

CREATE TABLE `user`(
    id string,
    name STRING,
    device_model STRING,
    email STRING,
    phone STRING,
    age string,
    create_time STRING,
    modify_time STRING,
    year_month STRING
)
PARTITIONED BY (`year_month`)
WITH (
  'connector' = 'hudi',
  'path' = 's3://xxxxx/emr-hudi-cdc-005/test_db/user/',
  'hoodie.datasource.write.recordkey.field' = 'id',
  'table.type' = 'COPY_ON_WRITE',
  'index.bootstrap.enabled' = 'true',
  'read.streaming.enabled' = 'true',
  'read.start-commit' = '20220607014223',
  'changelog.enabled' = 'false',
  'read.streaming.check-interval' = '1'
);

# Запрос данных в режиме реального времени
select * from `user`;

# Измените имя id=3 в таблице пользователей на new-customer-03 в MySQL. Обратите внимание, что следующий SQL-код выполняется на стороне MySQL.
update  user set name="new-customer-03" where id=3;

# Во Флинке Вы можете увидеть изменения данных на клиенте
Язык кода:javascript
копировать
-- Операция агрегации Flink Sink в таблицу Hudi

-- batch
CREATE TABLE  user_agg(
num BIGINT,
device_model STRING
)WITH(
  'connector' = 'hudi',
  'path' = 's3://xxxxx/emr-cdc-hudi/user_agg/',
  'table.type' = 'COPY_ON_WRITE',  
  'write.precombine.field' = 'device_model',
  'write.operation' = 'upsert',
  'hoodie.datasource.write.recordkey.field' = 'device_model',
  'hive_sync.database' = 'dws',
  'hive_sync.enable' = 'true',
  'hive_sync.table' = 'user_agg',
  'hive_sync.mode' = 'HMS',
  'hive_sync.use_jdbc' = 'false',
  'hive_sync.username' = 'hadoop'
);

4. Резюме

В этой статье объясняется, как внедрить данные CDC в озеро и автоматически изменять схему через EMR. Через Флинк CDC DataStream API сначала отправляет все данные базы данных в MSK. В настоящее время CDC имеет только один binlog на исходной стороне. Дамп тредов, чтобы снизить нагрузку на источник. Используйте Искру Structured Streaming Динамически анализируйте данные и записывайте их в таблицу Hudi, чтобы реализовать автоматические изменения в схеме и позволить одному заданию управлять несколькими приемниками таблицы. Сократите затраты на разработку и обслуживание в случае нескольких таблиц. Несколько таблиц Hudi можно записывать параллельно или последовательно, а метаданные синхронизируются с Glue. Catalog。Использование Флинка Стриминг Худи Read Этот режим реализует ETL данных в реальном времени и отвечает потребностям соединения и агрегации слоев DWD и DWS в реальном времени. Амазонка Hudi изначально интегрирован в среду EMR. Используйте Amazon EMR легко создает демо-версию, которая синхронизирует всю библиотеку.

Справочная ссылка

[1] flink-cdc-connectors: https://github.com/ververica/flink-cdc-connectors [2] debezium: https://debezium.io/documentation/ [3] Github: https://github.com/yhyyz/emr-hudi-example/blob/main/src/main/scala/com/aws/analytics/Canal2Hudi.scala [4] Исходный код Худи: https://github.com/apache/hudi/blob/master/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java [5] Смотрите официальный сайт: https://hudi.apache.org/docs/querying_data#merge-on-read-tables-1 [6] Schema Evolution: https://hudi.apache.org/docs/schema_evolution [7] Пример кода Github: https://github.com/yhyyz/emr-flink-cdc/blob/main/src/main/scala/com/aws/analytics/MySQLCDC.scala

boy illustration
Неразрушающее увеличение изображений одним щелчком мыши, чтобы сделать их более четкими артефактами искусственного интеллекта, включая руководства по установке и использованию.
boy illustration
Копикодер: этот инструмент отлично работает с Cursor, Bolt и V0! Предоставьте более качественные подсказки для разработки интерфейса (создание навигационного веб-сайта с использованием искусственного интеллекта).
boy illustration
Новый бесплатный RooCline превосходит Cline v3.1? ! Быстрее, умнее и лучше вилка Cline! (Независимое программирование AI, порог 0)
boy illustration
Разработав более 10 проектов с помощью Cursor, я собрал 10 примеров и 60 подсказок.
boy illustration
Я потратил 72 часа на изучение курсорных агентов, и вот неоспоримые факты, которыми я должен поделиться!
boy illustration
Идеальная интеграция Cursor и DeepSeek API
boy illustration
DeepSeek V3 снижает затраты на обучение больших моделей
boy illustration
Артефакт, увеличивающий количество очков: на основе улучшения характеристик препятствия малым целям Yolov8 (SEAM, MultiSEAM).
boy illustration
DeepSeek V3 раскручивался уже три дня. Сегодня я попробовал самопровозглашенную модель «ChatGPT».
boy illustration
Open Devin — инженер-программист искусственного интеллекта с открытым исходным кодом, который меньше программирует и больше создает.
boy illustration
Эксклюзивное оригинальное улучшение YOLOv8: собственная разработка SPPF | SPPF сочетается с воспринимаемой большой сверткой ядра UniRepLK, а свертка с большим ядром + без расширения улучшает восприимчивое поле
boy illustration
Популярное и подробное объяснение DeepSeek-V3: от его появления до преимуществ и сравнения с GPT-4o.
boy illustration
9 основных словесных инструкций по доработке академических работ с помощью ChatGPT, эффективных и практичных, которые стоит собрать
boy illustration
Вызовите deepseek в vscode для реализации программирования с помощью искусственного интеллекта.
boy illustration
Познакомьтесь с принципами сверточных нейронных сетей (CNN) в одной статье (суперподробно)
boy illustration
50,3 тыс. звезд! Immich: автономное решение для резервного копирования фотографий и видео, которое экономит деньги и избавляет от беспокойства.
boy illustration
Cloud Native|Практика: установка Dashbaord для K8s, графика неплохая
boy illustration
Краткий обзор статьи — использование синтетических данных при обучении больших моделей и оптимизации производительности
boy illustration
MiniPerplx: новая поисковая система искусственного интеллекта с открытым исходным кодом, спонсируемая xAI и Vercel.
boy illustration
Конструкция сервиса Synology Drive сочетает проникновение в интрасеть и синхронизацию папок заметок Obsidian в облаке.
boy illustration
Центр конфигурации————Накос
boy illustration
Начинаем с нуля при разработке в облаке Copilot: начать разработку с минимальным использованием кода стало проще
boy illustration
[Серия Docker] Docker создает мультиплатформенные образы: практика архитектуры Arm64
boy illustration
Обновление новых возможностей coze | Я использовал coze для создания апплета помощника по исправлению домашних заданий по математике
boy illustration
Советы по развертыванию Nginx: практическое создание статических веб-сайтов на облачных серверах
boy illustration
Feiniu fnos использует Docker для развертывания личного блокнота Notepad
boy illustration
Сверточная нейронная сеть VGG реализует классификацию изображений Cifar10 — практический опыт Pytorch
boy illustration
Начало работы с EdgeonePages — новым недорогим решением для хостинга веб-сайтов
boy illustration
[Зона легкого облачного игрового сервера] Управление игровыми архивами
boy illustration
Развертывание SpringCloud-проекта на базе Docker и Docker-Compose