Dinky создает Flink CDC и помещает всю базу данных в озеро.
Dinky создает Flink CDC и помещает всю базу данных в озеро.

краткое содержание:В этой статье объясняется, какделатьиспользовать Dinky Создание вычислительной платформы реального времени Flink CDC Весь склад помещен в склад и в озеро. Содержание включает в себя:

  1. фон
  2. Болевые точки
  3. решение
  4. Dinky разведочная практика
  5. CDCSOURCE принцип
  6. CDCSOURCE использование
  7. CDCSOURCE упражняться
  8. Подвести итог

Tips:Исторический портал~

Dinky Расширять Phoenix Совместное использование соединителя

Dinky 0.6.1 Опубликовано, оптимизировано Flink Опыт применения

DinkyсуществоватьKubernetesизупражнятьсяделиться

Dlink существовать FinkCDC Поток в озеро Hudi изупражнятьсяделиться

Адрес GitHub

https://github.com/DataLinkDC/dlink

https://gitee.com/DataLinkDC/Dinky

Приглашаем всех обратить внимание на развитие Динки~

1. Предыстория

У Чонг (Юньси),Сюй Банцзян (истощенный снег) учителясуществовать Flink Forward Asia 2021 Поделился замечательным "Flink CDC Как упростить хранение и хранение данных в режиме реального времени», что приводит к созданию нового хранилища данных и архитектуры хранилища. Глава 4 Flink CDC Усовершенствования в сфере упражнения Алибабы привносят передовое мышление с упражнением, чье CDAS、CTAS Возможности синтаксиса синхронизации данных впечатляют. В последнее время цель состоит в том, чтобы стать FlinkSQL лучший партнер Dinky Также принес FlinkCDC Давайте попробуем улучшить практику складирования всего склада в озере~

2. Болевые точки

Болевые точки озерного хранилища данных Flink CDC суммированы в статье «Как Flink CDC упрощает хранение данных в озере в реальном времени» в виде следующих четырех пунктов:

1. Проблема полного инкрементального переключения

Архитектура CDC-in-the-lake использует преимущества собственных возможностей обновления Hudi и может обеспечить полное инкрементное переключение, указав точную инкрементальную начальную точку посредством ручного вмешательства, но существует риск потери данных.

2. Структура таблицы сопоставления вручную подвержена ошибкам.

При построении задач синхронизации через FlinkCDC необходимо вручную сопоставлять структуры таблиц, такие как Mysql, с Flink DDL. Когда количество таблиц и полей очень велико, стоимость разработки и обслуживания будет увеличиваться линейно. А сопоставление типов полей вручную может привести к ошибкам.

3.Schema Изменения затрудняют поддержание связи с озером.

Изменения структуры таблицы происходят часто, но это будет сохраняться. FlinkCDC Задача теряет данные и даже приводит к зависанию связи с озером.

4. Опустить весь склад в озеро.

Весь водоем, входящий в озеро, в настоящее время является горячей темой. FlinkCDC В этом процессе будет много проблем, таких как необходимость определения большого количества DDL и написать большое количество INSERT што, что более серьезно, так это то, что он будет занимать большое количество подключений к базе данных. Mysql и сетевой стресс.

3. Решение

Alibaba создала решение «полностью автоматизированной интеграции данных» на основе Flink:

  • Flink CDC уже обладает возможностями полного поэтапного автоматического переключения.
  • проходить Flink Catalog Приходите в автообнаружение Mysql Выражение schema,проходить Hudi Catalog автоматическийсуществовать Hudi Создайте метаинформацию целевой таблицы.
  • проходить Schema Evolution делать FlinkCDC Поддержка в режиме реального времени же Шаг schema изменять.
  • проходить CDAS синтаксис, одна строка SQL Библиотека завершения операторов такая же Шаг Операцияизопределение,Объединить источники.

4. Исследования и практика Динки

1. Поддержка последней версии Flink CDC.

Dinky полностью поддерживает последнюю версию Flink CDC. Flink CDC обновлен до 2.2.1. Начиная с версии 2.+, функции Flink CDC стали более стабильными и улучшенными. Подробности см.

https://github.com/ververica/flink-cdc-connectors

Среди них последняя версия Flink CDC уже имеет функции полного инкрементального автоматического переключения и синхронизации изменений схемы.

2. Определите синтаксис синхронизации всей базы данных CDCSOURCE.

Dinky определяет синтаксис синхронизации CDCSOURCE для всей базы данных. Этот синтаксис аналогичен CDAS. Он может напрямую и автоматически создавать задачу хранения всей базы данных в озере в реальном времени и объединять источники без создания дополнительной нагрузки на MySQL и сети. Синхронизация любых стоков, таких как kafka, doris, hudi, jdbc и т.д.

5. Принцип CDCSOURCE

1. слияние исходников

Если установлено слишком много подключений к базе данных, повторное чтение Binlog приведет к огромной нагрузке на исходную базу данных. В приведенном выше совместном использовании используется оптимизация слияния источников. Попробуйте объединить источники в одном задании. тот же источник данных, он будет объединен с исходным узлом.

Dinky использует только один источник, а затем выполняет обработку разгрузки на основе схемы, базы данных и таблицы и помещает их в соответствующие таблицы соответственно. На рисунке ниже показан весь водоем, входящий в озеро Худи (две таблицы).

2. Сопоставление метаданных

В приведенном выше совместном использовании MysqlCatalog используется для получения таблиц и схемы исходной базы данных для сопоставления Flink DDL.

Dinky собирает информацию метаданных исходной библиотеки с помощью функции метаданных своего собственного центра источников данных и синхронизирует FlinkDDL, используемый для построения потока данных или tableAPI на этапе приемника.

3. Несколько методов приемника

Вышеупомянутое совместное использование относится к худи.

Dinky Предоставляет разнообразные sink образом, изменяя параметры оператора, разные sink Способ. Динки поддержка через DataStream Приходите в Расширять новые раковина, вы также можете использовать FlinkSQL никто Вам необходимо напрямую изменить код Расширятьnew sink。

6. Использование CDCSOURCE

1.Основное использование

Язык кода:javascript
копировать
EXECUTE CDCSOURCE jobname WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '127.0.0.1',
  'port' = '3306',
  'username' = 'dlink',
  'password' = 'dlink',
  'checkpoint' = '3000',
  'scan.startup.mode' = 'initial',
  'parallelism' = '1',
  -- 'database-name'='test',
  'table-name' = 'test\.student,test\.score',
  -- 'sink.connector'='datastream-doris',
  'sink.connector' = 'doris',
  'sink.fenodes' = '127.0.0.1:8030',
  'sink.username' = 'root',
  'sink.password' = 'dw123456',
  'sink.sink.batch.size' = '1',
  'sink.sink.max-retries' = '1',
  'sink.sink.batch.interval' = '60000',
  'sink.sink.db' = 'test',
  'sink.table.prefix' = 'ODS_',
  'sink.table.upper' = 'true',
  'sink.table.identifier' = '${schemaName}.${tableName}',
  'sink.sink.enable-delete' = 'true'
)

Что следует отметить:

  • Он написан в соответствии с форматом примера, и задача FlinkSQL может записать только один CDCSOURCE.
  • Перед английской запятой в Элементах конфигурации не может быть пробела, после него должна быть поставлена ​​правая одинарная кавычка.
  • Отключите глобальные переменные, наборы операторов и пакетный режим.
  • Режим приложения в настоящее время не поддерживается и будет поддерживаться в будущем.
  • В настоящее время исходный код реализует только MysqlCDC и Oracle CDC, другое CDC Расширять просто.

2. Элементы конфигурации

Элементы конфигурации

Это необходимо

значение по умолчанию

иллюстрировать

connector

да

никто

То же, что Flink CDC

hostname

да

никто

То же, что Flink CDC

port

да

никто

То же, что Flink CDC

username

да

никто

То же, что Flink CDC

password

да

никто

То же, что Flink CDC

scan.startup.mode

нет

latest-offset

То же, что Flink CDC

database-name

нет

никто

Поддержка регулярная

schema-name

нет

никто

Поддержка регулярная

table-name

нет

никто

Поддержка регулярная, если при выполнении используется форма имя библиотеки.имя таблицы, необходимо использовать \.

source.*

нет

никто

Укажите персонализированную конфигурацию CDC, например source.server-time-zone, который является параметром конфигурации server-time-zone.

checkpoint

нет

никто

Единица мс

parallelism

нет

никто

Параллелизм задач

sink.connector

да

никто

Укажите тип приемника, например datastream-kafka, datastream-doris, datastream-hudi, kafka, doris, hudi, jdbc и т. д. Тот, который начинается с datastream — это реализация DataStream.

sink.sink.db

нет

никто

Имя библиотеки целевого источника данных. Если не указано, по умолчанию будет использоваться имя библиотеки исходного источника данных.

sink.table.prefix

нет

никто

Префикс имени целевой таблицы, например ODS_, что означает, что ODS_ вставляется перед всеми именами таблиц.

sink.table.suffix

нет

никто

Суффикс имени целевой таблицы.

sink.table.upper

нет

никто

Имя целевой таблицы должно состоять из заглавных букв.

sink.table.lower

нет

никто

Имя целевой таблицы записывается строчными буквами.

sink.*

нет

никто

целевой источник данныхиз Информация о конфигурации,такой же FlinkSQL с использованием ${schemaName} и ${tableName} Имена обработанных исходных таблиц могут быть внедрены

7. Практика CDCSOURCE

1. Объедините данные реального времени в тему Kafka.

Язык кода:javascript
копировать
EXECUTE CDCSOURCE jobname WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '127.0.0.1',
  'port' = '3306',
  'username' = 'dlink',
  'password' = 'dlink',
  'checkpoint' = '3000',
  'scan.startup.mode' = 'initial',
  'parallelism' = '1',
  'table-name' = 'test\.student,test\.score',
  'sink.connector'='datastream-kafka',
  'sink.topic'='dlinkcdc',
  'sink.brokers'='127.0.0.1:9092'
)

2. Данные в режиме реального времени же Шаг к переписке kafka topic

Язык кода:javascript
копировать
EXECUTE CDCSOURCE jobname WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '127.0.0.1',
  'port' = '3306',
  'username' = 'dlink',
  'password' = 'dlink',
  'checkpoint' = '3000',
  'scan.startup.mode' = 'initial',
  'parallelism' = '1',
  'table-name' = 'test\.student,test\.score',
  'sink.connector'='datastream-kafka',
  'sink.brokers'='127.0.0.1:9092'
)

3. Хранилище данных DataStream в режиме реального времени Doris

Язык кода:javascript
копировать
EXECUTE CDCSOURCE jobname WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '127.0.0.1',
  'port' = '3306',
  'username' = 'dlink',
  'password' = 'dlink',
  'checkpoint' = '3000',
  'scan.startup.mode' = 'initial',
  'parallelism' = '1',
  'table-name' = 'test\.student,test\.score',
  'sink.connector' = 'datastream-doris',
  'sink.fenodes' = '127.0.0.1:8030',
  'sink.username' = 'root',
  'sink.password' = 'dw123456',
  'sink.sink.batch.size' = '1',
  'sink.sink.max-retries' = '1',
  'sink.sink.batch.interval' = '60000',
  'sink.sink.db' = 'test',
  'sink.table.prefix' = 'ODS_',
  'sink.table.upper' = 'true',
  'sink.sink.enable-delete' = 'true'
)

4. Хранилище данных в реальном времени в FlinkSQL Doris

Язык кода:javascript
копировать
EXECUTE CDCSOURCE jobname WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '127.0.0.1',
  'port' = '3306',
  'username' = 'dlink',
  'password' = 'dlink',
  'checkpoint' = '3000',
  'scan.startup.mode' = 'initial',
  'parallelism' = '1',
  'table-name' = 'test\.student,test\.score',
  'sink.connector' = 'doris',
  'sink.fenodes' = '127.0.0.1:8030',
  'sink.username' = 'root',
  'sink.password' = 'dw123456',
  'sink.sink.batch.size' = '1',
  'sink.sink.max-retries' = '1',
  'sink.sink.batch.interval' = '60000',
  'sink.sink.db' = 'test',
  'sink.table.prefix' = 'ODS_',
  'sink.table.upper' = 'true',
  'sink.table.identifier' = '${schemaName}.${tableName}',
  'sink.sink.enable-delete' = 'true'
)

5. Данные в реальном времени в озеро Худи.

Язык кода:javascript
копировать
EXECUTE CDCSOURCE demo WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'source.server-time-zone' = 'UTC',
'checkpoint'='1000',
'scan.startup.mode'='initial',
'parallelism'='1',
'database-name'='data_deal',
'table-name'='data_deal\.stu,data_deal\.stu_copy1',
'sink.connector'='hudi',
'sink.path'='hdfs://cluster1/tmp/flink/cdcdata/${tableName}',
'sink.hoodie.datasource.write.recordkey.field'='id',
'sink.hoodie.parquet.max.file.size'='268435456',
'sink.write.precombine.field'='update_time',
'sink.write.tasks'='1',
'sink.write.bucket_assign.tasks'='2',
'sink.write.precombine'='true',
'sink.compaction.async.enabled'='true',
'sink.write.task.max.size'='1024',
'sink.write.rate.limit'='3000',
'sink.write.operation'='upsert',
'sink.table.type'='COPY_ON_WRITE',
'sink.compaction.tasks'='1',
'sink.compaction.delta_seconds'='20',
'sink.compaction.async.enabled'='true',
'sink.read.streaming.skip_compaction'='true',
'sink.compaction.delta_commits'='20',
'sink.compaction.trigger.strategy'='num_or_time',
'sink.compaction.max_memory'='500',
'sink.changelog.enabled'='true',
'sink.read.streaming.enabled'='true',
'sink.read.streaming.check.interval'='3',
'sink.hive_sync.enable'='true',
'sink.hive_sync.mode'='hms',
'sink.hive_sync.db'='cdc_ods',
'sink.hive_sync.table'='${tableName}',
'sink.table.prefix.schema'='true',
'sink.hive_sync.metastore.uris'='thrift://cdh.com:9083',
'sink.hive_sync.username'='flinkcdc'
)

8. Резюме

Dinky существовать Flink CDC Практика хранения всей базы данных в озере решает три упомянутые выше проблемы: проблему полного инкрементного переключения, подверженную ошибкам структуру таблицы сопоставления вручную и хранение всей базы данных в озере. Schema Изменения затрудняют поддержание связи с озером.Не решено,Приветствуем дальнейшее обсуждение。

также Dinky Он также поддерживает всю библиотеку же Шагразличные источники данныхиз Раковина позволяет пользователям выполнять различные задачи по входу в озеро и складированию.

В этой статье не обсуждаются детали реализации исходного кода. Принцип реализации теоретически может быть внедрен. FlinkSQL Процесс обработки, его выполнение может существовать Войти на Обработка данных выполняется, когда склад входит в озеро, добро пожаловать на исследование.

Наконец мы можем найти Dinky По сравнению с другими проектами с открытым исходным кодом, он больше фокусируется на Flink из Опыт Применение улучшено, а также основано на принципе его проектирования, который позволяет легче расширять различные функции уровня предприятия, такие как пользовательская грамматика, вход в озеро Войти на склад、Сохранение каталога、Родствоиспользоватьждать。так,Столкнулся с таким замечательным проектом с открытым исходным кодом,Чего вы ждете?,Давайте действовать и исследовать вместе.

PS: Недавно были запущены новые функции, давайте вместе искать ошибки ~

boy illustration
Неразрушающее увеличение изображений одним щелчком мыши, чтобы сделать их более четкими артефактами искусственного интеллекта, включая руководства по установке и использованию.
boy illustration
Копикодер: этот инструмент отлично работает с Cursor, Bolt и V0! Предоставьте более качественные подсказки для разработки интерфейса (создание навигационного веб-сайта с использованием искусственного интеллекта).
boy illustration
Новый бесплатный RooCline превосходит Cline v3.1? ! Быстрее, умнее и лучше вилка Cline! (Независимое программирование AI, порог 0)
boy illustration
Разработав более 10 проектов с помощью Cursor, я собрал 10 примеров и 60 подсказок.
boy illustration
Я потратил 72 часа на изучение курсорных агентов, и вот неоспоримые факты, которыми я должен поделиться!
boy illustration
Идеальная интеграция Cursor и DeepSeek API
boy illustration
DeepSeek V3 снижает затраты на обучение больших моделей
boy illustration
Артефакт, увеличивающий количество очков: на основе улучшения характеристик препятствия малым целям Yolov8 (SEAM, MultiSEAM).
boy illustration
DeepSeek V3 раскручивался уже три дня. Сегодня я попробовал самопровозглашенную модель «ChatGPT».
boy illustration
Open Devin — инженер-программист искусственного интеллекта с открытым исходным кодом, который меньше программирует и больше создает.
boy illustration
Эксклюзивное оригинальное улучшение YOLOv8: собственная разработка SPPF | SPPF сочетается с воспринимаемой большой сверткой ядра UniRepLK, а свертка с большим ядром + без расширения улучшает восприимчивое поле
boy illustration
Популярное и подробное объяснение DeepSeek-V3: от его появления до преимуществ и сравнения с GPT-4o.
boy illustration
9 основных словесных инструкций по доработке академических работ с помощью ChatGPT, эффективных и практичных, которые стоит собрать
boy illustration
Вызовите deepseek в vscode для реализации программирования с помощью искусственного интеллекта.
boy illustration
Познакомьтесь с принципами сверточных нейронных сетей (CNN) в одной статье (суперподробно)
boy illustration
50,3 тыс. звезд! Immich: автономное решение для резервного копирования фотографий и видео, которое экономит деньги и избавляет от беспокойства.
boy illustration
Cloud Native|Практика: установка Dashbaord для K8s, графика неплохая
boy illustration
Краткий обзор статьи — использование синтетических данных при обучении больших моделей и оптимизации производительности
boy illustration
MiniPerplx: новая поисковая система искусственного интеллекта с открытым исходным кодом, спонсируемая xAI и Vercel.
boy illustration
Конструкция сервиса Synology Drive сочетает проникновение в интрасеть и синхронизацию папок заметок Obsidian в облаке.
boy illustration
Центр конфигурации————Накос
boy illustration
Начинаем с нуля при разработке в облаке Copilot: начать разработку с минимальным использованием кода стало проще
boy illustration
[Серия Docker] Docker создает мультиплатформенные образы: практика архитектуры Arm64
boy illustration
Обновление новых возможностей coze | Я использовал coze для создания апплета помощника по исправлению домашних заданий по математике
boy illustration
Советы по развертыванию Nginx: практическое создание статических веб-сайтов на облачных серверах
boy illustration
Feiniu fnos использует Docker для развертывания личного блокнота Notepad
boy illustration
Сверточная нейронная сеть VGG реализует классификацию изображений Cifar10 — практический опыт Pytorch
boy illustration
Начало работы с EdgeonePages — новым недорогим решением для хостинга веб-сайтов
boy illustration
[Зона легкого облачного игрового сервера] Управление игровыми архивами
boy illustration
Развертывание SpringCloud-проекта на базе Docker и Docker-Compose