Различия и производственное использование двухпотокового и многопоточного соединения Flink, IntervalJoin и coGroupJoin.
Различия и производственное использование двухпотокового и многопоточного соединения Flink, IntervalJoin и coGroupJoin.

1.Flink Три типа тестов кода соединения

1.1 Источник данных

(1) Левый поток

Язык кода:javascript
копировать
Таблица заказов (заказы)
id   productName       orderTime
1     iphone           2020-04-01 10:00:00.0
2     mac               2020-04-01 10:02:00.0
3     huawei           2020-04-01 10:03:00.0
4     pad               2020-04-01 10:05:00.0

(2) Правый поток

Язык кода:javascript
копировать
Таблица логистики (отгрузки)
shipId  orderId status       shiptime
0       1         shipped       2020-04-01 11:00:00.0
1       2         delivered     2020-04-01 17:00:00.0
2      3         shipped       2020-04-01 12:00:00.0
3      4         shipped       2020-04-01 11:30:00.0

1.2 join

(1)Код

Язык кода:javascript
копировать
//Задержка 0 с
val delay = 0
//Window 4hour
val window=4
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val left = env.addSource(new OrdersSource).assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks(delay))
val right = env.addSource(new ShipmentsSource).assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks2(delay))

left.join(right)
  .where(_._1).equalTo(_._2)  //Поле объединения Первое поле (id) левого потока равный Второе поле правого потока (orderId)
  .window(TumblingEventTimeWindows.of(Time.hours(window)))  //скользящее окно
  //IN1   (Int,String,Long)  id  productName orderTime
  //IN2   (Int, Int,String,Long)  shipId orderId status shiptime
  //OUT   (Int,String,String,Long,Long)) orderId productName status orderTime shiptime
  .apply(new JoinFunction[(Int,String,Long),(Int, Int,String,Long),(Int,String,String,Long,Long)] {
    override def join(first: (Int,String,Long), second: (Int, Int,String,Long)):(Int,String,String,Long,Long) = {
      (first._1, first._2, second._3, first._3, second._4)
    }
  }).print()

 env.execute()

(2) Анализ и выходные результаты

Язык кода:javascript
копировать
WM=Максимальное время в окне — время, разрешенное для задержки выполнения.
ВМ постоянно растет
Условия срабатывания окна:  WM 》=Конечная граница предыдущего окна
              Максимальное время в окне — время, разрешенное для задержки выполнения. > = Конечная граница предыдущего окна

                                     nowTimeStamp   nowTime     currentMaxT     WM         окно                Перевести окно в часы      Перевести WM в часы
Таблица заказов (заказы)
(1,iphone,1585706400000)          -- 1585706400000 -- 10:00:00 -- 10:00:00 -- 10:00:00    10:00:00-12:00:00    [10-12)            10:00
(2,mac,1585706520000)             -- 1585706520000 -- 10:02:00 -- 10:02:00 -- 10:02:00    10:00:00-12:00:00    [10-12)            10:02
(3,huawei,1585706580000)          -- 1585706580000 -- 10:03:00 -- 10:03:00 -- 10:03:00    10:00:00-12:00:00    [10-12)            10:03
(4,pad,1585706700000)             -- 1585706700000 -- 10:05:00 -- 10:05:00 -- 10:05:00    10:00:00-12:00:00    [10-12)            10:05

Таблица логистики (отгрузки)
(0,1,shipped,1585710000000)       -- 1585710000000 -- 11:00:00 -- 11:00:00 -- 11:00:00    10:00:00-12:00:00    [10-12)            11
(1,2,delivered,1585731600000)     -- 1585731600000 -- 17:00:00 -- 17:00:00 -- 17:00:00    16:00:00-18:00:00    [16-18)            17
(2,3,shipped,1585713600000)       -- 1585713600000 -- 12:00:00 -- 17:00:00 -- 17:00:00    12:00:00-14:00:00    [12-14)            17
(3,4,shipped,1585711800000)       -- 1585711800000 -- 11:30:00 -- 17:00:00 -- 17:00:00    10:00:00-12:00:00    [10-12)            17      //окно WM 17,больше, чемокноизконечная граница12,Окно-триггер

Четыре элемента данных в таблице заказов находятся в том же окне, что и (0,1,отгружено,1585710000000) и (3,4,отгружено,1585711800000) в таблице логистики (отгрузки).

Когда поток (3,4,shipped,1585711800000) вводится в таблицу логистики, WM окна равен 17 (час), что больше конечной границы окна 12 (час), а окно Window равно сработало.

Результат вывода:

Язык кода:javascript
копировать
Window 4hour
(1,iphone,shipped,1585706400000,1585710000000)
(4,pad,shipped,1585706700000,1585711800000)

1.3 intervalJoin

Поддерживает INNER JOIN, LEFT JOIN, RIGHT JOIN и FULL JOIN. Если JOIN используется напрямую, по умолчанию используется INNER JOIN.

SEMI JOIN и ANTI JOIN пока не поддерживаются.

TIMEBOUND_EXPRESSION — это условное выражение интервала в столбцах атрибутов времени левого и правого потока. Оно поддерживает следующие три условных выражения:

Язык кода:javascript
копировать
ltime = rtime
ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND
1.3.1 Использование API интервалаJoin

(1)Код

Язык кода:javascript
копировать
//Задержка 0 с
val delay = 0
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val left = env.addSource(new OrdersSource).assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks(delay))
val right = env.addSource(new ShipmentsSource).assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks2(delay))

left.print("orderStream=>")
right.print("shipMentStream=>")

left
  .keyBy(0)
  .intervalJoin(right.keyBy(1))
  // between Поддерживает только event time
  //временной интервал -> leftStream По умолчанию и Присоединяйтесь к rightStream в диапазоне времени [left+0hour,left+4hour]
  //Поток заказов и Данные, которые задерживаются в течение 4 часов после отправки логистического потока, могут быть объединены.
  .between(Time.hours(0), Time.hours(4))
  //Не включает нижнюю границу
  //.lowerBoundExclusive()
  //Не включает верхнюю границу
  //.upperBoundExclusive()
  .process(new ProcessJoinFunction[ (Int, String, Long),(Int,Int,String,Long) , (Int,String,String,Long,Long)]() {
    override def processElement(orders: (Int, String, Long), shipments:(Int,Int,String,Long), context: ProcessJoinFunction[ (Int, String, Long),(Int,Int,String,Long) , (Int,String,String,Long,Long)]#Context, out: Collector[(Int,String,String,Long,Long)]): Unit = {
      //orderId,ProductName, orderStatus ,TimeStamp ,TimeStamp
      out.collect( (orders._1, orders._2, shipments._3,orders._3, shipments._4))
    }
  })
  .print("IntervalJoin=>");

  env.execute("IntervalJoinTest")

(2) Анализ и выходные результаты

временной интервал -> leftStream По умолчанию и Присоединяйтесь к rightStream в диапазоне времени [left+0hour,left+4hour]

Данные с задержкой в ​​течение 4 часов могут быть объединены потоком заказов (левый поток) и потоком логистики доставки (правый поток).

Результат вывода:

Язык кода:javascript
копировать
IntervalJoin=>> (1,iphone,shipped,1585706400000,1585710000000)
IntervalJoin=>> (3,huawei,shipped,1585706580000,1585713600000)
IntervalJoin=>> (4,pad,shipped,1585706700000,1585711800000)
1.3.2 Использование SQL интервалаJoin
Язык кода:javascript
копировать
val delay = 0
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val left = env.addSource(new OrdersSource).assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks(delay))
   .map(ele=>Order(ele._1,ele._2,DateUtils.formatTime(ele._3)))
val right = env.addSource(new ShipmentsSource).assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks2(delay))
   .map(ele=>ShipMents(ele._1,ele._2,ele._3,DateUtils.formatTime(ele._4)))

val tableEnvironment = StreamTableEnvironment.create(env)

val orderTable:Table=tableEnvironment.fromDataStream(left)
val shipmentsTable:Table=tableEnvironment.fromDataStream(right)

val table: Table = tableEnvironment.sqlQuery(
  s"""
     |SELECT o.id, o.productName, s.status
     |FROM $orderTable AS o
     |JOIN $shipmentsTable AS s on o.id = s.orderId AND
     |     o.orderTime BETWEEN s.shipTime - INTERVAL '4' HOUR AND s.shipTime
     |""".stripMargin)

tableEnvironment.toAppendStream[(Int,String,String)](table).print("IntervalJoinTest")

env.execute()

Обратите внимание, что SQL и API немного различаются по написанию, но оба означают, что поток заказов может присоединиться к потоку ShipMent для задержки данных в течение 4 часов.

Язык кода:javascript
копировать
o.orderTime BETWEEN s.shipTime - INTERVAL '4' HOUR AND s.shipTime

и

Язык кода:javascript
копировать
orderStream
   .keyBy(0) 
   .intervalJoin(shipTimeStream.keyBy(1))
   .between(Time.hours(0), Time.hours(4))

1.4 coGroup

(1)Код

Язык кода:javascript
копировать
//Задержка 0 с
val delay = 0
//Window 4hour
val window=4
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val leftStream = env.addSource(new OrdersSource).assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks(delay))
val rightStream = env.addSource(new ShipmentsSource).assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks2(delay))

val leftJoinResult: DataStream[(Int,String,String,Long,Long)] = leftStream.
  coGroup(rightStream).where(_._1).equalTo(_._2) //leftJoin, связываем по имени
  .window(TumblingEventTimeWindows.of(Time.hours(window))) //Прокрутка окна
  //IN1   (Int,String,Long)  id  productName orderTime
  //IN2   (Int, Int,String,Long)  shipId orderId status shiptime
  //OUT   (Int,String,String,Long,Long)) orderId productName status orderTime shiptime
  .apply(new CoGroupFunction[(Int,String,Long),(Int, Int,String,Long),(Int,String,String,Long,Long)] {
    override def coGroup(first: lang.Iterable[(Int,String,Long)], second: lang.Iterable[(Int, Int,String,Long)], out: Collector[(Int,String,String,Long,Long)]): Unit = {
      for (firstEle <- first.asScala) {
        var flag = false
        for (secondEle <- second.asScala) {
          //left join: могу присоединиться
          out.collect((firstEle._1, firstEle._2, secondEle._3, firstEle._3, secondEle._4))
          flag = true
        }
        //left join: Невозможно присоединиться
        if (!flag) {
          out.collect((firstEle._1, firstEle._2, "null", firstEle._3, -1L))
        }
      }
    }
  })

leftJoinResult.print()
env.execute()

(2) Анализ и результаты

Присоединение не имеет никакого реального значения,Это просто более гибко при выводе,Вывод можно настроить. Вышеупомянутый метод записи аналогичен методу соединения левого значения в SQL.

Результат вывода:

Язык кода:javascript
копировать
(1,iphone,shipped,1585706400000,1585710000000)
(4,pad,shipped,1585706700000,1585711800000)
(3,huawei,null,1585706580000,-1)
(2,mac,null,1585706520000,-1)

2.intervalJoin анализ исходного кода

2.1 между методом входит в класс

Язык кода:javascript
копировать
org.apache.flink.streaming.api.datastream. Class KeyedStream(java){
 Class IntervalJoin{
    Method between{
              //IntervalJoin поддерживает только EventTime 
 if (timeCharacteristic != TimeCharacteristic.EventTime) {
 throw new UnsupportedTimeCharacteristicException("Time-bounded stream joins are only supported in event time");
 }
     } 
 }
}

org.apache.flink.streaming.api.scala.KeyedStream(scala){
  Class IntervalJoin{
    //между аннотацией метода leftElement.timestamp + lowerBound <= rightElement.timestamp<= leftElement.timestamp + upperBound
    Method between{
         new IntervalJoined[IN1, IN2, KEY](streamOne, streamTwo, lowerMillis, upperMillis){
 //Нижняя граница включена по умолчанию, этот метод исключает нижнюю границу
 Method lowerBoundExclusive{ this.lowerBoundInclusive = false  }
 //Верхняя граница включена по умолчанию, этот метод исключает верхнюю границу
 Method upperBoundExclusive{ this.upperBoundInclusive = false }
             //Пользовательскую функцию необходимо передать в метод процесса
 Method process((processJoinFunction: ProcessJoinFunction[IN1, IN2, OUT])){
                //Метод процесса в Scala переходит к методу процесса в Java
   asScalaStream(javaJoined.process(processJoinFunction, outType)){
       //Фокус метода процесса в Java фокусируется класс IntervalJoinOperator, подключение потока
       SingleOutputStreamOperator<OUT> process{
  //[Важный метод 1]
  //An TwoInputStreamOperator operato to execute time-bounded stream inner joins.
   operator=new IntervalJoinOperator<>{
  }
  //[Важный метод 2]
               //  
  return left
   //Операция соединения выполняется над двумя KeyedStreams для получения ConnectedStreams. Таким образом, можно добиться совместного использования состояния между двумя потоками данных. Для интервального соединения данные с одинаковым ключом в двух потоках могут получать доступ друг к другу. 
   .connect(right)
   //Assigns keys to the elements,return The partitioned ConnectedStreams。
   .keyBy(keySelector1, keySelector2)
   //creating a transformed output stream.
   .transform("Interval Join", outputType, operator);

       }
   } 
 }
         }
      }
  }
 }
}

2.2 Удалить вышеописанный важный метод 1 IntervalJoinOperator и проанализировать его отдельно

Язык кода:javascript
копировать
class IntervalJoinOperator{
  //Текущий Штат использует MapState, принадлежащий Keyed Тип государства. Состояние можно понимать как локальный кэш.
  //Используется для хранения двух потоков соответственноизданные,Где Long соответствует временной метке данных.,List<BufferEntry>соответствуют одной и той же временной меткеизданные(вBufferEntryиметьelementи hasBeenJoined два свойства)
  private transient MapState<Long, List<BufferEntry<T1>>> leftBuffer;
  private transient MapState<Long, List<BufferEntry<T2>>> rightBuffer;
  
  //Инициализируем состояние карты
  Method initializeState{
   this.leftBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>( ...... ))
  this.rightBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(......))
  }

  //processElement1 обрабатывает левый поток и вызывает методprocessElement.
  
   Method processElement1{
 processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true);
   }
   //processElement2 обрабатывает правильный поток и вызывает методprocessElement.
   Method processElement2(StreamRecord<T2> record) throws Exception {
 processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
  }
  
  //Метод описывает, что при прибытии двух потоков, например, после поступления данных в левый поток, необходимо перейти к правому потоку, чтобы найти данные в пределах соответствующих верхних и нижних границ. Оба метода вызывают методprocessElement.
  Method processElement{
  //Получаем значение потока
               final THIS ourValue = record.getValue();
  //Получаем временную метку потока
               final long ourTimestamp = record.getTimestamp();
  
   //Значение временной метки должно иметь практическое значение, обычно используйте EventTime
  if (ourTimestamp == Long.MIN_VALUE) {
   throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " +
     "interval stream joins need to have timestamps meaningful timestamps.");
  }
  
  //Определяем, задерживается ли запись. Если она задерживается, выходим из метода напрямую, без какой-либо обработки.
  if (isLate(ourTimestamp)) {
                                //Когда временная метка поступающей записи меньше текущего уровня воды, это означает, что данные задерживаются и обработка данных не выполняется.
   return;
  }

  //Добавляем запись в MapState соответствующего потока и помечаем запись флажком несвязанного флага
  addToBuffer(ourBuffer, ourValue, ourTimestamp);
  
  //Обходим MapStat другого потока: otherBuffer
  for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) {
   //Временная метка другой записи потока
   final long timestamp  = bucket.getKey();
   
   // Например, временная метка OurTimestamp текущего потока и временная метка другого потока удовлетворяют следующему соотношению:
   //ourTimestamp + relativeLowerBound <=  timestamp <= ourTimestamp + relativeUpperBound
   //Затем выполняем операцию соединения, иначе никакая операция выполняться не будет.
   if (timestamp < ourTimestamp + relativeLowerBound ||
     timestamp > ourTimestamp + relativeUpperBound) {
    continue;
   }
   
   //Получаем значение другого потока и выполняем логику пользовательской функции
   //Возьмем большую временную метку в двух потоках в качестве входных данных переопределенного методаprocessElemen в определяемой пользователем функции ProcessJoinFunction
   for (BufferEntry<OTHER> entry: bucket.getValue()) {
    if (isLeft) {
     //Левый поток выполняет логику соединения
     collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
    } else {
     //Правый поток выполняет логику соединения
     collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
    }
   }
  }
  //Очистить время текущего потока
  long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
  
  //Подробную информацию об очистке состояния см. в анализе onEventTime этого класса.
                            //Запланированное время очистки — это текущее записанное время + относительныйUpperBound. Если водяной знак превышает это время, его необходимо очистить.
  //Это можно понимать как добавлениеrelativeUpperBound для продления времени удаления текущего потока записей из состояния.
  if (isLeft) {
   //Выполнение левого потока, регистрация запланированного времени очистки
   internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
  } else {
   //Правильное выполнение потока, регистрируем запланированное время очистки
   internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
  }

  }
 
 //Определяем, задерживается ли временная метка записи
 Method boolean isLate(long timestamp) {
              //Получаем водяной знак текущего событияTime. Уровень воды — монотонно возрастающая функция.
 long currentWatermark = internalTimerService.currentWatermark();
 //Если временная метка в записи меньше, чем currentWatermark, вернуть true. То есть, когда временная метка поступающей записи меньше водяного знака, это означает, что данные задерживаются и не будут обработаны или связаны с данными. еще один поток.
 return currentWatermark != Long.MIN_VALUE && timestamp < currentWatermark;
 }
 //Добавляем записи в MapState соответствующего потока
 Method  void addToBuffer{   
 List<BufferEntry<T>> elemsInBucket = buffer.get(timestamp);
 if (elemsInBucket == null) {
      elemsInBucket = new ArrayList<>();
 }
              //Добавляем несвязанную метку к измененной записи по умолчанию: false: new BufferEntry<>(value, false)
 elemsInBucket.add(new BufferEntry<>(value, false));
 buffer.put(timestamp, elemsInBucket);
 }
 
 // метод цанги, возьмите большую временную метку в двух потоках в качестве входных данных переопределенного методаprocessElemen в определяемой пользователем функции ProcessJoinFunction
 Method collect {
  final long resultTimestamp = Math.max(leftTimestamp, rightTimestamp);

  collector.setAbsoluteTimestamp(resultTimestamp);
  context.updateTimestamps(leftTimestamp, rightTimestamp, resultTimestamp);

  userFunction.processElement(left, right, context, collector);
 }
 
 //Очистим запись EventTime, водяной знак которой больше записи
 Method onEventTime(InternalTimer<K, String> timer){

  //Регистрируем время очистки текущего потока (вместо отметки времени данных)
   long timerTimestamp = timer.getTimestamp();

  String namespace = timer.getNamespace();

  logger.trace("onEventTime @ {}", timerTimestamp);

  switch (namespace) {
   //Предположение: Предположим, что временная метка данных, поступающих из текущего потока, равна 10 с, а время, прошедшее между ними, составляет 1 с и 5 с соответственно. LowerBound — 1 с, UpperBound — 5 с,
   //значение: Левый поток может быть присоединен к правому потоку. Временной диапазон: Данные [левый поток + 1, левый поток + 5], то есть Временная метка левого потока+1s<=Временная метка справа<=Временная метка левого потока+5s 
                Правый поток может быть присоединен к левому потоку. Временной диапазон: Данные [правого потока-5, правого потока-1], то есть Временная метка справа-5s<=Временная метка левого потока<=Временная метка справа-1s

   //[Ключ] Очистка времени текущего потока
   //long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;

   //Очистить статус левого потока, В это время время очистки = Метка времени + 5 с, то есть данные с меткой времени 10 с в левом потоке могут быть очищены за 15 секунд.
   case CLEANUP_NAMESPACE_LEFT: {    
    //в соответствии с Временная метка левого потока+1s<=Временная метка справа<=Временная метка левого потока+5s ; нижняя граница — 1 с, верхняя граница — 5 с.。
    //Если это левый поток, вызываем методprocessElement1, relativeUpperBound равен 5, то есть relativeUpperBound>0, в это время timerTimestamp=10+5=15s。
    //В это время Очистите временную метку левого потока=timerTimestamp=15s.
    //Когда время достигнет 15 секунд,Вы можете очистить данные левого потока с отметкой времени 10 секунд.,То есть посмотрите на поток справа на 15с.,Нужно найтииз Временной диапазон левого потока10s<=Временная метка левого потока<=14s,такwatermark>15sМожно очистить, когда Временная метка левого потокадля10sданные。
    long timestamp = (upperBound <= 0L) ? timerTimestamp : timerTimestamp - upperBound;
    logger.trace("Removing from left buffer @ {}",временная метка);
    leftBuffer.remove (метка времени);
    перерыв;
   }

   //Очистить статус правого потока,В это время время очистки = Временная метка, то есть данные с временной меткой в ​​10 секунд в правом потоке, могут быть очищены за 10 секунд.
   case CLEANUP_NAMESPACE_RIGHT: {
    //Временная метка справа-5s<=Временная метка левого потока<=Временная метка справа-1s;в это времяrelativeLowerBoundдля-5,относительныйUpperBoundдля-1。  
    //Если приходят нужные данные потока, вызываем методprocessElement2 ,relativeLowerBoundдля-5Прямо сейчасrelativeLowerBound<0,в это время timerTimestamp=10s。
    //Очистить временную метку правого потока в этот момент=timerTimestamp + lowerBound =10с-5=5с, что на самом деле очищается, так это данные с правильным потоком 5с? ? ?
    //Когда время достигнет 10 секунд,Вы можете очистить данные с отметкой времени 10 секунд в правом потоке.,То есть посмотрите на поток слева на 10с.,Необходимо найти правильный временной диапазон трансляции.11s<=Временная метка справа<=16s,тактакwatermark>10sМожно очистить, когда右边流时间戳для10sданные。
    long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;
    logger.trace("Removing from right buffer @ {}", timestamp);
    rightBuffer.remove(timestamp);
    break;
   }
   default:
    throw new RuntimeException("Invalid namespace " + namespace);
  }

 }

}

2.3 Подробное объяснение механизма очистки статуса

2.3.1 Время очистки статуса cleanupTime
Язык кода:javascript
копировать
long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
2.3.2 Выполнение операции очистки состояния Buffer.remove(timestamp)
  1. Предположение: предположим, что временная метка данных, поступающих в текущий поток, равна 10 с, а время, прошедшее между ними, составляет 1 с и 5 с соответственно. LowerBound — 1 с, UpperBound — 5 с,
  2. значение: Левый поток может быть присоединен к правому потоку. Временной диапазон: Данные [левый поток + 1, левый поток + 5], то есть Временная метка левого потока+1s<=Временная метка справа<=Временная метка левого потока+5s Правый поток может быть присоединен к левому потоку. Временной диапазон: Данные [правого потока-5, правого потока-1], то есть Временная метка справа-5s<=Временная метка левого потока<=Временная метка справа-1s
  3. Когда поступают данные с временной меткой левого потока, равной 10 с,

(1)Временная метка левого потока+1s<=Временная метка справа<=Временная метка левого потока+5s ; нижняя граница — 1 с, верхняя граница — 5 с.

(2) Если это левый поток, вызовите методprocessElement1, relativeUpperBound равен 5, то есть relativeUpperBound>0, в это время timerTimestamp=10+5=15s。

Язык кода:javascript
копировать
long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;

(3) В это время relativelowerBoundдля1Прямо сейчасlowerBound>0; Очистите временную метку левого потока=timerTimestamp=15s.

Язык кода:javascript
копировать
long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;
    logger.trace("Removing from right buffer @ {}", timestamp);
    rightBuffer.remove(timestamp);

(4)Вывод:

Когда время достигнет 15 секунд,Вы можете очистить данные левого потока с отметкой времени 10 секунд.,То есть посмотрите на поток справа на 15с.,Нужно найтииз Временной диапазон левого потока10s<=Временная метка левого потока<=14s,такwatermark>15sМожно очистить, когда Временная метка левого потокадля10sданные。

  1. Когда поступают данные с правильной временной меткой потока в 10 секунд.

(1)Временная метка справа-5s<=Временная метка левого потока<=Временная метка справа-1s;в это времяrelativeLowerBoundдля-5,относительныйUpperBoundдля-1。

(2) Если поступают правильные данные потока, вызовите методprocessElement2. ,relativeLowerBoundдля-5Прямо сейчасrelativeLowerBound<0,в это время timerTimestamp=10s。

Язык кода:javascript
копировать
long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
1

(3) В это время relativelowerBoundдля-5Прямо сейчасlowerBound<0;чистый правый потокизtimestamp=timerTimestamp + lowerBound =10с-5=5с, что на самом деле очищается, так это данные с правильным потоком 5с? ? ?

Язык кода:javascript
копировать
long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;
    logger.trace("Removing from right buffer @ {}", timestamp);
    rightBuffer.remove(timestamp);

(4) Заключение

Когда время достигнет 10 секунд,Вы можете очистить данные с отметкой времени 10 секунд в правом потоке.,То есть посмотрите на поток слева на 10с.,Необходимо найти правильный временной диапазон трансляции.11s<=Временная метка справа<=16s,тактакwatermark>10sМожно очистить, когда右边流时间戳для10sданные。

2.4 Что нужно знать после прочтения исходного кода

2.4.1 Состояние хранилища MapState

Штат использует MapState, принадлежащий Keyed Тип государства. Состояние можно понимать как локальный кеш, который используется для хранения данных двух потоков соответственно. Его структура данных MapState<Long, List>,Где Long соответствует временной метке данных.,Список соответствует данным с той же меткой времени (где BufferEntry имеет элемент и hasBeenJoined два свойства)

2.4.2 Время очистки статуса

Время очистки состояния левого потока = OurTimestamp (событие в потоке данных) + относительныйUpperBound (верхняя граница временного диапазона)

Время очистки состояния правого потока = OurTimestamp (Событие в потоке данных)

3. Различия и сценарии использования трёх типов Join

Ссылки

Язык кода:javascript
копировать
Flink DataStream Join && IntervalJoin && Разница между когруппами
https://blog.csdn.net/qq_33689414/article/details/93875881

(Версия Flink в реальном времени Alibaba Cloud) Оператор IntervalJoin
https://help.aliyun.com/document_detail/195298.html


(Принцип) Apache Flink Серия случайных разговоров - Time Interval JOIN
https://enjoyment.cool/2019/03/22/Apache%20Flink%20%E6%BC%AB%E8%B0%88%E7%B3%BB%E5%88%97%20-%20Time%20Interval%20JOIN/#more


(Механизм очистки состояния) Flink1.11 intervalJoin создание водяных знаков,Понимание исходного кода механизма очистки состояния&Demoанализировать
https://blog.csdn.net/qq_34864753/article/details/111183556

(Анализ исходного кода) Flink Interval Join Анализ использования и принципов
 https://blog.csdn.net/tzs_1041218129/article/details/109475489?utm_medium=distribute.pc_relevant.none-task-blog-baidujs_baidulandingword-1&spm=1001.2101.3001.4242

4. Присоединяйтесь к нескольким потокам

Упомянутое выше СОЕДИНЕНИЕ двух потоков, но реальный сценарий может включать СОЕДИНЕНИЕ четырех или даже шести потоков. Как это реализовать?

4.1 Сценарий 1: Несколько потоков обновляются нечасто, и их необходимо объединить в одну таблицу в режиме реального времени (несколько таблиц измерений ОБЪЕДИНЯЮТСЯ в одну таблицу измерений).

Этапы реализации:

1. Используйте Canal для синхронизации binlog MySQL с Kafka в реальном времени для формирования соответствующего потока.

2. Асинхронно соедините таблицу A с другими таблицами (таблицами B, C и D) в исходном MySQL. Внесите соответствующие дополнения, исключения и изменения в таблицу Е.

Обратите внимание на предварительные условия здесь:

(1) Четыре таблицы MySQL обновляются нечасто, поскольку, если они обновляются часто, использование MySQL для асинхронного соединения может не соответствовать требованиям QPS.

(2) Таблица A выбирается в зависимости от обстоятельств для объединения таблиц B, C и D. Необходимо объединять только таблицы с добавлениями, удалениями и изменениями цели E.

4.2 Соединение двух потоков (JOIN таблицы фактов и таблицы измерений)

Этапы реализации:

1. Используйте Canal для синхронизации бинлога MySQL с Kafka в реальном времени и потока таблицы фактов.

2. Используйте Canal для синхронизации таблицы измерений B MySQ с Phoenix в реальном времени.

3. Используйте поток A в Kafka для асинхронного соединения таблицы измерений B в Phoenix и записывайте результаты в таблицу результатов C в Phoenix.

Уведомление:

(1) Здесь таблица измерений B синхронизируется с Phoenix в режиме реального времени, поскольку количество запросов в секунду таблицы измерений B относительно велико (таблица измерений здесь представляет собой широкую концепцию. Если количество запросов в секунду относительно низкое, вы можете напрямую использовать таблицу измерений B). в MySQL.

4.3 Объединение двух таблиц фактов (без использования TimeWindowJoin)

Если TimeWindowJoin используется для объединения двух таблиц реального времени, статус данных будет сохранен в рабочем состоянии Flink. Во-первых, здесь используется стороннее хранилище Phoenix. Во-вторых, недостатком IntervalJoin является то, что если один из потоков задерживается и задержка превышает время истечения State, произойдет потеря данных. Для решения этой проблемы здесь используется вывод CoGroupJoin+sideflow.

Этапы реализации:

1. Используйте Canal для синхронизации бинлога MySQL с Kafka в реальном времени, потока таблицы фактов A и потока B.

2. Используйте поток A для потока группы B.

3. Опоздание потока, SideputTag+API/DB (используйте API для асинхронного соединения данных таблицы B из базы данных). Таким же образом, поток B задерживается, SideputTag+API/DB (используйте API для асинхронного соединения данных таблицы A из базы данных).

4.СОЮЗ. ОБЪЕДИНЯЙТЕ все потоки и записывайте в таблицу Phoenix C.

Уведомление:

Разница здесь и IntervalJoin заключается в,Не использовать Flink,Вместо этого задержанные данные выводятся напрямую через SideOutPutTag.,И асинхронное соединение данных в MySQL.

boy illustration
Неразрушающее увеличение изображений одним щелчком мыши, чтобы сделать их более четкими артефактами искусственного интеллекта, включая руководства по установке и использованию.
boy illustration
Копикодер: этот инструмент отлично работает с Cursor, Bolt и V0! Предоставьте более качественные подсказки для разработки интерфейса (создание навигационного веб-сайта с использованием искусственного интеллекта).
boy illustration
Новый бесплатный RooCline превосходит Cline v3.1? ! Быстрее, умнее и лучше вилка Cline! (Независимое программирование AI, порог 0)
boy illustration
Разработав более 10 проектов с помощью Cursor, я собрал 10 примеров и 60 подсказок.
boy illustration
Я потратил 72 часа на изучение курсорных агентов, и вот неоспоримые факты, которыми я должен поделиться!
boy illustration
Идеальная интеграция Cursor и DeepSeek API
boy illustration
DeepSeek V3 снижает затраты на обучение больших моделей
boy illustration
Артефакт, увеличивающий количество очков: на основе улучшения характеристик препятствия малым целям Yolov8 (SEAM, MultiSEAM).
boy illustration
DeepSeek V3 раскручивался уже три дня. Сегодня я попробовал самопровозглашенную модель «ChatGPT».
boy illustration
Open Devin — инженер-программист искусственного интеллекта с открытым исходным кодом, который меньше программирует и больше создает.
boy illustration
Эксклюзивное оригинальное улучшение YOLOv8: собственная разработка SPPF | SPPF сочетается с воспринимаемой большой сверткой ядра UniRepLK, а свертка с большим ядром + без расширения улучшает восприимчивое поле
boy illustration
Популярное и подробное объяснение DeepSeek-V3: от его появления до преимуществ и сравнения с GPT-4o.
boy illustration
9 основных словесных инструкций по доработке академических работ с помощью ChatGPT, эффективных и практичных, которые стоит собрать
boy illustration
Вызовите deepseek в vscode для реализации программирования с помощью искусственного интеллекта.
boy illustration
Познакомьтесь с принципами сверточных нейронных сетей (CNN) в одной статье (суперподробно)
boy illustration
50,3 тыс. звезд! Immich: автономное решение для резервного копирования фотографий и видео, которое экономит деньги и избавляет от беспокойства.
boy illustration
Cloud Native|Практика: установка Dashbaord для K8s, графика неплохая
boy illustration
Краткий обзор статьи — использование синтетических данных при обучении больших моделей и оптимизации производительности
boy illustration
MiniPerplx: новая поисковая система искусственного интеллекта с открытым исходным кодом, спонсируемая xAI и Vercel.
boy illustration
Конструкция сервиса Synology Drive сочетает проникновение в интрасеть и синхронизацию папок заметок Obsidian в облаке.
boy illustration
Центр конфигурации————Накос
boy illustration
Начинаем с нуля при разработке в облаке Copilot: начать разработку с минимальным использованием кода стало проще
boy illustration
[Серия Docker] Docker создает мультиплатформенные образы: практика архитектуры Arm64
boy illustration
Обновление новых возможностей coze | Я использовал coze для создания апплета помощника по исправлению домашних заданий по математике
boy illustration
Советы по развертыванию Nginx: практическое создание статических веб-сайтов на облачных серверах
boy illustration
Feiniu fnos использует Docker для развертывания личного блокнота Notepad
boy illustration
Сверточная нейронная сеть VGG реализует классификацию изображений Cifar10 — практический опыт Pytorch
boy illustration
Начало работы с EdgeonePages — новым недорогим решением для хостинга веб-сайтов
boy illustration
[Зона легкого облачного игрового сервера] Управление игровыми архивами
boy illustration
Развертывание SpringCloud-проекта на базе Docker и Docker-Compose