Вычисления с отслеживанием состояния — важная функция, которую должна реализовать платформа потоковой обработки, поскольку в несколько более сложных сценариях потоковой обработки необходимо записывать состояние, а затем постоянно обновлять его на основе новых входящих данных. Следующие сценарии требуют использования функции состояния потоковой обработки:
Оператор в Flink имеет несколько подзадач, и каждая подзадача распределена по разным экземплярам. Мы можем понимать состояние как переменную определенной подзадачи оператора в ее текущем экземпляре. Переменная записывает историческую информацию о потоке данных. По мере поступления новых данных мы можем комбинировать их с исторической информацией для выполнения расчетов.
Flink имеет два основных типа состояния: управляемое состояние и необработанное состояние. Разницу между ними также можно прочитать из названия: Managed State управляется Flink, а Flink помогает с хранением, восстановлением и оптимизацией. Необработанное состояние управляется самим разработчиком и должно быть сериализовано им самим.
- | Managed State | Raw State |
---|---|---|
Управление статусами Способ | Flink Работающий хостинг, автоматическое хранение, автоматическое восстановление, автоматическое масштабирование. | Пользователи управляют сами |
структура данных статуса | Общие структуры данных, предоставляемые Flink, такие как ValueState, ListState, MapState и т. д. | Необработанное состояние поддерживает только байты, и любую структуру данных верхнего уровня необходимо сериализовать в массив байтов. |
Сценарии использования | Большинство операторов | Пользовательский оператор |
Управляемое состояние продолжает подразделяться и имеет два типа: состояние с ключом и состояние оператора.
Flink поддерживает экземпляр состояния для каждого значения ключа и разделяет все данные с одним и тем же ключом на одну и ту же задачу оператора. Эта задача будет поддерживать и обрабатывать состояние, соответствующее этому ключу. Когда задача обрабатывает фрагмент данных, она автоматически ограничивает область доступа состояния ключом текущих данных. Таким образом, все данные с одним и тем же ключом получают доступ к одному и тому же состоянию. Keyed State очень похоже на распределенную структуру данных карты значений ключа, которую можно использовать только в KeyedStream (после обработки оператором keyBy).
Существует пять типов ключевого состояния:
KeyedState находится в стадии разработки KeyBy Тип состояния, используемый при выполнении операций с состоянием позже, например Операторы Source и Sink не будут выполняться. KeyBy Операция: когда этому типу оператора также необходимо использовать состояние, как с ним работать? В это время вам нужно использовать Operator State(Статус оператора)Operator State обязан Operator Степень параллелизма в экземпляре, то есть одна степень параллелизма и одно состояние.
Например, когда степень параллелизма источника Kafka, который использует данные Kafka, равна 3, по умолчанию каждая степень параллелизма использует данные из определенного раздела темы Kafka. Чтобы гарантировать, что в крайних случаях данные не будут потеряны, каждый Kafka. Источник Вы не можете сохранить смещение, соответствующее разделу, в Zookeeper по умолчанию. Вместо этого вам необходимо сохранить эти данные в состоянии и поддерживать эту часть данных самостоятельно. Когда параллелизм настроен, состояние необходимо перераспределить на параллелизме Оператора.
В большинстве сценариев разработки потоковой передачи данных нам не нужно использовать состояние оператора. Реализация состояния оператора в основном предназначена для источников и приемников, которые не имеют операций с ключами.
Объем состояния оператора ограничен задачами оператора. Это означает, что все данные, обрабатываемые одной и той же параллельной задачей, могут иметь доступ к одному и тому же состоянию, и это состояние является общим для одной и той же задачи. К состоянию оператора не может получить доступ другая задача того же или другого оператора.
Flink предоставляет три основные структуры данных для статуса оператора:
List
,независимы друг от друга,Облегчает Статус после изменения параллелизма. из Переназначение. Эти объекты перераспределяются non-Keyed State тончайшая зернистость. В соответствии с различными методами доступа состояния существуют следующие два режима перераспределения:
element1
и element2
,Когда количество одновременных чтений увеличивается до 2,element1
будет назначено одновременному 0 начальство,element2
нобудет назначено одновременному 1 начальство.
getUnionListState(descriptor)
буду использовать union redistribution алгоритм, и getListState(descriptor)
Он прост в использовании even-split redistribution алгоритм.
isRestored()
Метод определяет, следует ли восстанавливаться после предыдущего сбоя, если метод возвращает значение. true
Это означает восстановление после неисправности, и будет выполнена следующая логика восстановления.
Имя конфигурации | значение по умолчанию | иллюстрировать |
---|---|---|
state.backend | - | Рекомендуется настроить его как RocksDB. |
state.backend.latency-track.keyed-state-enabled | false | Отслеживать ли задержку операций с ключевыми состояниями. Рекомендуется не включать. |
state.backend.latency-track.sample-interval | 100 | Отслеживайте операции, которые занимают более 100 мс. |
state.backend.latency-track.history-size | 128 | Отслеживайте количество операций, которые занимают много времени |
table.exec.state.ttl | - | Время ttl серверной части состояния обычно используется в сценариях присоединения, чтобы предотвратить слишком большой размер серверной части состояния, приводящий к сбою задания. |
Имя конфигурации | значение по умолчанию | иллюстрировать |
---|---|---|
execution.checkpointing.interval | - | Время срабатывания контрольной точки, контрольная точка будет срабатывать каждый период времени. Рекомендуется, чтобы общая конфигурация составляла около 1-10 минут. |
execution.checkpointing.mode | EXACTLY_ONCE | EXACTLY_ONCE: гарантированно будет точным один раз; AT_LEAST_ONCE: хотя бы один раз. Предложить EXACTLY_ONCE |
state.backend.incremental | false | Включить ли инкрементную контрольную точку, рекомендуется включить ее |
execution.checkpointing.timeout | 10min | Рекомендуется установить таймаут контрольной точки больше, около 30 минут. |
execution.checkpointing.unaligned.enabled | false | Включать ли невыровненную контрольную точку, рекомендуется не включать |
execution.checkpointing.unaligned.forced | false | Следует ли принудительно включать неприсоединенные контрольные точки |
execution.checkpointing.max-concurrent-checkpoints | 1 | Максимальное количество одновременных КПП |
execution.checkpointing.min-pause | 0 | Минимальное время паузы между двумя контрольными точками |
execution.checkpointing.tolerable-failed-checkpoints | - | Допустимое количество последовательных сбоев контрольных точек. |
execution.checkpointing.aligned-checkpoint-timeout | 0 | Тайм-аут контрольной точки выравнивания |
execution.checkpointing.alignment-timeout | 0 | Ссылка: выполнение.checkpointing.aligned-checkpoint-timeout (истек) |
execution.checkpointing.force | false | Форсировать ли контрольную точку (срок действия истек) |
state.checkpoints.num-retained | 1 | Количество сохраненных контрольных точек |
state.backend.async | true | Включить ли асинхронную контрольную точку (срок действия истек) |
state.savepoints.dir | - | папка для хранения точек сохранения |
state.checkpoints.dir | - | папка для хранения контрольных точек |
state.storage.fs.memory-threshold | 20kb | Минимальный размер файла состояния |
state.storage.fs.write-buffer-size | 4 * 1024 | Размер буфера записи по умолчанию для потоков контрольных точек, записываемых в файловую систему. |
Имя элемента конфигурации | значение по умолчанию | иллюстрировать |
---|---|---|
state.backend.rocksdb.checkpoint.transfer.thread.num | 4 | Количество потоков, используемых для загрузки и скачивания файлов |
state.backend.rocksdb.write-batch-size | 2mb | Максимальный объем памяти, потребляемый Rocksdb при записи |
state.backend.rocksdb.predefined-options | DEFAULT | DEFAULT:всеизRocksDbКонфигурация Всезначение по умолчанию。 SPINNING_DISK_OPTIMIZED: оптимизировать параметры RocksDb при записи на жесткий диск. SPINNING_DISK_OPTIMIZED_HIGH_MEM: Оптимизация параметров при записи на обычный жесткий диск требует потребления большего объема памяти FLASH_SSD_OPTIMIZED: оптимизация при записи на флэш-диск SSD. |
Диаграмма классов реализации StateBackend,В версии 1.17,Срок действия части статуса задняя часть истек,например:MemoryStateBackend、RocksDBStateBackend、FsStateBackendждать。
После удаления состояния с истекшим сроком действия оставшийся бэкэнд выглядит следующим образом:
Сохраните серверную информацию о состоянии задания в памяти TaskManager. Если TaskManager выполняет несколько задач параллельно, вся совокупная информация должна быть сохранена в текущей памяти TaskManager. Данные в основном хранятся в куче памяти в виде объектов Java. Форма ключ/значение операторов состояния и окон будет содержать хеш-таблицу, в которой хранятся значения состояния и триггеры.
Формат хранения в памяти определяется следующим образом:
/** So that we can give out state when the user uses the same key. */
private final HashMap<String, InternalKvState<K, ?, ?>> keyValueStatesByName;
Рекомендуется также managed memory Установите значение 0, чтобы обеспечить выделение максимального объема памяти для JVM код пользователя включен.
Сохраните статус текущих заданий в RocksDb.
RocksDB JNI library
СвязанныйJarСумка。try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new MemoryAllocationException("Interrupted while acquiring memory");
}
Прежде чем подавать заявку на ресурсы, вам необходимо определить, была ли заявка на ресурс по типу. Если на ресурс уже была подана заявка, она не будет применена повторно. Если нет, вам необходимо подать заявку. Код приложения следующий:
private static <T extends AutoCloseable> LeasedResource<T> createResource(
LongFunctionWithException<T, Exception> initializer, long size) throws Exception {
final T resource = initializer.apply(size);
return new LeasedResource<>(resource, size);
}
private RocksDBResourceContainer createOptionsAndResourceContainer(
@Nullable OpaqueMemoryResource<RocksDBSharedResources> sharedResources,
@Nullable File instanceBasePath,
boolean enableStatistics) {
return new RocksDBResourceContainer(
configurableOptions != null ? configurableOptions : new Configuration(),
predefinedOptions != null ? predefinedOptions : PredefinedOptions.DEFAULT,
rocksDbOptionsFactory,
sharedResources,
instanceBasePath,
enableStatistics);
restoreOperation =
getRocksDBRestoreOperation(
keyGroupPrefixBytes,
cancelStreamRegistry,
kvStateInformation,
registeredPQStates,
ttlCompactFiltersManager);
RocksDBRestoreResult restoreResult = restoreOperation.restore();
db = restoreResult.getDb();
defaultColumnFamilyHandle = restoreResult.getDefaultColumnFamilyHandle();
Ниже показана диаграмма классов реализации restoreOperation, которая в основном включает следующие классы реализации.
В основном реализует восстановление данных RocksDB из инкрементных снимков. Основная функция — восстановление(). Основные различия:
if (isRescaling) {
restoreWithRescaling(restoreStateHandles);
} else {
restoreWithoutRescaling(theFirstStateHandle);
}