краткое содержание:В этой статье представлены Dinky серия функциональных практик Flink CDC Анализ в реальном времени всего склада на складе. Содержание включает в себя:
Tips:Исторический портал~
《Dinky FlinkCDC Полное складское хранение StarRocks》
《Создайте чрезвычайно быструю единую аналитическую платформу с помощью Flink + StarRocks + Dinky.》
《Dinky Расширять iceberg Обмен практикой》
《Dinky Строить Flink CDC Полное складское хранение Войти в озеро》
Адрес GitHub
https://github.com/DataLinkDC/dlink
https://gitee.com/DataLinkDC/Dinky
Приглашаем всех обратить внимание на развитие Динки~
1. Предисловие
Синхронизация всей библиотеки Dinky была выпущена уже некоторое время. Прочитав эту статью, вы познакомитесь с использованием синхронизации всей библиотеки Dinky. С этой целью сообщество Dinky подготовило серию синхронизации всей библиотеки, чтобы каждый мог быстро приступить к работе.
Поскольку в отрасли существует множество библиотек на стороне приемника, мы намеренно выбрали наиболее распространенные или популярные библиотеки в качестве демонстрационных. И выбрал mysql-cdc в качестве стороны источника, чтобы синхронизировать всю библиотеку с каждой стороной приемника. Конечно, после прочтения этой статьи, если ваш источник — oracle-cdc, просто замените его на mysql-cdc.
2. Экологические требования
программное обеспечение | Версия |
---|---|
CDH | 6.2.0 |
Hadoop | 3.0.0-cdh6.2.0 |
Hive | 2.1.1-cdh6.2.0 |
Hudi | 0.11.1 |
Flink | 1.13.6 |
Flink CDC | 2.2.1 |
StarRocks | 2.2.0 |
Dinky | 0.6.6-SNAPSHOT |
MySQL | 5.7 |
PostgreSQL | 13 |
ClickHouse | 22.2.2.1 (автономная версия) |
необходимые зависимости
Чтобы синхронизировать всю библиотеку, вам необходимо загрузить соединитель Flink, необходимый для окружающих компонентов в Flink. Зависимости следующие:
# улейполагаться Сумка
antlr-runtime-3.5.2.jar
улей-exec-2.1.1-cdh6.2.0.jar
libfb303-0.9.3.jar
flink-sql-connector-hive-2.2.0_2.12-1.13.6.jar
улей-site.xml
# hadoopполагаться
flink-shaded-hadoop-2-uber-3.0.0-cdh6.3.0-7.0.jar
# Flink Starrrocksполагаться
flink-connector-starrocks-1.2.2_flink-1.13_2.12.jar
# Hudi полагаться
hudi-flink1.13-bundle_2.12-0.11.1.jar
# Dinky hadoopполагаться
flink-shaded-hadoop-3-uber-3.1.1.7.2.8.0-224-9.0.jar
# Dinky Синхронизация всей базы данныхполагаться Сумка
dlink-client-1.13-0.6.5.jar
dlink-client-base-0.6.5.jar
dlink-common-0.6.5.jar
# flink cdcполагаться сумке
flink-sql-connector-mysql-cdc-2.2.1.jar
# mysql водить машинуполагаться
mysql-connector-java-8.0.21.jar
# kafka flinkполагаться
flink-sql-connector-kafka_2.12-1.13.6.jar
# postgresql jdbcполагаться
postgresql-42.2.14.jar
# clickhouse полагаться
clickhouse-jdbc-0.2.6.jar
flink-connector-clickhouse-1.13.6.jar
иллюстрировать
1.Hive Размещение пакета зависимостей FLINK_HOME/lib и DINKY_HOME/plugins Вниз
2.Hadoop Размещение пакета зависимостей $FLINK_HOME/lib Вниз
3.Flink Starrrocks Размещение пакета зависимостей FLINK_HOME/lib иDINKY_HOME/plugins Вниз
4.Hudi Размещение пакета зависимостей FLINK_HOME/lib и DINKY_HOME/plugins Вниз
5.Dinky hadoop Размещение пакета зависимостей $DINKY_HOME/plugins Вниз(онлайн диск или объявление группы Вниз)
6.Dinky Синхронизация всей базы данных Размещение пакета зависимостей $FLINK_HOME/lib Вниз
7.Flink CDC Размещение пакета зависимостей FLINK_HOME/lib и DINKY_HOME/plugins Вниз
8.MySQL Размещение зависимостей драйвера FLINK_HOME/lib и DINKY_HOME/plugins Вниз
9.Kafka Flink полагаться FLINK_HOME/lib и DINKY_HOME/plugins Вниз
10.PostgreSQL jdbc полагатьсяместо FLINK_HOME/lib иDINKY_HOME/plugins Вниз
11.ClickHouse полагатьсяместо FLINK_HOME/lib и DINKY_HOME/plugins Вниз
После вставки вышеуказанных ошибок перезапустите Flink Кластер и Динки. Если вы столкнулись с конфликтами пакетов jar, вы можете самостоятельно разрешить соответствующие конфликтующие пакеты согласно отчету об ошибках.
3. Подготовка исходной библиотеки.
Например, скрипт Внизsql использует Flink CDC Официальный сайт
# mysqlСоздать оператор таблицы (синхронизирован со Starocks)
CREATE TABLE bigdata.products (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description VARCHAR(512)
);
ALTER TABLE bigdata.products AUTO_INCREMENT = 101;
INSERT INTO bigdata.products
VALUES (default,"scooter","Small 2-wheel scooter"),
(default,"car battery","12V car battery"),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
(default,"hammer","12oz carpenter's hammer"),
(default,"hammer","14oz carpenter's hammer"),
(default,"hammer","16oz carpenter's hammer"),
(default,"rocks","box of assorted rocks"),
(default,"jacket","water resistent black wind breaker"),
(default,"spare tire","24 inch spare tire");
CREATE TABLE bigdata.orders (
order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
order_date DATETIME NOT NULL,
customer_name VARCHAR(255) NOT NULL,
price DECIMAL(10, 5) NOT NULL,
product_id INTEGER NOT NULL,
order_status BOOLEAN NOT NULL -- Whether order has been placed
) AUTO_INCREMENT = 10001;
INSERT INTO bigdata.orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
(default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
(default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);
4. Введение в параметры синхронизации всей базы данных.
для Dinky Открытые параметры синхронизируются по всей библиотеке, в большинстве Sink Применимо к обоим направлениям. За исключением нескольких Sink Что касается целевой стороны, из-за различных методов реализации обобщения сделать невозможно. нравиться Худи. Публичные параметры на основе Dinky предоставленный синтаксис,нравиться Вниз:
key | value | Поездка в Вниз |
---|---|---|
connector | mysql-cdc | исходный конец |
hostname | имя хоста | исходный конец |
port | конецрот | исходный конец |
username | имя пользователя | исходный конец |
password | пароль | исходный конец |
checkpoint | интервал контрольной точки | исходный конец |
scan.startup.mode | Полное или инкрементное чтение | исходный конец |
parallelism | 1 | исходный конец |
database-name | Имя базы данных | исходный конец |
table-name | Имя таблицы, поддерживает регулярные выражения. | исходный конец |
sink.* | *представлять sink конец Все параметры | sink конец |
намекать: дляsink.*, при его использовании обратите внимание, что в качестве раковины необходимо писать '*' Звездочки обозначают все параметры стокконца, например родной Flink Для соединителя оператора создания таблицы Sink напишите «соединитель» в Dinky Весь синтаксис синхронизации библиотеки должен быть «sink.connector». все Sink конец должен соответствовать этой грамматической спецификации.
5. Весь водоем входит в озеро Худи.
EXECUTE CDCSOURCE demo_hudi2 WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.0.4',
'port' = '4406',
'username' = 'root',
'password' = '123456',
'checkpoint' = '3000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'database-name'='bigdata',
'table-name'='bigdata\.products,bigdata\.orders',
'sink.connector'='hudi',
'sink.path'='hdfs://nameservice1/data/hudi/${tableName}',
'sink.hoodie.datasource.write.recordkey.field'='${pkList}',
'sink.hoodie.parquet.max.file.size'='268435456',
--'sink.write.precombine.field'='gmt_modified',
'sink.write.tasks'='1',
'sink.write.bucket_assign.tasks'='2',
'sink.write.precombine'='true',
'sink.compaction.async.enabled'='true',
'sink.write.task.max.size'='1024',
'sink.write.rate.limit'='3000',
'sink.write.operation'='upsert',
'sink.table.type'='COPY_ON_WRITE',
'sink.compaction.tasks'='1',
'sink.compaction.delta_seconds'='20',
'sink.compaction.async.enabled'='true',
'sink.read.streaming.skip_compaction'='true',
'sink.compaction.delta_commits'='20',
'sink.compaction.trigger.strategy'='num_or_time',
'sink.compaction.max_memory'='500',
'sink.changelog.enabled'='true',
'sink.read.streaming.enabled'='true',
'sink.read.streaming.check.interval'='3',
'sink.hive_sync.skip_ro_suffix' = 'true',
'sink.hive_sync.enable'='true',
'sink.hive_sync.mode'='hms',
'sink.hive_sync.metastore.uris'='thrift://bigdata1:9083',
'sink.hive_sync.db'='qhc_hudi_ods',
'sink.hive_sync.table'='${tableName}',
'sink.table.prefix.schema'='true'
)
Создавать и отправлять задания
Просмотр каталогов HDFS и таблиц Hive
Создать внешнюю таблицу StarRocks Hudi
существоватьсоздаватьвнешний Перед столом на Starrocks сначала убедитесь, что файл hdfs-site.xml развернут в каталоге conf Вниз узла FEиBE. Перезапустите узел FEиBE. Худи Таблица доступна только для чтения и может использоваться только для операций запроса. В настоящее время поддерживается Hudi Тип таблицы: Copy on write。
Создать управление Hudi ресурс
CREATE EXTERNAL RESOURCE "hudi0"
PROPERTIES (
"type" = "hudi",
"hive.metastore.uris" = "thrift://bigdata1:9083"
);
SHOW RESOURCES;
создавать Hudi внешний стол
CREATE EXTERNAL TABLE qhc_sta.orders (
`order_id` int NULL COMMENT "",
`order_date` datetime NULL COMMENT "",
`customer_name` string NULL COMMENT "",
`price` decimal(10, 5) NULL COMMENT "",
`product_id` int NULL COMMENT "",
`order_status` int NULL COMMENT ""
) ENGINE=HUDI
PROPERTIES (
"resource" = "hudi0",
"database" = "qhc_hudi_ods",
"table" = "bigdata_orders"
);
CREATE EXTERNAL TABLE qhc_sta.products (
id INT,
name STRING,
description STRING
) ENGINE=HUDI
PROPERTIES (
"resource" = "hudi0",
"database" = "qhc_hudi_ods",
"table" = "bigdata_products"
);
Проверять Hudi внешний столданные
шесть、Полное складское хранение StarRocks
EXECUTE CDCSOURCE jobname WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.0.4',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'checkpoint' = '3000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'table-name' = 'bigdata\.products,bigdata\.orders',
'sink.connector' = 'starrocks',
'sink.jdbc-url' = 'jdbc:mysql://192.168.0.4:19035',
'sink.load-url' = '192.168.0.4:18035',
'sink.username' = 'devuser',
'sink.password' = '123456',
'sink.sink.db' = 'qhc_ods',
'sink.table.prefix' = 'ods_',
'sink.table.lower' = 'true',
'sink.database-name' = 'qhc_ods',
'sink.table-name' = '${tableName}',
'sink.sink.properties.format' = 'json',
'sink.sink.properties.strip_outer_array' = 'true',
'sink.sink.max-retries' = '10',
'sink.sink.buffer-flush.interval-ms' = '15000',
'sink.sink.parallelism' = '1'
)
создавать Операция
StarRocks Создать таблицу
CREATE TABLE qhc_ods.`ods_orders` (
`order_id` largeint(40) NOT NULL COMMENT "",
`order_date` datetime NOT NULL COMMENT "",
`customer_name` varchar(65533) NULL COMMENT "",
`price` decimal64(10, 5) NOT NULL COMMENT "",
`product_id` bigint(20) NULL COMMENT "",
`order_status` boolean NULL COMMENT ""
) ENGINE=OLAP
PRIMARY KEY(`order_id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`order_id`) BUCKETS 10
PROPERTIES (
"replication_num" = "3",
"colocate_with" = "qhc",
"in_memory" = "false",
"storage_format" = "DEFAULT"
);
CREATE TABLE qhc_ods.`ods_products` (
`id` largeint(40) NOT NULL COMMENT "",
`name` varchar(65533) NOT NULL COMMENT "",
`description` varchar(65533) NULL COMMENT ""
) ENGINE=OLAP
PRIMARY KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`) BUCKETS 10
PROPERTIES (
"replication_num" = "3",
"colocate_with" = "qhc",
"in_memory" = "false",
"storage_format" = "DEFAULT"
);
Проверять StarRocks поверхность
ПроверятьStarrocksповерхностьсерединаданные Это пусто?
Отправьте задание на синхронизацию всей базы данных Flink
снова Проверять StarRocks
7. Импортируйте всю базу данных в MySQL.
EXECUTE CDCSOURCE cdc_mysql2 WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.0.4',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'checkpoint' = '3000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'table-name' = 'bigdata\.products,bigdata\.orders',
'sink.connector' = 'jdbc',
'sink.url' = 'jdbc:mysql://192.168.0.5:3306/test?characterEncoding=utf-8&useSSL=false',
'sink.username' = 'root',
'sink.password' = '123456',
'sink.sink.db' = 'test',
'sink.table.prefix' = 'test_',
'sink.table.lower' = 'true',
'sink.table-name' = '${tableName}',
'sink.driver' = 'com.mysql.jdbc.Driver',
'sink.sink.buffer-flush.interval' = '2s',
'sink.sink.buffer-flush.max-rows' = '100',
'sink.sink.max-retries' = '5'
)
создавать Операция
создавать MySQL поверхность
drop table test.test_products;
CREATE TABLE test.test_products (
id INTEGER NOT NULL ,
name VARCHAR(255) NOT NULL,
description VARCHAR(512)
);
drop table test.test_orders;
CREATE TABLE test.test_orders (
order_id INTEGER NOT NULL ,
order_date DATETIME NOT NULL,
customer_name VARCHAR(255) NOT NULL,
price DECIMAL(10, 5) NOT NULL,
product_id INTEGER NOT NULL,
order_status BOOLEAN NOT NULL -- Whether order has been placed
);
Отправьте задание на синхронизацию всей базы данных Flink
Проверять MySQL данные
8. Синхронизация всей базы данных Kafka
# работа в CDC
EXECUTE CDCSOURCE cdc_kafka WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.0.4',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'checkpoint' = '3000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'table-name' = 'bigdata\.products,bigdata\.orders',
'sink.connector'='datastream-kafka',
'sink.topic'='cdctest',
'sink.brokers'='bigdata2:9092,bigdata3:9092,bigdata4:9092'
)
создавать Операция
создавать Kafka Topic
создавать topic Незаметно, Динки Синхронизация всей базы данных Будет автоматическисоздавать。
# создаватьtopic
./bin/kafka-topics.sh \
--create \
--zookeeper bigdata2:2181,bigdata3:2181,bigdata4:2181 \
--replication-factor 3 \
--partitions 1 \
--topic cdctest
# Проверятьtopic
./bin/kafka-topics.sh --list \
--zookeeper bigdata2:2181,bigdata3:2181,bigdata4:2181
Отправьте задание на синхронизацию всей базы данных Flink
Проверятьпотребитель
Проверятьли2открытьповерхностьданные
./bin/kafka-console-consumer.sh --bootstrap-server bigdata2:9092,bigdata3:9092,bigdata4:9092 --topic cdctest --from-beginning --group test_id
9. Вся база данных в PostgreSQL
EXECUTE CDCSOURCE cdc_postgresql5 WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.0.4',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'checkpoint' = '3000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'table-name' = 'bigdata\.products,bigdata\.orders',
'sink.connector' = 'jdbc',
'sink.url' = 'jdbc:postgresql://192.168.0.5:5432/test',
'sink.username' = 'test',
'sink.password' = '123456',
'sink.sink.db' = 'test',
'sink.table.prefix' = 'test_',
'sink.table.lower' = 'true',
'sink.table-name' = '${tableName}',
'sink.driver' = 'org.postgresql.Driver',
'sink.sink.buffer-flush.interval' = '2s',
'sink.sink.buffer-flush.max-rows' = '100',
'sink.sink.max-retries' = '5'
)
создавать Операция
создавать PostgreSQL поверхность
CREATE schema test;
drop table test.test_products;
CREATE TABLE test.test_products (
id INTEGER UNIQUE NOT NULL ,
name VARCHAR(255) NOT NULL,
description VARCHAR(512)
);
drop table test.test_orders;
CREATE TABLE test.test_orders (
order_id INTEGER UNIQUE NOT NULL ,
order_date timestamp NULL,
customer_name VARCHAR(255) NOT NULL,
price DECIMAL(10, 5) NULL,
product_id INTEGER NULL,
order_status INTEGER NOT NULL -- Whether order has been placed
);
Отправьте задание на синхронизацию всей базы данных Flink
Проверять PostgreSQL данные
десять、Полное складское хранение ClickHouse
EXECUTE CDCSOURCE cdc_clickhouse WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.0.4',
'port' = '4406',
'username' = 'root',
'password' = '123456',
'checkpoint' = '3000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'table-name' = 'bigdata\.products,bigdata\.orders',
'sink.connector' = 'clickhouse',
'sink.url' = 'clickhouse://192.168.0.5:8123',
'sink.username' = 'default',
'sink.password' = '123456',
'sink.sink.db' = 'test',
'sink.table.prefix' = 'test_',
'sink.table.lower' = 'true',
'sink.database-name' = 'test',
'sink.table-name' = '${tableName}',
'sink.sink.batch-size' = '500',
'sink.sink.flush-interval' = '1000',
'sink.sink.max-retries' = '3'
)
создавать Операция
создавать ClickHouse поверхность
# создать заявление для локальной поверхности
create database test;
drop table test.test_products;
CREATE TABLE test.test_products (
id Int64 NOT NULL ,
name String NOT NULL,
description String
)
ENGINE = MergeTree()
ORDER BY id
PRIMARY KEY id;
drop table test.test_orders;
CREATE TABLE test.test_orders (
order_id Int64 NOT NULL ,
order_date DATETIME NOT NULL,
customer_name String NOT NULL,
price DECIMAL(10, 5) NOT NULL,
product_id Int64 NOT NULL,
order_status BOOLEAN NOT NULL -- Whether order has been placed
)
ENGINE = MergeTree()
ORDER BY order_id
PRIMARY KEY order_id;
Отправьте задание на синхронизацию всей базы данных Flink
Проверять ClickHouse данные
11. Резюме
вместе с Dinky Постоянное расширение и влияние компании в отрасли. Чтобы облегчить каждому обучение и использование, эта серия статей как Dinky Это первая статья из серии статей, ждите следующих статей серии.