Flink CDC В 2021 Год 11 луна 15 Последняя версия была выпущена на 2.1, который был встроен путем введения Debezium компонент, добавленный в Oracle поддерживать. Этот план в основном ориентирован на flink-connector-oracle-cdc
Пройти пробную версию。первыйсуществоватьместная пара Oracle CDC
После прохождения отладки объедините ее с потоковыми вычислениями продукта Tencent Cloud. Oceanus, ЭМР (Куду) реализует Oracle-Oceanus-Kudu
Комплексное решение, не требующее сложной реализации бизнес-логики (здесь осуществляется простейшая передача данных, и пользователи могут писать соответствующие коды, исходя из реальных условий бизнеса), а некоторые найденные проблемы обобщаются и делятся с читателями.
Среда базы данных Oracle здесь устанавливается на CVM в кластере EMR через Docker. Записывая и обновляя данные в базе данных Oracle вручную, Oceanus фиксирует измененные данные в реальном времени и сохраняет их в компоненте Kudu EMR. На основе приведенного выше плана была спроектирована следующая архитектурная схема:
Частная сеть (VPC) — это логически изолированное сетевое пространство, настроенное в Tencent Cloud. Oceanus Кластер, Редис Рекомендуется выбирать одну и ту же сеть при выборе компонентов и других служб. VPC, сети могут взаимодействовать друг с другом. В противном случае вам необходимо использовать одноранговое соединение, NAT. Шлюз, VPN и т. д., чтобы открыть сеть. Пожалуйста, обратитесь к инструкциям по созданию частной сети. Справочная документация。
Потоковые вычисления Oceanus Это экосистема продуктов больших данных, инструмент анализа в реальном времени и базовый инструмент. Apache Flink Создайте платформу анализа больших данных корпоративного уровня в режиме реального времени с характеристиками комплексной разработки, бесперебойного подключения, задержки менее секунды, низкой стоимости, безопасности и стабильности. Потоковые вычисления Oceanus С целью максимизации ценности корпоративных данных мы ускоряем процесс цифровизации предприятий в режиме реального времени.
существовать Oceanus консольиз【кластеруправлять】->【Новыйкластер】страницасоздаватькластер,Выберите регион, зону доступности, VPC, журналы и хранилище.,Установите первоначальный пароль и т. д. ВКК и подсеть используют только что созданную сеть. После создания Oceanus Кластеры следующие:
EMR — это эластичная служба Hadoop с открытым исходным кодом, размещенная в облаке и поддерживающая платформы больших данных, такие как Kudu, HDFS, Presto, Flink и Druid. В этом примере в основном требуется использование Kudu, Zookeeper, HDFS, Yarn, Impala и Knox. компоненты.
Входить консоль ЭМИ,Нажмите на левый верхний угол [создаватькластер], чтобы продолжить кластеризсоздавать.,создавать Обратите внимание на свой выбор во время процесса.【продукт Версия】,другойиз Версия Включатьизкомпонентыдругой,Автор выбирает здесьEMR-V3.2.1
Версия,Кроме того,【кластер】необходимо выбрать, прежде чем создавать VPC и соответствующую подсеть. Пожалуйста, обратитесь к конкретному процессу Создайте кластер EMR.。
скачать Docker:
другая среда CVM может быть другой,Здесь автор использует автономный режим установки.,Адрес официального сайта инсталляционного пакета。скачать Конфигурация После завершения нажмите следующую командузапускатьибегать Docker Служить.
# запускать
systemctl start docker
# Настроить запуск загрузки
systemctl enable docker.service
# Проверять docker Статус услуги
systemctl status docker
скачать Oracle Зеркало:
# Находить Oracle версия с зеркалом
docker search oracle
# скачать соответствует зеркалу, здесь у нас есть скачать truevoly/oracle-12c Версия
docker pull truevoly/oracle-12c
# бегать Docker контейнер
docker run -d -p 1521:1521 --name oracle12c truevoly/oracle-12c
# Входитьконтейнер
docker exec -it oracle12c /bin/bash
Включить архивирование журналов:
-- При необходимости повторите source один раз .profile документ
source /home/oracle/.profile
-- 1. переключиться на Oracle пользователь
su oracle
-- 2. к DBA База подключения удостоверений данных
$ORACLE_HOME/bin/sqlplus /nolog
conn /as sysdba
show user
-- 3. Включить архивирование журналов
alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope = spfile;
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;
-- 4. Проверьте, заархивирован ли журнал
archive log list;
Уведомление: 1.
/opt/oracle/oradata/recovery_area
Путь необходимо использоватьroot
пользователь Создайте заранее,и Предоставить разрешения на чтение и запись:chmod 777 /opt/oracle/oradata/recovery_area
。 2. Для включения архивирования журналов требуется перезапуск базы данных. 3. Архивированные журналы занимают много места на диске, а журналы с истекшим сроком действия необходимо регулярно очищать.
создать табличное пространство:
CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/SID/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
Уведомление:
/opt/oracle/oradata/SID
Путь необходимо использоватьroot
пользовательзаранеесоздавать,и Предоставить разрешения на чтение и запись:chmod 777 /opt/oracle/oradata/SID
。
создаватьпользовательи Авторизовать:
CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
GRANT CREATE SESSION TO flinkuser;
GRANT SET CONTAINER TO flinkuser;
GRANT SELECT ON V_$DATABASE to flinkuser;
GRANT FLASHBACK ANY TABLE TO flinkuser;
GRANT SELECT ANY TABLE TO flinkuser;
GRANT SELECT_CATALOG_ROLE TO flinkuser;
GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
GRANT SELECT ANY TRANSACTION TO flinkuser;
GRANT LOGMINING TO flinkuser;
GRANT CREATE TABLE TO flinkuser;
GRANT LOCK ANY TABLE TO flinkuser;
GRANT ALTER ANY TABLE TO flinkuser;
GRANT CREATE SEQUENCE TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;
GRANT SELECT ON V_$LOG TO flinkuser;
GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
GRANT SELECT ON V_$LOGFILE TO flinkuser;
GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;
Подготовка данных:
-- создавать Oracle стол, используемый для Source конец
CREATE TABLE FLINKUSER.TEST1(
ID NUMBER(10,0) NOT NULL ENABLE,
NAME VARCHAR2(50),
PRIMARY KEY(ID)
) TABLESPACE LOGMINER_TBS;
-- Вручную вставьте несколько фрагментов данных
INSERT INTO FLINKUSER.TEST1 (ID,NAME) VALUES (1,'1111');
Начать дополнительную регистрацию:
-- вернобаза данных Конфигурация
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
-- Выполните Конфигурацию на столе
ALTER TABLE FLINKUSER.TEST1 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
Этот план подходит для недавно запущенных flink-connector-oracle-cdc
функцию, чтобы попробовать. Автор здесь впервые использует его на локальной машине Docker Настройка установки Oracle 11g и Oracle 12c Две версии для локального Oracle Тест чтения таблицы, выполнение чтения данных toRetractStream
Вывод на печать после преобразования,вернокоторый нашелиз После обобщения некоторых вопросови Поделитесь со всемиодин раз。впоследствиисуществовать EMR Выберите один в кластере CVM Конфигурация Oracle 12c среду, перенесите код в Oceanus платформе, и окончательные данные попадают в Kudu да, осознай Oracle To Kudu полный набор решений.
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-oracle-cdc</artifactId>
<version>2.2-SNAPSHOT</version>
<!-- Здесь необходимо установить опору на объем, другое flink опора должна быть установлена на provied,Oceanus Платформа предоставила -->
<scope>compile</scope>
</dependency>
package com.demo;
import com.ververica.cdc.connectors.oracle.OracleSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class OracleToKudu {
public static void main(String[] args) throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv,settings);
// SQL Метод письма
tEnv.executeSql("CREATE TABLE `oracleSource` (\n" +
" ID BIGINT,\n" +
" NAME VARCHAR,\n" +
" PRIMARY KEY(ID) NOT ENFORCED )\n" +
" WITH (\n" +
" 'connector' = 'oracle-cdc',\n" +
// Пожалуйста, измените его на Oracle Местосуществоватьиздействительный IP адрес
" 'hostname' = 'xx.xx.xx.xx',\n" +
" 'port' = '1521',\n" +
" 'username' = 'flinkuser',\n" +
" 'password' = 'flinkpw',\n" +
" 'database-name' = 'xe',\n" +
" 'schema-name' = 'flinkuser',\n" +
" 'table-name' = 'test1'\n" +
")");
// Stream API Метод письма
// SourceFunction<String> sourceFunction = OracleSource.<String>builder()
// .hostname("xx.xx.xx.xx")
// .port(1521)
// .database("xe")
// .schemaList("flinkuser")
// .tableList("flinkuser.test1")
// .username("flinkuser")
// .password("flinkpw")
// .deserializer(new JsonDebeziumDeserializationSchema())
// .build();
// sEnv.addSource(sourceFunction)
tEnv.executeSql("CREATE TABLE `kudu_sink_table` (\n" +
" `id` BIGINT,\n" +
" `name` VARCHAR\n" +
") WITH (\n" +
" 'connector.type' = 'kudu',\n" +
// Пожалуйста, измените его наактуализ master IP адрес
" 'kudu.masters' = 'master-01:7051,master-02:7051,master-03:7051',\n" +
" 'kudu.table' = 'JoylyuTest1',\n" +
" 'kudu.hash-columns' = 'id',\n" +
" 'kudu.primary-key-columns' = 'id'\n" +
")");
// Здесь автор выполняет только самую упрощенную функцию передачи данных. Пожалуйста, разработайте ее в соответствии с реальной деловой ситуацией.
tEnv.executeSql("insert into kudu_sink_table select * from oracleSource");
}
}
существовать Oceanus В консоли нажмите [Управление зависимостями] слева, нажмите [Создать] в верхнем левом углу, чтобы создать новую зависимость, и загрузите ее локально. Jar Сумка.
существовать Oceanus В консоли нажмите [Управление заданиями] слева, нажмите [Создать] в левом верхнем углу, чтобы создать новое задание, и выберите тип задания. Jar Операция,Нажмите【Разработка и отладка】Входить Операцияредактироватьстраница。
[Основной пакет] Выберите только что загруженную зависимость и выберите последнюю версию. Заполните [Основной класс]. com.demos.OracleToKudu
。
Нажмите [Параметры работы существуют], [встроенный Выберите соединитель flink-connector-kudu
,Нажмите [Сохранить].
Нажмите [Опубликовать черновик] для запуска, и вы сможете просмотреть информацию о запуске через панель задач [Журнал] или пользовательский интерфейс Flink.
существовать EMR Выберите один в кластере CVM Входить,Запрос Написать kudu данные.
# Входить kudu в каталоге
cd /usr/local/service/kudu/bin
# Проверятькластер Местоиметьповерхность
./kudu table list master-01,master-02,master-03
# Запрос JoylyuTest1 Табличные данные
./kudu table scan master-01,master-02,master-03 JoylyuTest1
Конечно, Куду Также можно использовать с Impala Интегрируйте через Impala Запросданные,Но нужносуществовать Impala Вышеизложенное установлено с Kudu Можно запросить только внешнюю таблицу, соответствующую этой таблице. Подробную информацию см. Краткое описание раковины Oceanus Kudu。
CREATE EXTERNAL TABLE ImpalaJoylyuTest1
STORED AS KUDU
TBLPROPERTIES (
'kudu.master_addresses' = 'master-01:7051,master-02:7051,master-03:7051',
'kudu.table_name' = 'JoylyuTest1'
);
Автор здесьсуществоватьместная пара Два видадругойиз Oracle Версия: Оракул 11g и Oracle 12c Debug В ходе процесса были обнаружены некоторые проблемы, которые кратко описаны здесь.
Первая: проблема с регистром имени таблицы.
Здесь автор впервые обсуждает Oracle 11g тест,существовать Конфигурация Заполните, как указано выше.изместный после шаговбегать,Сразу после распечатки данных появляется следующее сообщение об ошибке:
Caused by: io.debezium.DebeziumException: Supplemental logging not configured for table HELOWIN.FLINKUSER.test1. Use command: ALTER TABLE FLINKUSER.test1 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS
at io.debezium.connector.oracle.logminer.LogMinerHelper.checkSupplementalLogging(LogMinerHelper.java:407)
at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:132)
... 7 more
Из информации журнала можно узнать, что авторская таблица TEST1
Автоматически преобразуется в нижний регистризповерхность test1
。Появляется сообщение об ошибкесуществовать checkSupplementalLogging
,ВЭто основано на этом сообщении об ошибке Проверять Исходный код находится здесьверно Oracle внутри ALL_LOG_GROUPS
поверхностьсделай это один раз Запрос,Запрос данных Невыход приведет к ошибке.(ALL_LOG_GROUPS
поверхностьхранится визкапитализируетсяизповерхностьимяTEST1
)。
static String tableSupplementalLoggingCheckQuery(TableId tableId) {
return String.format("SELECT 'KEY', LOG_GROUP_TYPE FROM %s WHERE OWNER = '%s' AND TABLE_NAME = '%s'", ALL_LOG_GROUPS, tableId.schema(), tableId.table());
}
// Преобразуйте имя таблицы в нижний регистр. Исходный код выглядит следующим образом.
private TableId toLowerCaseIfNeeded(TableId tableId) {
return tableIdCaseInsensitive ? tableId.toLowercase() : tableId;
}
На данный момент мы можем обойти эту проблему тремя способами:
toLowercase
Изменить на toUppercase
。WITH
Добавить параметры 'debezium.database.tablename.case.insensitive'='false'
Конфигурация,Пусть он потеряет функцию «нечувствительности к регистру».,существоватьtable-name
Требуется капитализацияповерхностьимя。Второе: проблема с задержкой обновления данных.
Авторское руководство по направлению существуют Oracle база данных Писатьданные,проходитьсуществовать IDEA Когда консоль распечатывает данные, обнаруживается, что при добавлении данных (Append) данные будут примерно 15s задержка При записи обновления (Upsert) задержка будет больше. Иногда это необходимо. Регистрация изменений данных занимает 3–5 минут. Для решения этой проблемы Флинк CDC FAQ Ясное решение дано в существованиисоздавать Oracle Source Table час,существовать WITH
Добавить Параметры следующие:
'debezium.log.mining.strategy'='online_catalog',
'debezium.log.mining.continuous.mine'='true'
Третье: проблема настройки параллелизма
Автор здесь существующего пытается обеспечить параллелизм 2 При чтении данных я обнаружил следующую ошибку:
Exception in thread "main" java.lang.IllegalArgumentException: The parallelism of non parallel operator must be 1.
at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
at org.apache.flink.api.common.operators.util.OperatorValidationUtils.validateParallelism(OperatorValidationUtils.java:35)
at org.apache.flink.streaming.api.datastream.DataStreamSource.setParallelism(DataStreamSource.java:114)
at com.demo.OracleToKudu.main(OracleToKudu.java:67)
После обнаружения информации о стеке Oracle CDC Степень параллелизма можно установить только 1, с Oracle CDC Официальная документация последовательный.
// Часть кода ошибки
public static void validateParallelism(int parallelism, boolean canBeParallel) {
Preconditions.checkArgument(canBeParallel || parallelism == 1, "The parallelism of non parallel operator must be 1.");
Preconditions.checkArgument(parallelism > 0 || parallelism == -1, "The parallelism of an operator must be at least 1, or ExecutionConfig.PARALLELISM_DEFAULT (use system default).");
}