Хранилище данных в реальном времени: обновление данных Oracle в Kudu в реальном времени на основе Flink CDC.
Хранилище данных в реальном времени: обновление данных Oracle в Kudu в реальном времени на основе Flink CDC.

Описание решения

Обзор

Flink CDC В 2021 Год 11 луна 15 Последняя версия была выпущена на 2.1, который был встроен путем введения Debezium компонент, добавленный в Oracle поддерживать. Этот план в основном ориентирован на flink-connector-oracle-cdcПройти пробную версию。первыйсуществоватьместная пара Oracle CDC После прохождения отладки объедините ее с потоковыми вычислениями продукта Tencent Cloud. Oceanus, ЭМР (Куду) реализует Oracle-Oceanus-Kudu Комплексное решение, не требующее сложной реализации бизнес-логики (здесь осуществляется простейшая передача данных, и пользователи могут писать соответствующие коды, исходя из реальных условий бизнеса), а некоторые найденные проблемы обобщаются и делятся с читателями.

Архитектура решения

Среда базы данных Oracle здесь устанавливается на CVM в кластере EMR через Docker. Записывая и обновляя данные в базе данных Oracle вручную, Oceanus фиксирует измененные данные в реальном времени и сохраняет их в компоненте Kudu EMR. На основе приведенного выше плана была спроектирована следующая архитектурная схема:

Архитектура решениякартина.png
Архитектура решениякартина.png

Подготовка

Создайте частную сеть VPC

Частная сеть (VPC) — это логически изолированное сетевое пространство, настроенное в Tencent Cloud. Oceanus Кластер, Редис Рекомендуется выбирать одну и ту же сеть при выборе компонентов и других служб. VPC, сети могут взаимодействовать друг с другом. В противном случае вам необходимо использовать одноранговое соединение, NAT. Шлюз, VPN и т. д., чтобы открыть сеть. Пожалуйста, обратитесь к инструкциям по созданию частной сети. Справочная документация

Создание кластера Oceanus с потоковыми вычислениями

Потоковые вычисления Oceanus Это экосистема продуктов больших данных, инструмент анализа в реальном времени и базовый инструмент. Apache Flink Создайте платформу анализа больших данных корпоративного уровня в режиме реального времени с характеристиками комплексной разработки, бесперебойного подключения, задержки менее секунды, низкой стоимости, безопасности и стабильности. Потоковые вычисления Oceanus С целью максимизации ценности корпоративных данных мы ускоряем процесс цифровизации предприятий в режиме реального времени.

существовать Oceanus консольиз【кластеруправлять】->【Новыйкластер】страницасоздаватькластер,Выберите регион, зону доступности, VPC, журналы и хранилище.,Установите первоначальный пароль и т. д. ВКК и подсеть используют только что созданную сеть. После создания Oceanus Кластеры следующие:

Oceanusкластер.png
Oceanusкластер.png

Создайте кластер EMR.

EMR — это эластичная служба Hadoop с открытым исходным кодом, размещенная в облаке и поддерживающая платформы больших данных, такие как Kudu, HDFS, Presto, Flink и Druid. В этом примере в основном требуется использование Kudu, Zookeeper, HDFS, Yarn, Impala и Knox. компоненты.

Входить консоль ЭМИ,Нажмите на левый верхний угол [создаватькластер], чтобы продолжить кластеризсоздавать.,создавать Обратите внимание на свой выбор во время процесса.【продукт Версия】,другойиз Версия Включатьизкомпонентыдругой,Автор выбирает здесьEMR-V3.2.1Версия,Кроме того,【кластер】необходимо выбрать, прежде чем создавать VPC и соответствующую подсеть. Пожалуйста, обратитесь к конкретному процессу Создайте кластер EMR.

создаватьEMRкластер.png
создаватьEMRкластер.png

Настройка среды Oracle

1. Установите образ Oracle

скачать Docker:

другая среда CVM может быть другой,Здесь автор использует автономный режим установки.,Адрес официального сайта инсталляционного пакета。скачать Конфигурация После завершения нажмите следующую командузапускатьибегать Docker Служить.

Язык кода:shell
копировать
# запускать
systemctl start docker

# Настроить запуск загрузки
systemctl enable docker.service

# Проверять docker Статус услуги
systemctl status docker

скачать Oracle Зеркало:

Язык кода:shell
копировать
# Находить Oracle версия с зеркалом
docker search oracle

# скачать соответствует зеркалу, здесь у нас есть скачать truevoly/oracle-12c Версия
docker pull truevoly/oracle-12c

# бегать Docker контейнер
docker run -d -p 1521:1521 --name oracle12c truevoly/oracle-12c

# Входитьконтейнер
docker exec -it oracle12c /bin/bash
2. Настройте базу данных Oracle

Включить архивирование журналов:

Язык кода:sql
копировать
-- При необходимости повторите source один раз .profile документ
source /home/oracle/.profile

-- 1. переключиться на Oracle пользователь
su oracle

-- 2. к DBA База подключения удостоверений данных
$ORACLE_HOME/bin/sqlplus /nolog
conn /as sysdba
show user

-- 3. Включить архивирование журналов
alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope = spfile;
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;

-- 4. Проверьте, заархивирован ли журнал
archive log list;

Уведомление: 1. /opt/oracle/oradata/recovery_areaПуть необходимо использоватьrootпользователь Создайте заранее,и Предоставить разрешения на чтение и запись:chmod 777 /opt/oracle/oradata/recovery_area。 2. Для включения архивирования журналов требуется перезапуск базы данных. 3. Архивированные журналы занимают много места на диске, а журналы с истекшим сроком действия необходимо регулярно очищать.

создать табличное пространство:

Язык кода:sql
копировать
CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/SID/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;

Уведомление:/opt/oracle/oradata/SIDПуть необходимо использоватьrootпользовательзаранеесоздавать,и Предоставить разрешения на чтение и запись:chmod 777 /opt/oracle/oradata/SID

создаватьпользовательи Авторизовать:

Язык кода:sql
копировать
CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
GRANT CREATE SESSION TO flinkuser;
GRANT SET CONTAINER TO flinkuser;
GRANT SELECT ON V_$DATABASE to flinkuser;
GRANT FLASHBACK ANY TABLE TO flinkuser;
GRANT SELECT ANY TABLE TO flinkuser;
GRANT SELECT_CATALOG_ROLE TO flinkuser;
GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
GRANT SELECT ANY TRANSACTION TO flinkuser;
GRANT LOGMINING TO flinkuser;
GRANT CREATE TABLE TO flinkuser;
GRANT LOCK ANY TABLE TO flinkuser;
GRANT ALTER ANY TABLE TO flinkuser;
GRANT CREATE SEQUENCE TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;
GRANT SELECT ON V_$LOG TO flinkuser;
GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
GRANT SELECT ON V_$LOGFILE TO flinkuser;
GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;

Подготовка данных:

Язык кода:sql
копировать
-- создавать Oracle стол, используемый для Source конец
CREATE TABLE FLINKUSER.TEST1(
    ID    NUMBER(10,0) NOT NULL ENABLE,   
    NAME  VARCHAR2(50),
    PRIMARY KEY(ID)
   ) TABLESPACE LOGMINER_TBS;
-- Вручную вставьте несколько фрагментов данных
INSERT INTO FLINKUSER.TEST1 (ID,NAME) VALUES (1,'1111');

Начать дополнительную регистрацию:

Язык кода:sql
копировать
-- вернобаза данных Конфигурация
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
-- Выполните Конфигурацию на столе
ALTER TABLE FLINKUSER.TEST1 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

Реализация решения

Этот план подходит для недавно запущенных flink-connector-oracle-cdc функцию, чтобы попробовать. Автор здесь впервые использует его на локальной машине Docker Настройка установки Oracle 11g и Oracle 12c Две версии для локального Oracle Тест чтения таблицы, выполнение чтения данных toRetractStream Вывод на печать после преобразования,вернокоторый нашелиз После обобщения некоторых вопросови Поделитесь со всемиодин раз。впоследствиисуществовать EMR Выберите один в кластере CVM Конфигурация Oracle 12c среду, перенесите код в Oceanus платформе, и окончательные данные попадают в Kudu да, осознай Oracle To Kudu полный набор решений.

разработка локального кода

1. Зависимости Maven
Язык кода:html
копировать
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-oracle-cdc</artifactId>
    <version>2.2-SNAPSHOT</version>
    <!-- Здесь необходимо установить опору на объем, другое flink опора должна быть установлена ​​на provied,Oceanus Платформа предоставила -->
    <scope>compile</scope>
</dependency>
2. Написание кода
Язык кода:java
копировать
package com.demo;

import com.ververica.cdc.connectors.oracle.OracleSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class OracleToKudu {
    public static void main(String[] args) throws Exception {
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv,settings);

        // SQL Метод письма
        tEnv.executeSql("CREATE TABLE `oracleSource` (\n" +
                " ID      BIGINT,\n" +
                " NAME    VARCHAR,\n" +
                " PRIMARY KEY(ID) NOT ENFORCED )\n" +
                " WITH (\n" +
                "  'connector' = 'oracle-cdc',\n" +
                // Пожалуйста, измените его на Oracle Местосуществоватьиздействительный IP адрес
                "  'hostname' = 'xx.xx.xx.xx',\n" +
                "  'port' = '1521',\n" +
                "  'username' = 'flinkuser',\n" +
                "  'password' = 'flinkpw',\n" +
                "  'database-name' = 'xe',\n" +
                "  'schema-name' = 'flinkuser',\n" +
                "  'table-name' = 'test1'\n" +
                ")");

        // Stream API Метод письма
        // SourceFunction<String> sourceFunction = OracleSource.<String>builder()
        //         .hostname("xx.xx.xx.xx")
        //         .port(1521)
        //         .database("xe")
        //         .schemaList("flinkuser")
        //         .tableList("flinkuser.test1")
        //         .username("flinkuser")
        //         .password("flinkpw")
        //         .deserializer(new JsonDebeziumDeserializationSchema())
        //         .build();
        // sEnv.addSource(sourceFunction)


        tEnv.executeSql("CREATE TABLE `kudu_sink_table` (\n" +
                " `id`    BIGINT,\n" +
                " `name`  VARCHAR\n" +
                ") WITH (\n" +
                " 'connector.type' = 'kudu',\n" +
                // Пожалуйста, измените его наактуализ master IP адрес
                " 'kudu.masters' = 'master-01:7051,master-02:7051,master-03:7051',\n" +
                " 'kudu.table' = 'JoylyuTest1',\n" +
                " 'kudu.hash-columns' = 'id',\n" +
                " 'kudu.primary-key-columns' = 'id'\n" +
                ")");

        // Здесь автор выполняет только самую упрощенную функцию передачи данных. Пожалуйста, разработайте ее в соответствии с реальной деловой ситуацией.
        tEnv.executeSql("insert into kudu_sink_table select * from oracleSource");

    }
}

Потоковые вычисления, задание JAR Oceanus

1. Загрузите зависимости

существовать Oceanus В консоли нажмите [Управление зависимостями] слева, нажмите [Создать] в верхнем левом углу, чтобы создать новую зависимость, и загрузите ее локально. Jar Сумка.

2. Создать работу

существовать Oceanus В консоли нажмите [Управление заданиями] слева, нажмите [Создать] в левом верхнем углу, чтобы создать новое задание, и выберите тип задания. Jar Операция,Нажмите【Разработка и отладка】Входить Операцияредактироватьстраница。

[Основной пакет] Выберите только что загруженную зависимость и выберите последнюю версию. Заполните [Основной класс]. com.demos.OracleToKudu

Нажмите [Параметры работы существуют], [встроенный Выберите соединитель flink-connector-kudu,Нажмите [Сохранить].

3. Запустите задание

Нажмите [Опубликовать черновик] для запуска, и вы сможете просмотреть информацию о запуске через панель задач [Журнал] или пользовательский интерфейс Flink.

Запрос данных

существовать EMR Выберите один в кластере CVM Входить,Запрос Написать kudu данные.

Язык кода:shell
копировать
# Входить kudu в каталоге
cd /usr/local/service/kudu/bin

# Проверятькластер Местоиметьповерхность
./kudu table list master-01,master-02,master-03

# Запрос JoylyuTest1 Табличные данные
./kudu table scan master-01,master-02,master-03 JoylyuTest1

Конечно, Куду Также можно использовать с Impala Интегрируйте через Impala Запросданные,Но нужносуществовать Impala Вышеизложенное установлено с Kudu Можно запросить только внешнюю таблицу, соответствующую этой таблице. Подробную информацию см. Краткое описание раковины Oceanus Kudu

Язык кода:sql
копировать
CREATE EXTERNAL TABLE ImpalaJoylyuTest1
STORED AS KUDU
TBLPROPERTIES (
  'kudu.master_addresses' = 'master-01:7051,master-02:7051,master-03:7051', 
  'kudu.table_name' = 'JoylyuTest1'
);

Сортировка проблем

Автор здесьсуществоватьместная пара Два видадругойиз Oracle Версия: Оракул 11g и Oracle 12c Debug В ходе процесса были обнаружены некоторые проблемы, которые кратко описаны здесь.

Первая: проблема с регистром имени таблицы.

Здесь автор впервые обсуждает Oracle 11g тест,существовать Конфигурация Заполните, как указано выше.изместный после шаговбегать,Сразу после распечатки данных появляется следующее сообщение об ошибке:

Язык кода:text
копировать
Caused by: io.debezium.DebeziumException: Supplemental logging not configured for table HELOWIN.FLINKUSER.test1.  Use command: ALTER TABLE FLINKUSER.test1 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS
	at io.debezium.connector.oracle.logminer.LogMinerHelper.checkSupplementalLogging(LogMinerHelper.java:407)
	at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:132)
	... 7 more

Из информации журнала можно узнать, что авторская таблица TEST1Автоматически преобразуется в нижний регистризповерхность test1。Появляется сообщение об ошибкесуществовать checkSupplementalLogging ,ВЭто основано на этом сообщении об ошибке Проверять Исходный код находится здесьверно Oracle внутри ALL_LOG_GROUPS поверхностьсделай это один раз Запрос,Запрос данных Невыход приведет к ошибке.(ALL_LOG_GROUPSповерхностьхранится визкапитализируетсяизповерхностьимяTEST1)。

Язык кода:java
копировать
static String tableSupplementalLoggingCheckQuery(TableId tableId) {
        return String.format("SELECT 'KEY', LOG_GROUP_TYPE FROM %s WHERE OWNER = '%s' AND TABLE_NAME = '%s'", ALL_LOG_GROUPS, tableId.schema(), tableId.table());
    }
Язык кода:java
копировать
// Преобразуйте имя таблицы в нижний регистр. Исходный код выглядит следующим образом.
private TableId toLowerCaseIfNeeded(TableId tableId) {
    return tableIdCaseInsensitive ? tableId.toLowercase() : tableId;
}

На данный момент мы можем обойти эту проблему тремя способами:

  • Непосредственно измените исходный код и замените приведенное выше. toLowercase Изменить на toUppercase
  • существоватьсоздавать Oracle Source Table час,существовать WITH Добавить параметры 'debezium.database.tablename.case.insensitive'='false' Конфигурация,Пусть он потеряет функцию «нечувствительности к регистру».,существоватьtable-nameТребуется капитализацияповерхностьимя。
  • Переключитесь на другую версию Oracle. Автор здесь использует версию Oracle 12c, и все работает нормально.

Второе: проблема с задержкой обновления данных.

Авторское руководство по направлению существуют Oracle база данных Писатьданные,проходитьсуществовать IDEA Когда консоль распечатывает данные, обнаруживается, что при добавлении данных (Append) данные будут примерно 15s задержка При записи обновления (Upsert) задержка будет больше. Иногда это необходимо. Регистрация изменений данных занимает 3–5 минут. Для решения этой проблемы Флинк CDC FAQ Ясное решение дано в существованиисоздавать Oracle Source Table час,существовать WITH Добавить Параметры следующие:

Язык кода:sql
копировать
'debezium.log.mining.strategy'='online_catalog',
'debezium.log.mining.continuous.mine'='true'

Третье: проблема настройки параллелизма

Автор здесь существующего пытается обеспечить параллелизм 2 При чтении данных я обнаружил следующую ошибку:

Язык кода:java
копировать
Exception in thread "main" java.lang.IllegalArgumentException: The parallelism of non parallel operator must be 1.
	at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
	at org.apache.flink.api.common.operators.util.OperatorValidationUtils.validateParallelism(OperatorValidationUtils.java:35)
	at org.apache.flink.streaming.api.datastream.DataStreamSource.setParallelism(DataStreamSource.java:114)
	at com.demo.OracleToKudu.main(OracleToKudu.java:67)

После обнаружения информации о стеке Oracle CDC Степень параллелизма можно установить только 1, с Oracle CDC Официальная документация последовательный.

Язык кода:java
копировать
// Часть кода ошибки
public static void validateParallelism(int parallelism, boolean canBeParallel) {
    Preconditions.checkArgument(canBeParallel || parallelism == 1, "The parallelism of non parallel operator must be 1.");
    Preconditions.checkArgument(parallelism > 0 || parallelism == -1, "The parallelism of an operator must be at least 1, or ExecutionConfig.PARALLELISM_DEFAULT (use system default).");
}
boy illustration
Неразрушающее увеличение изображений одним щелчком мыши, чтобы сделать их более четкими артефактами искусственного интеллекта, включая руководства по установке и использованию.
boy illustration
Копикодер: этот инструмент отлично работает с Cursor, Bolt и V0! Предоставьте более качественные подсказки для разработки интерфейса (создание навигационного веб-сайта с использованием искусственного интеллекта).
boy illustration
Новый бесплатный RooCline превосходит Cline v3.1? ! Быстрее, умнее и лучше вилка Cline! (Независимое программирование AI, порог 0)
boy illustration
Разработав более 10 проектов с помощью Cursor, я собрал 10 примеров и 60 подсказок.
boy illustration
Я потратил 72 часа на изучение курсорных агентов, и вот неоспоримые факты, которыми я должен поделиться!
boy illustration
Идеальная интеграция Cursor и DeepSeek API
boy illustration
DeepSeek V3 снижает затраты на обучение больших моделей
boy illustration
Артефакт, увеличивающий количество очков: на основе улучшения характеристик препятствия малым целям Yolov8 (SEAM, MultiSEAM).
boy illustration
DeepSeek V3 раскручивался уже три дня. Сегодня я попробовал самопровозглашенную модель «ChatGPT».
boy illustration
Open Devin — инженер-программист искусственного интеллекта с открытым исходным кодом, который меньше программирует и больше создает.
boy illustration
Эксклюзивное оригинальное улучшение YOLOv8: собственная разработка SPPF | SPPF сочетается с воспринимаемой большой сверткой ядра UniRepLK, а свертка с большим ядром + без расширения улучшает восприимчивое поле
boy illustration
Популярное и подробное объяснение DeepSeek-V3: от его появления до преимуществ и сравнения с GPT-4o.
boy illustration
9 основных словесных инструкций по доработке академических работ с помощью ChatGPT, эффективных и практичных, которые стоит собрать
boy illustration
Вызовите deepseek в vscode для реализации программирования с помощью искусственного интеллекта.
boy illustration
Познакомьтесь с принципами сверточных нейронных сетей (CNN) в одной статье (суперподробно)
boy illustration
50,3 тыс. звезд! Immich: автономное решение для резервного копирования фотографий и видео, которое экономит деньги и избавляет от беспокойства.
boy illustration
Cloud Native|Практика: установка Dashbaord для K8s, графика неплохая
boy illustration
Краткий обзор статьи — использование синтетических данных при обучении больших моделей и оптимизации производительности
boy illustration
MiniPerplx: новая поисковая система искусственного интеллекта с открытым исходным кодом, спонсируемая xAI и Vercel.
boy illustration
Конструкция сервиса Synology Drive сочетает проникновение в интрасеть и синхронизацию папок заметок Obsidian в облаке.
boy illustration
Центр конфигурации————Накос
boy illustration
Начинаем с нуля при разработке в облаке Copilot: начать разработку с минимальным использованием кода стало проще
boy illustration
[Серия Docker] Docker создает мультиплатформенные образы: практика архитектуры Arm64
boy illustration
Обновление новых возможностей coze | Я использовал coze для создания апплета помощника по исправлению домашних заданий по математике
boy illustration
Советы по развертыванию Nginx: практическое создание статических веб-сайтов на облачных серверах
boy illustration
Feiniu fnos использует Docker для развертывания личного блокнота Notepad
boy illustration
Сверточная нейронная сеть VGG реализует классификацию изображений Cifar10 — практический опыт Pytorch
boy illustration
Начало работы с EdgeonePages — новым недорогим решением для хостинга веб-сайтов
boy illustration
[Зона легкого облачного игрового сервера] Управление игровыми архивами
boy illustration
Развертывание SpringCloud-проекта на базе Docker и Docker-Compose