краткое содержание:В этой статье объясняется, какделатьиспользовать Dinky Создание вычислительной платформы реального времени Flink CDC Весь склад помещен в склад и в озеро. Содержание включает в себя:
Tips:Исторический портал~
《Dinky Расширять Phoenix Совместное использование соединителя》
《Dinky 0.6.1 Опубликовано, оптимизировано Flink Опыт применения》
《DinkyсуществоватьKubernetesизупражнятьсяделиться》
《Dlink существовать FinkCDC Поток в озеро Hudi изупражнятьсяделиться》
Адрес GitHub
https://github.com/DataLinkDC/dlink
https://gitee.com/DataLinkDC/Dinky
Приглашаем всех обратить внимание на развитие Динки~
1. Предыстория
У Чонг (Юньси),Сюй Банцзян (истощенный снег) учителясуществовать Flink Forward Asia 2021 Поделился замечательным "Flink CDC Как упростить хранение и хранение данных в режиме реального времени», что приводит к созданию нового хранилища данных и архитектуры хранилища. Глава 4 Flink CDC Усовершенствования в сфере упражнения Алибабы привносят передовое мышление с упражнением, чье CDAS、CTAS Возможности синтаксиса синхронизации данных впечатляют. В последнее время цель состоит в том, чтобы стать FlinkSQL лучший партнер Dinky Также принес FlinkCDC Давайте попробуем улучшить практику складирования всего склада в озере~
2. Болевые точки
Болевые точки озерного хранилища данных Flink CDC суммированы в статье «Как Flink CDC упрощает хранение данных в озере в реальном времени» в виде следующих четырех пунктов:
Архитектура CDC-in-the-lake использует преимущества собственных возможностей обновления Hudi и может обеспечить полное инкрементное переключение, указав точную инкрементальную начальную точку посредством ручного вмешательства, но существует риск потери данных.
2. Структура таблицы сопоставления вручную подвержена ошибкам.
При построении задач синхронизации через FlinkCDC необходимо вручную сопоставлять структуры таблиц, такие как Mysql, с Flink DDL. Когда количество таблиц и полей очень велико, стоимость разработки и обслуживания будет увеличиваться линейно. А сопоставление типов полей вручную может привести к ошибкам.
3.Schema Изменения затрудняют поддержание связи с озером.
Изменения структуры таблицы происходят часто, но это будет сохраняться. FlinkCDC Задача теряет данные и даже приводит к зависанию связи с озером.
4. Опустить весь склад в озеро.
Весь водоем, входящий в озеро, в настоящее время является горячей темой. FlinkCDC В этом процессе будет много проблем, таких как необходимость определения большого количества DDL и написать большое количество INSERT што, что более серьезно, так это то, что он будет занимать большое количество подключений к базе данных. Mysql и сетевой стресс.
3. Решение
Alibaba создала решение «полностью автоматизированной интеграции данных» на основе Flink:
4. Исследования и практика Динки
Dinky полностью поддерживает последнюю версию Flink CDC. Flink CDC обновлен до 2.2.1. Начиная с версии 2.+, функции Flink CDC стали более стабильными и улучшенными. Подробности см.
https://github.com/ververica/flink-cdc-connectors
Среди них последняя версия Flink CDC уже имеет функции полного инкрементального автоматического переключения и синхронизации изменений схемы.
Dinky определяет синтаксис синхронизации CDCSOURCE для всей базы данных. Этот синтаксис аналогичен CDAS. Он может напрямую и автоматически создавать задачу хранения всей базы данных в озере в реальном времени и объединять источники без создания дополнительной нагрузки на MySQL и сети. Синхронизация любых стоков, таких как kafka, doris, hudi, jdbc и т.д.
5. Принцип CDCSOURCE
Если установлено слишком много подключений к базе данных, повторное чтение Binlog приведет к огромной нагрузке на исходную базу данных. В приведенном выше совместном использовании используется оптимизация слияния источников. Попробуйте объединить источники в одном задании. тот же источник данных, он будет объединен с исходным узлом.
Dinky использует только один источник, а затем выполняет обработку разгрузки на основе схемы, базы данных и таблицы и помещает их в соответствующие таблицы соответственно. На рисунке ниже показан весь водоем, входящий в озеро Худи (две таблицы).
2. Сопоставление метаданных
В приведенном выше совместном использовании MysqlCatalog используется для получения таблиц и схемы исходной базы данных для сопоставления Flink DDL.
Dinky собирает информацию метаданных исходной библиотеки с помощью функции метаданных своего собственного центра источников данных и синхронизирует FlinkDDL, используемый для построения потока данных или tableAPI на этапе приемника.
3. Несколько методов приемника
Вышеупомянутое совместное использование относится к худи.
Dinky Предоставляет разнообразные sink образом, изменяя параметры оператора, разные sink Способ. Динки поддержка через DataStream Приходите в Расширять новые раковина, вы также можете использовать FlinkSQL никто Вам необходимо напрямую изменить код Расширятьnew sink。
6. Использование CDCSOURCE
EXECUTE CDCSOURCE jobname WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'dlink',
'password' = 'dlink',
'checkpoint' = '3000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
-- 'database-name'='test',
'table-name' = 'test\.student,test\.score',
-- 'sink.connector'='datastream-doris',
'sink.connector' = 'doris',
'sink.fenodes' = '127.0.0.1:8030',
'sink.username' = 'root',
'sink.password' = 'dw123456',
'sink.sink.batch.size' = '1',
'sink.sink.max-retries' = '1',
'sink.sink.batch.interval' = '60000',
'sink.sink.db' = 'test',
'sink.table.prefix' = 'ODS_',
'sink.table.upper' = 'true',
'sink.table.identifier' = '${schemaName}.${tableName}',
'sink.sink.enable-delete' = 'true'
)
Что следует отметить:
2. Элементы конфигурации
Элементы конфигурации | Это необходимо | значение по умолчанию | иллюстрировать |
---|---|---|---|
connector | да | никто | То же, что Flink CDC |
hostname | да | никто | То же, что Flink CDC |
port | да | никто | То же, что Flink CDC |
username | да | никто | То же, что Flink CDC |
password | да | никто | То же, что Flink CDC |
scan.startup.mode | нет | latest-offset | То же, что Flink CDC |
database-name | нет | никто | Поддержка регулярная |
schema-name | нет | никто | Поддержка регулярная |
table-name | нет | никто | Поддержка регулярная, если при выполнении используется форма имя библиотеки.имя таблицы, необходимо использовать \. |
source.* | нет | никто | Укажите персонализированную конфигурацию CDC, например source.server-time-zone, который является параметром конфигурации server-time-zone. |
checkpoint | нет | никто | Единица мс |
parallelism | нет | никто | Параллелизм задач |
sink.connector | да | никто | Укажите тип приемника, например datastream-kafka, datastream-doris, datastream-hudi, kafka, doris, hudi, jdbc и т. д. Тот, который начинается с datastream — это реализация DataStream. |
sink.sink.db | нет | никто | Имя библиотеки целевого источника данных. Если не указано, по умолчанию будет использоваться имя библиотеки исходного источника данных. |
sink.table.prefix | нет | никто | Префикс имени целевой таблицы, например ODS_, что означает, что ODS_ вставляется перед всеми именами таблиц. |
sink.table.suffix | нет | никто | Суффикс имени целевой таблицы. |
sink.table.upper | нет | никто | Имя целевой таблицы должно состоять из заглавных букв. |
sink.table.lower | нет | никто | Имя целевой таблицы записывается строчными буквами. |
sink.* | нет | никто | целевой источник данныхиз Информация о конфигурации,такой же FlinkSQL с использованием ${schemaName} и ${tableName} Имена обработанных исходных таблиц могут быть внедрены |
7. Практика CDCSOURCE
EXECUTE CDCSOURCE jobname WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'dlink',
'password' = 'dlink',
'checkpoint' = '3000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'table-name' = 'test\.student,test\.score',
'sink.connector'='datastream-kafka',
'sink.topic'='dlinkcdc',
'sink.brokers'='127.0.0.1:9092'
)
2. Данные в режиме реального времени же Шаг к переписке kafka topic
EXECUTE CDCSOURCE jobname WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'dlink',
'password' = 'dlink',
'checkpoint' = '3000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'table-name' = 'test\.student,test\.score',
'sink.connector'='datastream-kafka',
'sink.brokers'='127.0.0.1:9092'
)
3. Хранилище данных DataStream в режиме реального времени Doris
EXECUTE CDCSOURCE jobname WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'dlink',
'password' = 'dlink',
'checkpoint' = '3000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'table-name' = 'test\.student,test\.score',
'sink.connector' = 'datastream-doris',
'sink.fenodes' = '127.0.0.1:8030',
'sink.username' = 'root',
'sink.password' = 'dw123456',
'sink.sink.batch.size' = '1',
'sink.sink.max-retries' = '1',
'sink.sink.batch.interval' = '60000',
'sink.sink.db' = 'test',
'sink.table.prefix' = 'ODS_',
'sink.table.upper' = 'true',
'sink.sink.enable-delete' = 'true'
)
4. Хранилище данных в реальном времени в FlinkSQL Doris
EXECUTE CDCSOURCE jobname WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'dlink',
'password' = 'dlink',
'checkpoint' = '3000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'table-name' = 'test\.student,test\.score',
'sink.connector' = 'doris',
'sink.fenodes' = '127.0.0.1:8030',
'sink.username' = 'root',
'sink.password' = 'dw123456',
'sink.sink.batch.size' = '1',
'sink.sink.max-retries' = '1',
'sink.sink.batch.interval' = '60000',
'sink.sink.db' = 'test',
'sink.table.prefix' = 'ODS_',
'sink.table.upper' = 'true',
'sink.table.identifier' = '${schemaName}.${tableName}',
'sink.sink.enable-delete' = 'true'
)
5. Данные в реальном времени в озеро Худи.
EXECUTE CDCSOURCE demo WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'source.server-time-zone' = 'UTC',
'checkpoint'='1000',
'scan.startup.mode'='initial',
'parallelism'='1',
'database-name'='data_deal',
'table-name'='data_deal\.stu,data_deal\.stu_copy1',
'sink.connector'='hudi',
'sink.path'='hdfs://cluster1/tmp/flink/cdcdata/${tableName}',
'sink.hoodie.datasource.write.recordkey.field'='id',
'sink.hoodie.parquet.max.file.size'='268435456',
'sink.write.precombine.field'='update_time',
'sink.write.tasks'='1',
'sink.write.bucket_assign.tasks'='2',
'sink.write.precombine'='true',
'sink.compaction.async.enabled'='true',
'sink.write.task.max.size'='1024',
'sink.write.rate.limit'='3000',
'sink.write.operation'='upsert',
'sink.table.type'='COPY_ON_WRITE',
'sink.compaction.tasks'='1',
'sink.compaction.delta_seconds'='20',
'sink.compaction.async.enabled'='true',
'sink.read.streaming.skip_compaction'='true',
'sink.compaction.delta_commits'='20',
'sink.compaction.trigger.strategy'='num_or_time',
'sink.compaction.max_memory'='500',
'sink.changelog.enabled'='true',
'sink.read.streaming.enabled'='true',
'sink.read.streaming.check.interval'='3',
'sink.hive_sync.enable'='true',
'sink.hive_sync.mode'='hms',
'sink.hive_sync.db'='cdc_ods',
'sink.hive_sync.table'='${tableName}',
'sink.table.prefix.schema'='true',
'sink.hive_sync.metastore.uris'='thrift://cdh.com:9083',
'sink.hive_sync.username'='flinkcdc'
)
8. Резюме
Dinky существовать Flink CDC Практика хранения всей базы данных в озере решает три упомянутые выше проблемы: проблему полного инкрементного переключения, подверженную ошибкам структуру таблицы сопоставления вручную и хранение всей базы данных в озере. Schema Изменения затрудняют поддержание связи с озером.Не решено,Приветствуем дальнейшее обсуждение。
также Dinky Он также поддерживает всю библиотеку же Шагразличные источники данныхиз Раковина позволяет пользователям выполнять различные задачи по входу в озеро и складированию.
В этой статье не обсуждаются детали реализации исходного кода. Принцип реализации теоретически может быть внедрен. FlinkSQL Процесс обработки, его выполнение может существовать Войти на Обработка данных выполняется, когда склад входит в озеро, добро пожаловать на исследование.
Наконец мы можем найти Dinky По сравнению с другими проектами с открытым исходным кодом, он больше фокусируется на Flink из Опыт Применение улучшено, а также основано на принципе его проектирования, который позволяет легче расширять различные функции уровня предприятия, такие как пользовательская грамматика, вход в озеро Войти на склад、Сохранение каталога、Родствоиспользоватьждать。так,Столкнулся с таким замечательным проектом с открытым исходным кодом,Чего вы ждете?,Давайте действовать и исследовать вместе.
PS: Недавно были запущены новые функции, давайте вместе искать ошибки ~