(1) Левый поток
Таблица заказов (заказы)
id productName orderTime
1 iphone 2020-04-01 10:00:00.0
2 mac 2020-04-01 10:02:00.0
3 huawei 2020-04-01 10:03:00.0
4 pad 2020-04-01 10:05:00.0
(2) Правый поток
Таблица логистики (отгрузки)
shipId orderId status shiptime
0 1 shipped 2020-04-01 11:00:00.0
1 2 delivered 2020-04-01 17:00:00.0
2 3 shipped 2020-04-01 12:00:00.0
3 4 shipped 2020-04-01 11:30:00.0
(1)Код
//Задержка 0 с
val delay = 0
//Window 4hour
val window=4
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val left = env.addSource(new OrdersSource).assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks(delay))
val right = env.addSource(new ShipmentsSource).assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks2(delay))
left.join(right)
.where(_._1).equalTo(_._2) //Поле объединения Первое поле (id) левого потока равный Второе поле правого потока (orderId)
.window(TumblingEventTimeWindows.of(Time.hours(window))) //скользящее окно
//IN1 (Int,String,Long) id productName orderTime
//IN2 (Int, Int,String,Long) shipId orderId status shiptime
//OUT (Int,String,String,Long,Long)) orderId productName status orderTime shiptime
.apply(new JoinFunction[(Int,String,Long),(Int, Int,String,Long),(Int,String,String,Long,Long)] {
override def join(first: (Int,String,Long), second: (Int, Int,String,Long)):(Int,String,String,Long,Long) = {
(first._1, first._2, second._3, first._3, second._4)
}
}).print()
env.execute()
(2) Анализ и выходные результаты
WM=Максимальное время в окне — время, разрешенное для задержки выполнения.
ВМ постоянно растет
Условия срабатывания окна: WM 》=Конечная граница предыдущего окна
Максимальное время в окне — время, разрешенное для задержки выполнения. > = Конечная граница предыдущего окна
nowTimeStamp nowTime currentMaxT WM окно Перевести окно в часы Перевести WM в часы
Таблица заказов (заказы)
(1,iphone,1585706400000) -- 1585706400000 -- 10:00:00 -- 10:00:00 -- 10:00:00 10:00:00-12:00:00 [10-12) 10:00
(2,mac,1585706520000) -- 1585706520000 -- 10:02:00 -- 10:02:00 -- 10:02:00 10:00:00-12:00:00 [10-12) 10:02
(3,huawei,1585706580000) -- 1585706580000 -- 10:03:00 -- 10:03:00 -- 10:03:00 10:00:00-12:00:00 [10-12) 10:03
(4,pad,1585706700000) -- 1585706700000 -- 10:05:00 -- 10:05:00 -- 10:05:00 10:00:00-12:00:00 [10-12) 10:05
Таблица логистики (отгрузки)
(0,1,shipped,1585710000000) -- 1585710000000 -- 11:00:00 -- 11:00:00 -- 11:00:00 10:00:00-12:00:00 [10-12) 11
(1,2,delivered,1585731600000) -- 1585731600000 -- 17:00:00 -- 17:00:00 -- 17:00:00 16:00:00-18:00:00 [16-18) 17
(2,3,shipped,1585713600000) -- 1585713600000 -- 12:00:00 -- 17:00:00 -- 17:00:00 12:00:00-14:00:00 [12-14) 17
(3,4,shipped,1585711800000) -- 1585711800000 -- 11:30:00 -- 17:00:00 -- 17:00:00 10:00:00-12:00:00 [10-12) 17 //окно WM 17,больше, чемокноизконечная граница12,Окно-триггер
Четыре элемента данных в таблице заказов находятся в том же окне, что и (0,1,отгружено,1585710000000) и (3,4,отгружено,1585711800000) в таблице логистики (отгрузки).
Когда поток (3,4,shipped,1585711800000) вводится в таблицу логистики, WM окна равен 17 (час), что больше конечной границы окна 12 (час), а окно Window равно сработало.
Результат вывода:
Window 4hour
(1,iphone,shipped,1585706400000,1585710000000)
(4,pad,shipped,1585706700000,1585711800000)
Поддерживает INNER JOIN, LEFT JOIN, RIGHT JOIN и FULL JOIN. Если JOIN используется напрямую, по умолчанию используется INNER JOIN.
SEMI JOIN и ANTI JOIN пока не поддерживаются.
TIMEBOUND_EXPRESSION — это условное выражение интервала в столбцах атрибутов времени левого и правого потока. Оно поддерживает следующие три условных выражения:
ltime = rtime
ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND
(1)Код
//Задержка 0 с
val delay = 0
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val left = env.addSource(new OrdersSource).assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks(delay))
val right = env.addSource(new ShipmentsSource).assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks2(delay))
left.print("orderStream=>")
right.print("shipMentStream=>")
left
.keyBy(0)
.intervalJoin(right.keyBy(1))
// between Поддерживает только event time
//временной интервал -> leftStream По умолчанию и Присоединяйтесь к rightStream в диапазоне времени [left+0hour,left+4hour]
//Поток заказов и Данные, которые задерживаются в течение 4 часов после отправки логистического потока, могут быть объединены.
.between(Time.hours(0), Time.hours(4))
//Не включает нижнюю границу
//.lowerBoundExclusive()
//Не включает верхнюю границу
//.upperBoundExclusive()
.process(new ProcessJoinFunction[ (Int, String, Long),(Int,Int,String,Long) , (Int,String,String,Long,Long)]() {
override def processElement(orders: (Int, String, Long), shipments:(Int,Int,String,Long), context: ProcessJoinFunction[ (Int, String, Long),(Int,Int,String,Long) , (Int,String,String,Long,Long)]#Context, out: Collector[(Int,String,String,Long,Long)]): Unit = {
//orderId,ProductName, orderStatus ,TimeStamp ,TimeStamp
out.collect( (orders._1, orders._2, shipments._3,orders._3, shipments._4))
}
})
.print("IntervalJoin=>");
env.execute("IntervalJoinTest")
(2) Анализ и выходные результаты
временной интервал -> leftStream По умолчанию и Присоединяйтесь к rightStream в диапазоне времени [left+0hour,left+4hour]
Данные с задержкой в течение 4 часов могут быть объединены потоком заказов (левый поток) и потоком логистики доставки (правый поток).
Результат вывода:
IntervalJoin=>> (1,iphone,shipped,1585706400000,1585710000000)
IntervalJoin=>> (3,huawei,shipped,1585706580000,1585713600000)
IntervalJoin=>> (4,pad,shipped,1585706700000,1585711800000)
val delay = 0
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val left = env.addSource(new OrdersSource).assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks(delay))
.map(ele=>Order(ele._1,ele._2,DateUtils.formatTime(ele._3)))
val right = env.addSource(new ShipmentsSource).assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks2(delay))
.map(ele=>ShipMents(ele._1,ele._2,ele._3,DateUtils.formatTime(ele._4)))
val tableEnvironment = StreamTableEnvironment.create(env)
val orderTable:Table=tableEnvironment.fromDataStream(left)
val shipmentsTable:Table=tableEnvironment.fromDataStream(right)
val table: Table = tableEnvironment.sqlQuery(
s"""
|SELECT o.id, o.productName, s.status
|FROM $orderTable AS o
|JOIN $shipmentsTable AS s on o.id = s.orderId AND
| o.orderTime BETWEEN s.shipTime - INTERVAL '4' HOUR AND s.shipTime
|""".stripMargin)
tableEnvironment.toAppendStream[(Int,String,String)](table).print("IntervalJoinTest")
env.execute()
Обратите внимание, что SQL и API немного различаются по написанию, но оба означают, что поток заказов может присоединиться к потоку ShipMent для задержки данных в течение 4 часов.
o.orderTime BETWEEN s.shipTime - INTERVAL '4' HOUR AND s.shipTime
и
orderStream
.keyBy(0)
.intervalJoin(shipTimeStream.keyBy(1))
.between(Time.hours(0), Time.hours(4))
(1)Код
//Задержка 0 с
val delay = 0
//Window 4hour
val window=4
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val leftStream = env.addSource(new OrdersSource).assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks(delay))
val rightStream = env.addSource(new ShipmentsSource).assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks2(delay))
val leftJoinResult: DataStream[(Int,String,String,Long,Long)] = leftStream.
coGroup(rightStream).where(_._1).equalTo(_._2) //leftJoin, связываем по имени
.window(TumblingEventTimeWindows.of(Time.hours(window))) //Прокрутка окна
//IN1 (Int,String,Long) id productName orderTime
//IN2 (Int, Int,String,Long) shipId orderId status shiptime
//OUT (Int,String,String,Long,Long)) orderId productName status orderTime shiptime
.apply(new CoGroupFunction[(Int,String,Long),(Int, Int,String,Long),(Int,String,String,Long,Long)] {
override def coGroup(first: lang.Iterable[(Int,String,Long)], second: lang.Iterable[(Int, Int,String,Long)], out: Collector[(Int,String,String,Long,Long)]): Unit = {
for (firstEle <- first.asScala) {
var flag = false
for (secondEle <- second.asScala) {
//left join: могу присоединиться
out.collect((firstEle._1, firstEle._2, secondEle._3, firstEle._3, secondEle._4))
flag = true
}
//left join: Невозможно присоединиться
if (!flag) {
out.collect((firstEle._1, firstEle._2, "null", firstEle._3, -1L))
}
}
}
})
leftJoinResult.print()
env.execute()
(2) Анализ и результаты
Присоединение не имеет никакого реального значения,Это просто более гибко при выводе,Вывод можно настроить. Вышеупомянутый метод записи аналогичен методу соединения левого значения в SQL.
Результат вывода:
(1,iphone,shipped,1585706400000,1585710000000)
(4,pad,shipped,1585706700000,1585711800000)
(3,huawei,null,1585706580000,-1)
(2,mac,null,1585706520000,-1)
org.apache.flink.streaming.api.datastream. Class KeyedStream(java){
Class IntervalJoin{
Method between{
//IntervalJoin поддерживает только EventTime
if (timeCharacteristic != TimeCharacteristic.EventTime) {
throw new UnsupportedTimeCharacteristicException("Time-bounded stream joins are only supported in event time");
}
}
}
}
org.apache.flink.streaming.api.scala.KeyedStream(scala){
Class IntervalJoin{
//между аннотацией метода leftElement.timestamp + lowerBound <= rightElement.timestamp<= leftElement.timestamp + upperBound
Method between{
new IntervalJoined[IN1, IN2, KEY](streamOne, streamTwo, lowerMillis, upperMillis){
//Нижняя граница включена по умолчанию, этот метод исключает нижнюю границу
Method lowerBoundExclusive{ this.lowerBoundInclusive = false }
//Верхняя граница включена по умолчанию, этот метод исключает верхнюю границу
Method upperBoundExclusive{ this.upperBoundInclusive = false }
//Пользовательскую функцию необходимо передать в метод процесса
Method process((processJoinFunction: ProcessJoinFunction[IN1, IN2, OUT])){
//Метод процесса в Scala переходит к методу процесса в Java
asScalaStream(javaJoined.process(processJoinFunction, outType)){
//Фокус метода процесса в Java фокусируется класс IntervalJoinOperator, подключение потока
SingleOutputStreamOperator<OUT> process{
//[Важный метод 1]
//An TwoInputStreamOperator operato to execute time-bounded stream inner joins.
operator=new IntervalJoinOperator<>{
}
//[Важный метод 2]
//
return left
//Операция соединения выполняется над двумя KeyedStreams для получения ConnectedStreams. Таким образом, можно добиться совместного использования состояния между двумя потоками данных. Для интервального соединения данные с одинаковым ключом в двух потоках могут получать доступ друг к другу.
.connect(right)
//Assigns keys to the elements,return The partitioned ConnectedStreams。
.keyBy(keySelector1, keySelector2)
//creating a transformed output stream.
.transform("Interval Join", outputType, operator);
}
}
}
}
}
}
}
}
class IntervalJoinOperator{
//Текущий Штат использует MapState, принадлежащий Keyed Тип государства. Состояние можно понимать как локальный кэш.
//Используется для хранения двух потоков соответственноизданные,Где Long соответствует временной метке данных.,List<BufferEntry>соответствуют одной и той же временной меткеизданные(вBufferEntryиметьelementи hasBeenJoined два свойства)
private transient MapState<Long, List<BufferEntry<T1>>> leftBuffer;
private transient MapState<Long, List<BufferEntry<T2>>> rightBuffer;
//Инициализируем состояние карты
Method initializeState{
this.leftBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>( ...... ))
this.rightBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(......))
}
//processElement1 обрабатывает левый поток и вызывает методprocessElement.
Method processElement1{
processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true);
}
//processElement2 обрабатывает правильный поток и вызывает методprocessElement.
Method processElement2(StreamRecord<T2> record) throws Exception {
processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
}
//Метод описывает, что при прибытии двух потоков, например, после поступления данных в левый поток, необходимо перейти к правому потоку, чтобы найти данные в пределах соответствующих верхних и нижних границ. Оба метода вызывают методprocessElement.
Method processElement{
//Получаем значение потока
final THIS ourValue = record.getValue();
//Получаем временную метку потока
final long ourTimestamp = record.getTimestamp();
//Значение временной метки должно иметь практическое значение, обычно используйте EventTime
if (ourTimestamp == Long.MIN_VALUE) {
throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " +
"interval stream joins need to have timestamps meaningful timestamps.");
}
//Определяем, задерживается ли запись. Если она задерживается, выходим из метода напрямую, без какой-либо обработки.
if (isLate(ourTimestamp)) {
//Когда временная метка поступающей записи меньше текущего уровня воды, это означает, что данные задерживаются и обработка данных не выполняется.
return;
}
//Добавляем запись в MapState соответствующего потока и помечаем запись флажком несвязанного флага
addToBuffer(ourBuffer, ourValue, ourTimestamp);
//Обходим MapStat другого потока: otherBuffer
for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) {
//Временная метка другой записи потока
final long timestamp = bucket.getKey();
// Например, временная метка OurTimestamp текущего потока и временная метка другого потока удовлетворяют следующему соотношению:
//ourTimestamp + relativeLowerBound <= timestamp <= ourTimestamp + relativeUpperBound
//Затем выполняем операцию соединения, иначе никакая операция выполняться не будет.
if (timestamp < ourTimestamp + relativeLowerBound ||
timestamp > ourTimestamp + relativeUpperBound) {
continue;
}
//Получаем значение другого потока и выполняем логику пользовательской функции
//Возьмем большую временную метку в двух потоках в качестве входных данных переопределенного методаprocessElemen в определяемой пользователем функции ProcessJoinFunction
for (BufferEntry<OTHER> entry: bucket.getValue()) {
if (isLeft) {
//Левый поток выполняет логику соединения
collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
} else {
//Правый поток выполняет логику соединения
collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
}
}
}
//Очистить время текущего потока
long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
//Подробную информацию об очистке состояния см. в анализе onEventTime этого класса.
//Запланированное время очистки — это текущее записанное время + относительныйUpperBound. Если водяной знак превышает это время, его необходимо очистить.
//Это можно понимать как добавлениеrelativeUpperBound для продления времени удаления текущего потока записей из состояния.
if (isLeft) {
//Выполнение левого потока, регистрация запланированного времени очистки
internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
} else {
//Правильное выполнение потока, регистрируем запланированное время очистки
internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
}
}
//Определяем, задерживается ли временная метка записи
Method boolean isLate(long timestamp) {
//Получаем водяной знак текущего событияTime. Уровень воды — монотонно возрастающая функция.
long currentWatermark = internalTimerService.currentWatermark();
//Если временная метка в записи меньше, чем currentWatermark, вернуть true. То есть, когда временная метка поступающей записи меньше водяного знака, это означает, что данные задерживаются и не будут обработаны или связаны с данными. еще один поток.
return currentWatermark != Long.MIN_VALUE && timestamp < currentWatermark;
}
//Добавляем записи в MapState соответствующего потока
Method void addToBuffer{
List<BufferEntry<T>> elemsInBucket = buffer.get(timestamp);
if (elemsInBucket == null) {
elemsInBucket = new ArrayList<>();
}
//Добавляем несвязанную метку к измененной записи по умолчанию: false: new BufferEntry<>(value, false)
elemsInBucket.add(new BufferEntry<>(value, false));
buffer.put(timestamp, elemsInBucket);
}
// метод цанги, возьмите большую временную метку в двух потоках в качестве входных данных переопределенного методаprocessElemen в определяемой пользователем функции ProcessJoinFunction
Method collect {
final long resultTimestamp = Math.max(leftTimestamp, rightTimestamp);
collector.setAbsoluteTimestamp(resultTimestamp);
context.updateTimestamps(leftTimestamp, rightTimestamp, resultTimestamp);
userFunction.processElement(left, right, context, collector);
}
//Очистим запись EventTime, водяной знак которой больше записи
Method onEventTime(InternalTimer<K, String> timer){
//Регистрируем время очистки текущего потока (вместо отметки времени данных)
long timerTimestamp = timer.getTimestamp();
String namespace = timer.getNamespace();
logger.trace("onEventTime @ {}", timerTimestamp);
switch (namespace) {
//Предположение: Предположим, что временная метка данных, поступающих из текущего потока, равна 10 с, а время, прошедшее между ними, составляет 1 с и 5 с соответственно. LowerBound — 1 с, UpperBound — 5 с,
//значение: Левый поток может быть присоединен к правому потоку. Временной диапазон: Данные [левый поток + 1, левый поток + 5], то есть Временная метка левого потока+1s<=Временная метка справа<=Временная метка левого потока+5s
Правый поток может быть присоединен к левому потоку. Временной диапазон: Данные [правого потока-5, правого потока-1], то есть Временная метка справа-5s<=Временная метка левого потока<=Временная метка справа-1s
//[Ключ] Очистка времени текущего потока
//long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
//Очистить статус левого потока, В это время время очистки = Метка времени + 5 с, то есть данные с меткой времени 10 с в левом потоке могут быть очищены за 15 секунд.
case CLEANUP_NAMESPACE_LEFT: {
//в соответствии с Временная метка левого потока+1s<=Временная метка справа<=Временная метка левого потока+5s ; нижняя граница — 1 с, верхняя граница — 5 с.。
//Если это левый поток, вызываем методprocessElement1, relativeUpperBound равен 5, то есть relativeUpperBound>0, в это время timerTimestamp=10+5=15s。
//В это время Очистите временную метку левого потока=timerTimestamp=15s.
//Когда время достигнет 15 секунд,Вы можете очистить данные левого потока с отметкой времени 10 секунд.,То есть посмотрите на поток справа на 15с.,Нужно найтииз Временной диапазон левого потока10s<=Временная метка левого потока<=14s,такwatermark>15sМожно очистить, когда Временная метка левого потокадля10sданные。
long timestamp = (upperBound <= 0L) ? timerTimestamp : timerTimestamp - upperBound;
logger.trace("Removing from left buffer @ {}",временная метка);
leftBuffer.remove (метка времени);
перерыв;
}
//Очистить статус правого потока,В это время время очистки = Временная метка, то есть данные с временной меткой в 10 секунд в правом потоке, могут быть очищены за 10 секунд.
case CLEANUP_NAMESPACE_RIGHT: {
//Временная метка справа-5s<=Временная метка левого потока<=Временная метка справа-1s;в это времяrelativeLowerBoundдля-5,относительныйUpperBoundдля-1。
//Если приходят нужные данные потока, вызываем методprocessElement2 ,relativeLowerBoundдля-5Прямо сейчасrelativeLowerBound<0,в это время timerTimestamp=10s。
//Очистить временную метку правого потока в этот момент=timerTimestamp + lowerBound =10с-5=5с, что на самом деле очищается, так это данные с правильным потоком 5с? ? ?
//Когда время достигнет 10 секунд,Вы можете очистить данные с отметкой времени 10 секунд в правом потоке.,То есть посмотрите на поток слева на 10с.,Необходимо найти правильный временной диапазон трансляции.11s<=Временная метка справа<=16s,тактакwatermark>10sМожно очистить, когда右边流时间戳для10sданные。
long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;
logger.trace("Removing from right buffer @ {}", timestamp);
rightBuffer.remove(timestamp);
break;
}
default:
throw new RuntimeException("Invalid namespace " + namespace);
}
}
}
long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
(1)Временная метка левого потока+1s<=Временная метка справа<=Временная метка левого потока+5s ; нижняя граница — 1 с, верхняя граница — 5 с.
(2) Если это левый поток, вызовите методprocessElement1, relativeUpperBound равен 5, то есть relativeUpperBound>0, в это время timerTimestamp=10+5=15s。
long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
(3) В это время relativelowerBoundдля1Прямо сейчасlowerBound>0; Очистите временную метку левого потока=timerTimestamp=15s.
long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;
logger.trace("Removing from right buffer @ {}", timestamp);
rightBuffer.remove(timestamp);
(4)Вывод:
Когда время достигнет 15 секунд,Вы можете очистить данные левого потока с отметкой времени 10 секунд.,То есть посмотрите на поток справа на 15с.,Нужно найтииз Временной диапазон левого потока10s<=Временная метка левого потока<=14s,такwatermark>15sМожно очистить, когда Временная метка левого потокадля10sданные。
(1)Временная метка справа-5s<=Временная метка левого потока<=Временная метка справа-1s;в это времяrelativeLowerBoundдля-5,относительныйUpperBoundдля-1。
(2) Если поступают правильные данные потока, вызовите методprocessElement2. ,relativeLowerBoundдля-5Прямо сейчасrelativeLowerBound<0,в это время timerTimestamp=10s。
long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
1
(3) В это время relativelowerBoundдля-5Прямо сейчасlowerBound<0;чистый правый потокизtimestamp=timerTimestamp + lowerBound =10с-5=5с, что на самом деле очищается, так это данные с правильным потоком 5с? ? ?
long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;
logger.trace("Removing from right buffer @ {}", timestamp);
rightBuffer.remove(timestamp);
(4) Заключение
Когда время достигнет 10 секунд,Вы можете очистить данные с отметкой времени 10 секунд в правом потоке.,То есть посмотрите на поток слева на 10с.,Необходимо найти правильный временной диапазон трансляции.11s<=Временная метка справа<=16s,тактакwatermark>10sМожно очистить, когда右边流时间戳для10sданные。
Штат использует MapState, принадлежащий Keyed Тип государства. Состояние можно понимать как локальный кеш, который используется для хранения данных двух потоков соответственно. Его структура данных MapState<Long, List>,Где Long соответствует временной метке данных.,Список соответствует данным с той же меткой времени (где BufferEntry имеет элемент и hasBeenJoined два свойства)
Время очистки состояния левого потока = OurTimestamp (событие в потоке данных) + относительныйUpperBound (верхняя граница временного диапазона)
Время очистки состояния правого потока = OurTimestamp (Событие в потоке данных)
Ссылки
Flink DataStream Join && IntervalJoin && Разница между когруппами
https://blog.csdn.net/qq_33689414/article/details/93875881
(Версия Flink в реальном времени Alibaba Cloud) Оператор IntervalJoin
https://help.aliyun.com/document_detail/195298.html
(Принцип) Apache Flink Серия случайных разговоров - Time Interval JOIN
https://enjoyment.cool/2019/03/22/Apache%20Flink%20%E6%BC%AB%E8%B0%88%E7%B3%BB%E5%88%97%20-%20Time%20Interval%20JOIN/#more
(Механизм очистки состояния) Flink1.11 intervalJoin создание водяных знаков,Понимание исходного кода механизма очистки состояния&Demoанализировать
https://blog.csdn.net/qq_34864753/article/details/111183556
(Анализ исходного кода) Flink Interval Join Анализ использования и принципов
https://blog.csdn.net/tzs_1041218129/article/details/109475489?utm_medium=distribute.pc_relevant.none-task-blog-baidujs_baidulandingword-1&spm=1001.2101.3001.4242
Упомянутое выше СОЕДИНЕНИЕ двух потоков, но реальный сценарий может включать СОЕДИНЕНИЕ четырех или даже шести потоков. Как это реализовать?
Этапы реализации:
1. Используйте Canal для синхронизации binlog MySQL с Kafka в реальном времени для формирования соответствующего потока.
2. Асинхронно соедините таблицу A с другими таблицами (таблицами B, C и D) в исходном MySQL. Внесите соответствующие дополнения, исключения и изменения в таблицу Е.
Обратите внимание на предварительные условия здесь:
(1) Четыре таблицы MySQL обновляются нечасто, поскольку, если они обновляются часто, использование MySQL для асинхронного соединения может не соответствовать требованиям QPS.
(2) Таблица A выбирается в зависимости от обстоятельств для объединения таблиц B, C и D. Необходимо объединять только таблицы с добавлениями, удалениями и изменениями цели E.
Этапы реализации:
1. Используйте Canal для синхронизации бинлога MySQL с Kafka в реальном времени и потока таблицы фактов.
2. Используйте Canal для синхронизации таблицы измерений B MySQ с Phoenix в реальном времени.
3. Используйте поток A в Kafka для асинхронного соединения таблицы измерений B в Phoenix и записывайте результаты в таблицу результатов C в Phoenix.
Уведомление:
(1) Здесь таблица измерений B синхронизируется с Phoenix в режиме реального времени, поскольку количество запросов в секунду таблицы измерений B относительно велико (таблица измерений здесь представляет собой широкую концепцию. Если количество запросов в секунду относительно низкое, вы можете напрямую использовать таблицу измерений B). в MySQL.
Если TimeWindowJoin используется для объединения двух таблиц реального времени, статус данных будет сохранен в рабочем состоянии Flink. Во-первых, здесь используется стороннее хранилище Phoenix. Во-вторых, недостатком IntervalJoin является то, что если один из потоков задерживается и задержка превышает время истечения State, произойдет потеря данных. Для решения этой проблемы здесь используется вывод CoGroupJoin+sideflow.
Этапы реализации:
1. Используйте Canal для синхронизации бинлога MySQL с Kafka в реальном времени, потока таблицы фактов A и потока B.
2. Используйте поток A для потока группы B.
3. Опоздание потока, SideputTag+API/DB (используйте API для асинхронного соединения данных таблицы B из базы данных). Таким же образом, поток B задерживается, SideputTag+API/DB (используйте API для асинхронного соединения данных таблицы A из базы данных).
4.СОЮЗ. ОБЪЕДИНЯЙТЕ все потоки и записывайте в таблицу Phoenix C.
Уведомление:
Разница здесь и IntervalJoin заключается в,Не использовать Flink,Вместо этого задержанные данные выводятся напрямую через SideOutPutTag.,И асинхронное соединение данных в MySQL.