Чтобы гарантировать, что данные, которые необходимо объединить в левом и правом потоках, появятся в одном и том же узле, Flink SQL будет использовать условия включения в соединении для разделения одних и тех же условий ассоциации. Данные распределяются в один и тот же раздел.
Существующие таблица заказов A и таблица платежей B связываются для получения сводной таблицы C. Исходные данные таблицы заказов и таблицы платежей следующие:
Таблица A: Данные таблицы заказов
order_id | timestamp |
---|---|
1001 | 2023-02-04 10:00:00 |
1002 | 2023-01-04 10:01:02 |
Таблица B: Данные таблицы платежей
order_id | pay_money |
---|---|
order_id | pay_money |
1001 | 80 |
1002 | 100 |
Когда поступает каждый фрагмент данных в таблице A, он будет связан с данными в таблице B:
Таким образом, результат соединения приведенных выше таблиц A и B:
order_id | timestamp | pay_money |
---|---|---|
1002 | 2023-01-04 10:01:02 | 100 |
Когда в таблицу Б поступает 1001 новых данных, новые данные выглядят следующим образом:
order_id | pay_money |
---|---|
order_id | pay_money |
1001 | 80 |
На данный момент данные в таблице результатов следующие:
order_id | timestamp | pay_money |
---|---|---|
1002 | 2023-01-04 10:01:02 | 100 |
1001 | 2023-02-04 10:00:00 | 80 |
Уведомление
Внутреннее соединение не генерирует поток восстановления.
Когда данные в таблице A поступают, он активно выполняет связанный запрос с данными в таблице B, но никаких связанных данных нет. Результаты также будут выведены, а недостающие поля будут заполнены значением null.
После поступления данных 1002 в таблице B и данных 1001 и 1002 в таблице A данные в таблице C после ассоциации будут следующими:
order_id | timestamp | pay_money |
---|---|---|
1001 | 2023-02-04 10:00:00 | null |
1002 | 2023-01-04 10:01:02 | 100 |
Когда поступают данные 1001 в таблице B, они также будут активно связаны с данными в таблице A. Если данные в таблице уже имеют выходные результаты и отсутствующее поле имеет значение NULL, то Будет сгенерирован поток коррекции, ранее выведенные данные будут удалены -D, а полные данные будут повторно выведены +I.
order_id | timestamp | pay_money | / |
---|---|---|---|
1001 | 2023-02-04 10:00:00 | null | +I |
1002 | 2023-01-04 10:01:02 | 100 | +I |
1001 | 2023-02-04 10:00:00 | null | -D |
1001 | 2023-02-04 10:00:00 | 80 | +I |
Уведомление
left Join создаст поток коррекции.
Когда в таблицу B поступает 1001, а данные в таблицу A не поступают, данные все равно будут выведены, а недостающие поля будут заменены нулевыми. Когда поступают данные 1002 в таблице B, данные в таблице A Данные 1002 поступили и могут быть связаны с данными. Результаты ассоциации следующие:
order_id | timestamp | pay_money |
---|---|---|
1001 | null | null |
1002 | 2023-01-04 10:01:02 | 100 |
Когда поступают данные 1001 в таблице A, они будут активно связаны с таблицей B. В это время данные о 1001 будут выведены в результате, и будет сгенерирован поток восстановления.
order_id | timestamp | pay_money | / |
---|---|---|---|
1001 | null | null | +I |
1002 | 2023-01-04 10:01:02 | 100 | +I |
1001 | null | null | -D |
1001 | 2023-02-04 10:00:00 | 80 | +I |
Уведомление
Right Join создаст поток коррекции.
Когда данные 1001 в таблице B поступают первыми, они активно переходят к таблице A для выполнения связанного запроса. Если данные не могут быть связаны, результат все равно будет выведен.
Когда данные в таблице A поступают, он активно выполняет связанный запрос с данными в таблице B. В настоящее время в таблице B имеется только 1001 данных. Если данные недоступны, результат все равно будет выведен.
Таким образом, результат корреляции на данный момент выглядит следующим образом:
order_id | timestamp | pay_money |
---|---|---|
1001 | null | null |
1002 | 2023-01-04 10:01:02 | null |
Когда поступает 1001 в таблице A, связанный запрос будет выполнен с таблицей B. Когда появится 1002 в таблице B, связанный запрос будет выполнен с таблицей A. Результаты следующие:
order_id | timestamp | pay_money | / |
---|---|---|---|
1001 | null | null | +I |
1002 | 2023-01-04 10:01:02 | null | +I |
1001 | null | null | -D |
1001 | 2023-02-04 10:00:00 | 80 | +I |
1002 | 2023-01-04 10:01:02 | null | -D |
1002 | 2023-01-04 10:01:02 | 100 | +I |
Уведомление
Полное соединение сгенерирует поток восстановления.
Интервальное соединение представляет собой ограниченное соединение по сравнению с двухпотоковым соединением UnBounded. То есть каждый фрагмент данных в каждом потоке будет относиться к другому временному интервалу в другом потоке. Выполните JOIN для данных.
SELECT ... FROM t1 JOIN t2 ON t1.key = t2.key AND TIMEBOUND_EXPRESSION
Есть два способа записи TIMEBOUND_EXPRESSION:
Семантика Interval JOIN заключается в том, что каждый фрагмент данных соответствует интервалу данных во временном интервале. Например, существует таблица заказов Orders(orderId, ProductName, orderTime) и таблицу платежей Payment(orderId, payType, payTime). Предположим, мы хотим подсчитать информацию о заказе, который оплачен в течение одного часа с момента размещения. SQL-запрос выглядит следующим образом:
SELECT
o.orderId,
o.productName,
p.payType,
o.orderTime,
cast(payTime as timestamp) as payTime
FROM
Orders AS o JOIN Payment AS p ON
o.orderId = p.orderId AND
p.payTime BETWEEN orderTime AND
orderTime + INTERVAL '1' HOUR
orderId | productName | orderTime |
---|---|---|
001 | iphone | 2018-12-26 04:53:22.0 |
002 | mac | 2018-12-26 04:53:23.0 |
003 | book | 2018-12-26 04:53:24.0 |
004 | cup | 2018-12-26 04:53:38.0 |
orderId | payType | payTime |
---|---|---|
001 | alipay | 2018-12-26 05:51:41.0 |
002 | card | 2018-12-26 05:53:22.0 |
003 | card | 2018-12-26 05:53:30.0 |
004 | alipay | 2018-12-26 05:53:31.0 |
Семантически ожидаемый результат Информация с идентификатором заказа 003 не отображается в результатах.,Потому что время заказа2018-12-26 04:53:24.0
, Срок оплаты составляет
2018-12-26 05:53:30.0
превосходить1почасовая оплата。
Тогда информация об ожидаемом результате будет следующей:
orderId | productName | payType | orderTime | payTime |
---|---|---|---|---|
001 | iphone | alipay | 2018-12-26 04:53:22.0 | 2018-12-26 05:51:41.0 |
002 | mac | card | 2018-12-26 04:53:23.0 | 2018-12-26 05:53:22.0 |
004 | cup | alipay | 2018-12-26 04:53:38.0 | 2018-12-26 05:53:31.0 |
Таким образом, заказ с идентификатором 003 является недействительным заказом, и запасы можно обновить, чтобы продолжить продажи.
Далее наглядно проиллюстрируем семантику Interval JOIN на диаграмме. Немного меняем требования приведенного выше примера: Заказы можно оплачивать заранее (можно ли). Разумно это или нет, мы лишь объясняем семантику) то есть оплата в течение 1 часа до и после заказа действительна. Оператор SQL выглядит следующим образом:
SELECT
...
FROM
Orders AS o JOIN Payment AS p ON
o.orderId = p.orderId AND
p.payTime BETWEEN orderTime - INTERVAL '1' HOUR AND
orderTime + INTERVAL '1' HOUR
Таблица измерений — это концепция, основанная на моделировании хранилища данных. В модели хранилища данных таблица фактов (таблица фактов) относится к таблице, в которой хранятся записи фактов, например системной Журналы, записи о продажах и т. д. Таблица измерений представляет собой таблицу, соответствующую таблице фактов. Она сохраняет соответствующую подробную информацию об указанных атрибутах в таблице фактов и может быть связана с таблицей фактов. Соединение эквивалентно извлечению и стандартизации часто повторяющихся атрибутов таблицы фактов и управлению ими в одной таблице.
В реальном производстве у нас часто возникает потребность на основе исходного потока данных связать большое количество внешних таблиц для дополнения некоторых атрибутов. Эта операция запроса Типичная таблица размерностей JOIN.
Поскольку таблица измерений является постоянно изменяющейся таблицей (статическая таблица считается частным случаем динамической таблицы), таблица измерений JOIN Когда необходимо указать пару снимков таблицы измерений, связанных с этой записью. время. Флинк SQL таблица размеров JOIN грамматика введена Temporal Table Стандартная грамматика используется для объявления того, с каким снимком поверхности измерения связаны данные потока.
Необходимо Уведомление есть, на данный момент родное Flink SQL таблица размеров JOIN Поддерживается только связь таблицы фактов со снимком таблицы измерений в текущий момент (семантика времени обработки), а факты не поддерживаются. поверхность rowtime Соответствует таблице Корреляция снимков размеров (семантика времени события).
Flink SQL используется вграмматикаfor SYSTEM_TIME as of PROC_TIME()
Чтобы определить размерыповерхностьJOIN。Поддерживает толькоINNER JOIN
иLEFT JOIN
。
SELECT
column-namesFROM
table1 [AS <alias1>][LEFT]
JOIN table2 FOR SYSTEM_TIME AS OF table1.proctime [AS <alias2>]
ON table1.column-name1 = table2.key-name1
Уведомление:
table1.proctime
поверхность Показыватьtable1
изproctime
Поле。
Ниже приведен простой пример, демонстрирующий размер поверхности. JOIN грамматика. Предположим, у нас есть Orders Поток данных заказа, надеюсь, на основе пользователя ID Полное количество пользователей в заказах информацию, поэтому вам нужно следить Customer Измерьте поверхность для корреляции.
CREATE TABLE Orders (
id INT,
price DOUBLE,
quantity INT,
proc_time AS PROCTIME(),
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'datagen',
'fields.id.kind' = 'sequence',
'rows-per-second' = '10'
);
CREATE TABLE Customers (
id INT,
name STRING,
country STRING,
zip STRING,
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://mysqlhost:3306/customerdb',
'table-name' = 'customers'
);
CREATE TABLE OrderDetails (
id INT,
total_price DOUBLE,
country STRING,
zip STRING,
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://mysqlhost:3306/orderdb',
'table-name' = 'orderdetails'
);
-- enrich each order with customer information
INSERT INTO OrderDetails
SELECT
o.id,
o.price,
o.quantity,
c.country,
c.zipFROM
Orders AS o
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.id = c.id;
Apache Calcite — это инструмент анализа SQL с открытым исходным кодом, который широко используется в различных проектах по работе с большими данными и в основном используется для анализа операторов SQL. Порядок выполнения SQL Обычно делится на четыре основных этапа: