Состояние используется для записи промежуточных результатов вычислений или метаданных операторов во время работы приложений Flink. Если работающему приложению Flink для обработки требуется последний результат расчета, ему необходимо использовать состояние для хранения промежуточного результата расчета. Например, сценарии объединения и агрегирования окон.
Когда приложение Flink запущено, информация о состоянии будет сохранена в экземпляре объекта State. Экземпляр объекта State сохраняет соответствующие данные в файловой системе FS или базе данных RocksDB через StateBackend. Пока приложение Flink работает, данные о состоянии периодически сохраняются с помощью снимков контрольных точек. А когда приложение Flink перезапускается, загружается контрольная точка/точка сохранения для восстановления состояния, чтобы приложение Flink могло продолжить выполнение предыдущего расчета данных и обеспечить точную передачу данных в нисходящий поток.
Разделены на следующие 3 категории:
HeapStateBackend и RocksDBStateBackend соответственно соответствуют своим позициям в модели памяти TaskManager:
Структура хранения в RocksDBStateBackend:
пространство имен: в разных пространствах имен существуют состояния с одинаковым именем.
проходить Chandy-Lamport алгоритм распределенных снимков checkpoint Завершите сохранение данных состояния. затем в Flink Читать, когда приложение перезапускается State Данные о состоянии можно восстановить на месте эксплуатации.
Категория КПП:
Состояние можно разделить на две категории: состояние оператора и состояние с ключом.
Часто существует в Source, Sink. Пример конкретного класса реализации: BroadcastState
Пример: ОператорState используется для записи смещения в Kafka Source.
Любой тип ключевого состояния может иметь период действия (TTL), и все типы состояний поддерживают одноэлементный TTL. Это означает, что срок действия элементов List и Map истекает независимо.
Пример: данные в окне после SQL GroupBy/PartitionBy, каждый ключ имеет соответствующее состояние. Данные о состоянии между ключом и ключом не видны.
Конкретный класс реализации ключевого состояния:
Интеллект-карта штата Флинк:
Keyed State | Operator State | |
---|---|---|
Применимые типы операторов | Применяется только к операторам KeyedStream. | Доступно для всех операторов |
присвоение статуса | Каждый ключ соответствует состоянию | Подзадача оператора соответствует состоянию |
Масштабировать | Статус автоматически переносится на несколько подзадач оператора с помощью группировки ключей KeyGroup. | Существует несколько способов переназначения состояния. |
Как создать и получить доступ | Пользовательский оператор(переписатьRichFunction,проходитьState Имя из Метод getRuntimeContext создает или получает State ) | Реализуйте такие интерфейсы, как CheckpointedFunction. |
Поддержка структур данных | ValueState, ListState, MapStateждать | ListState, BroadcastState и т. д. |
class FlinkKafkaConsumerBase {
private transient ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates; // имя состояния: "topic-partition-offset-states"
// Особый тип государства: Союз State
}
Переменная UnionOffsetStates имеет тип OperationState.
• ValueState/MapState/ListState/......
думать:keyby Распределение постданных и несколько степеней параллелизма subtask Какова связь между ними?
Во-первых, после того, как данные в потоке данных пройдут через Keyby, они будут разделены на различные KeyedStreams. Каждый KeyedStream имеет свой собственный KeyedState (например, ValueState/ListState/MapState).
Во-вторых, данные в KeyedStream будут организованы в KeyGroup. KeyGroup — это наименьшая единица Flink для перераспределения состояния ключей.
Наконец, группа ключей серединаиз Данные будут распределяться по каждому модулю с использованием максимальной степени параллелизма по модулю. subtask середина. Ниже приведен ключевой исходный код:
KeyGroupStreamPartitioner#selectChannel(record)
{
K key;
key = keySelector.getKey(record.getInstance().getValue());
return KeyGroupRangeAssignment.assignKeyToParallelOperator(
key, maxParallelism, numberOfChannels);
}
--KeyGroupRangeAssignment#assignKeyToParallelOperator()
{
return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
}
--KeyGroupRangeAssignment#computeOperatorIndexForKeyGroup()
Официально: Оператор Индекс = keyGroupId * parallelism / maxParallelism
--KeyGroupRangeAssignment#assignToKeyGroup()
{
return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
}
Распределенный снимок Checkpoint концепция, регулярно State упорствовать в Внешняя система хранения (HDFS/OSS) начальство. Пользователи могут провестивыполнение CheckpointedFunction интерфейс для использования operator state。проходить barrier выравнивать КПП, подожди State Персистентность завершена (этот процесс также может быть асинхронным в зависимости от параметров).
общий State и CP Связанные вопросы:
Распространенные решения: увеличьте размер управляемой памяти.