Топология операторов задания Flink описывается бегущим графом, состоящим из ряда операторов, как показано на следующем рисунке:
Каждый узел работающего графа имеет свой идентификатор, а также может иметь собственное состояние. Когда Flink делает снимок, он сохраняет соответствующую связь между идентификатором оператора и состоянием. Следовательно, когда мы восстанавливаем задание из снимка, если каждый идентификатор оператора соответствует предыдущему оператору взаимно однозначно, мы можем точно восстановить рабочий статус предыдущего снимка.
Если пользователь явно не указывает оператор ID,Flink будет использовать топологию,Автоматически генерировать свой собственный идентификатор。
Когда мы пишем задания Flink через SQL или API таблиц, пользовательской стороне сложно напрямую участвовать в логике генерации оператора бюджета, поскольку для получения окончательного оператора Flink его необходимо транслировать и оптимизировать с помощью Calcite.
Например, если пользователь слегка модифицирует код SQL или обновляет версию Flink, это может привести к изменению рабочего графика, и автоматически сгенерированный идентификатор оператора больше не будет соответствовать предыдущему, что приведет к невозможности восстановления моментального снимка.
Тогда возникает вопрос:как можнозафиксированныйоператориз ID,То есть, какие бы последующие модификации ни вносились,,Пока этот оператор остается самим собой,тогда это из ID Неужели это никогда не изменится?
существовать DataStream API В режиме программирования Flink предоставляет фиксированные операторы ID путь: мы можем пройти uid() Метод явно устанавливает строку для оператора идентификатор, за которым следует Flink поставлю это uid руководить hash обработка и, наконец, сопоставление с уникальным оператором ID。
Например, мы можемсуществовать Flink Следующий пример находится в тестовом коде:
env.addSource(new StatefulSource(false, finalCheckpointLatch))
.uid(SOURCE_UID)
.setParallelism(NUM_SOURCES)
.sinkTo(sink)
.setParallelism(NUM_SINKS)
.uid(SINK_UID);
существоватьэтот В примере,Пользователь для Source и Sink Оператор явно объявлен жидкость. Таким образом, независимо от того, сколько других операторов будет добавлено в середине, это не повлияет. Source и Sink статус совпадает.
этот uid() Нижний уровень метода — вызов Transformation#setUid() метод установки uid , поэтому прорыв здесь: как найти Flink SQL созданный Объект трансформации,идляэтонастраиватьтолькоиз uid。
Мы уже можем узнать из многих статей Flink из SQL код преобразован в Transformation изшаг,НапримерЭта статья,Поэтому в этой статье не буду повторять подробности.
Короче говоря, Флинк из SQL домашнее задание, которое нужно выполнить SQL код → SqlNode AST синтаксическое дерево → Operation слой абстракции → RelNode логическое дерево → RelNode физическое дерево → ExecNodeGraph Схема выполнения → Transformations → StreamGraph → JobGraph → ExecutionGraph наконец-то может быть отправлен на выполнение.
Поскольку мы знаем, что пока мы даем Transformation настраивать uid Вы можете гарантировать последующее сопровождение со стороны оператора ID фиксированный, вы можете думать об этом наоборот: до тех пор, пока он существует из предшественника ExecNode Сохранить в uid,Таксуществовать ExecNodeBase#translateToPlanInternal В этом методе мы можем сохранить согласно этому uid Приходитьнастраивать Transformation из uid。
Например, создадим новый StreamExecCalc из подкатегории, названной для EnhancedStreamExecCalc, переопределить translateToPlanInternal метод:
@Override
protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
Transformation<RowData> transformation = super.translateToPlanInternal(planner);
transformation.setUid(uid);
return transformation;
}
Это расслабляет Transformation проведет нас до конца uid Передайте это дальше.
Следуй идее этого,Продолжим обратный путь,существовать ExecNode из предшественника, то есть StreamPhysicalRel в, тоже присоединяйся uid поле. Вот так это созданный ExecNode Вы можете взять его с собой uid информация.
Например, мы можем продолжить создание нового StreamPhysicalCalc из подкласса, переопределить translateToExecNode Метод, существующий здесь, только что упомянутый там uid из EnhancedStreamExecCalc объект.
Позже мы добавили Planner Правило: напишите правила сопоставления на основе характеристик оператора (таких как имя, параметры) и RelNode Замените его на нашу расширенную версию.
Подведенный итог: Этот метод необходимо настроить для каждого типа,Сложнее, чем для,Подходит для сценариев с более специализированной логикой.
Из-за недостаточной универсальности упомянутого выше «одноточечного» метода улучшения Flink Сообществосуществовать FLIP-190: Support Version Upgrades for Table API & SQL Programs В предложении по таким причинам ID Существует систематическое решение проблем несовместимости, вызванных изменениями (оно еще не доработано).
Основные технические моменты этого предложения по-прежнему формируются в соответствии с определенными правилами. Transformation из uid, таким образом гарантируя, что оператор графа будет запущен ID иззафиксированный. Здесь добавлен новый TransformationMetadata Класс существования, только что упомянутый, из translateToPlanInternal Метод записывает определенное имя оператора, uid, запись и другие метаданные.
Основная идея – увеличить COMPILE PLAN высказывание, поставьте данное из SQL Логика запроса становится JSON описывать Plan документация (см. Образец файла),впоследствиипользовательможет пройти EXECUTE PLAN заявление, исполнениеэтот JSON Формат из Plan документ. если бы только Plan Формат файла совместим с из,оператор ID из Генерация правил фиксации может обеспечить логику конечной схемы работы оператора ID из Стабильности.
Пользователям нужно только существование Flink в параметрахнастраивать table.exec.uid.generation для PLAN_ONLY(значение по умолчанию),Эту функцию можно включить。для всех COMPILE PLAN В заявлении содержится логика, Флинк Будет для каждого оператора согласно правилам (по table.exec.uid.format Контроль параметров) генерировать уникальные из ID。
Эта статья объясняет Flink оператор ID из использования, логики генерации и последствий несоответствия, и проанализировано, как явно указать SQL заявлениесозданный Различные структурынастраиватьзафиксированныйиз uid, позже также представленный Flink Мысли сообщества о том, как справиться с этой проблемой. Я надеюсь, что они вдохновят всех.