Принцип и анализ резервного копирования снимков Elasticsearch
Принцип и анализ резервного копирования снимков Elasticsearch

Что такое резервное копирование снимков Elasticsearch

Снимок — это функция, предоставляемая Elasticsearch для резервного копирования данных кластера в удаленный репозиторий. Например, выполните резервное копирование данных в S3, HDFS, общую файловую систему и т. д.

Сценарии использования

1. Резервное копирование и восстановление данных

• Функция моментального снимка обеспечивает возможности резервного копирования и восстановления данных, гарантируя, что данные не будут потеряны из-за непредвиденных сбоев.

2. Кластерная миграция

• Вы можете использовать функцию моментального снимка для перемещения данных из Elasticsearch Кластерная миграция на другой кластер.

3. Архивирование данных

• Сохраняйте исторические данные в хранилище, освобождая место для хранения в вашем онлайн-кластере.

Repository

  • Репозиторий — это место хранения, где сохраняются снимки. Elasticsearch поддерживает множество типов репозиториев, таких как общие файловые системы (fs), Amazon S3, HDFS, хранилище BLOB-объектов Azure и т. д.
  • Репозиторий должен находиться в Create. Зарегистрируйтесь перед тем, как сделать.

Snapshot

  • Снимок — это резервная копия кластера или индекса Elasticsearch в указанный момент времени. Снимок содержит данные для всех сегментов индекса.
  • Снимки являются инкрементными, и сохраняются только те данные, которые изменились с момента последнего снимка.

Этапы работы со снимком

Создать склад

Язык кода:json
копировать
PUT /_snapshot/my_backup
{
  "type": "fs",
  "settings": {
    "location": "/mount/backups",
    "compress": true
  }
}

Создать снимок

Язык кода:json
копировать
PUT /_snapshot/my_backup/snapshot_1
{
  "indices": "index_1,index_2",
  "ignore_unavailable": true,
  "include_global_state": false
}

Описание параметра:

• индексы: список индексов для резервного копирования.

• ignore_unavailable: игнорировать ли индекс, если он недоступен.

• include_global_state: включать ли глобальное состояние кластера.

Восстановить снимок

Язык кода:json
копировать
POST /_snapshot/my_backup/snapshot_1/_restore
{
  "indices": "index_1,index_2",
  "ignore_unavailable": true,
  "include_global_state": false,
  "rename_pattern": "index_(.+)",
  "rename_replacement": "restored_index_$1"
}

Описание параметра:

• индексы: список индексов, подлежащих восстановлению.

• ignore_unavailable: игнорировать ли индекс, если он недоступен.

• include_global_state: включать ли глобальное состояние кластера.

• rename_pattern и rename_replacement: индексы, используемые для восстановления переименования.

Посмотреть снимок

Язык кода:bash
копировать
#Просмотреть все снимки под складом
GET /_snapshot/my_backup/_all
#Просмотр конкретного снимка
GET /_snapshot/my_backup/snapshot1
#Посмотреть снимоксостояние
GET /_snapshot/my_backup/snapshot1/_status

Удалить снимок

Язык кода:txt
копировать
DELETE /_snapshot/my_backup/snapshot_1
  • Вы можете удалить моментальный снимок, чтобы освободить место на носителе, на котором расположено удаленное хранилище.

Меры предосторожности при эксплуатации

1. Тип и конфигурация репозитория

• Тип и конфигурация репозиторий следует выбирать на основе реальных потребностей и среды. Например, использование репозитория S3 требует настройки учетных данных для доступа и сегментов хранилища.

2. Снимок производительности

• Создать и Восстановить Ресурсы кластера будут потребляться при съемке, и эти операции следует выполнять в периоды низкой нагрузки, чтобы избежать влияния на онлайн-сервисы.

3. Согласованность снимка

• Создать снимок Elasticsearch обеспечивает согласованность данных. Даже если в процессе создания снимка выполняются операции записи данных, это не повлияет на согласованность снимка.

4. безопасность

• Убедитесь, что разрешения на доступ к репозиторию и разрешения на операции с моментальными снимками настроены правильно для предотвращения несанкционированных операций.

Принцип резервного копирования и анализ исходного кода

Принцип резервного копирования

С момента инициирования запроса на создание моментального снимка до завершения резервного копирования моментального снимка он условно делится на следующие этапы:

Фаза анализа запроса

  1. Получить запрос на резервное копирование моментального снимка,Анализ операторов резервного копирования моментальных снимков,Создать запрос резервного копирования снимков.
  2. После создания запроса на резервное копирование Elasticsearch сначала выполнит проверку соответствия и статус репозитория.
  3. Разобрать выражение индексов в запросе, загрузить различные параметры, указанные в запросе, и загрузить статус плагина;

Запросить этап строительства

  1. Создайте запрос моментального снимка для анализа запроса на резервное копирование, отправленного через клиент, в оператор запроса для выполнения в Elasticsearch.
  2. Инициализируйте запрос на резервное копирование, в основном генерируя идентификатор моментального снимка, регистрируя прослушиватель, загружая метаданные репозитория и т. д.

этап подготовки запроса

  1. Сначала проверьте снимок в запросе на снимок, чтобы определить, существует ли он уже или в процессе уже есть снимок с таким же именем.
  2. Перед запуском задачи резервного копирования моментального снимка необходимо проверить метаданные репозитория, чтобы определить доступность имени моментального снимка в репозитории. Проверьте, превышает ли количество одновременных задач предел значения параметра.
  3. Получите список индексов, участвующих в снимке, и подготовьте соответствующие метаданные и данные сегментов.
  4. Создать коллекция записей снимков, начните поддерживать статус снимков.

этап выполнения запроса

  1. Получите идентификатор сегмента индекса, который был проверен.
  2. Проверьте, является ли полученный индексный сегмент основным. Это связано с тем, что снимки применяются только к основному сегменту.
  3. Проверьте текущий статус сегмента индекса, чтобы узнать, разрешены ли операции со снимками. Когда сегмент, резервную копию которого необходимо создать, находится в состоянии перемещения или восстановления, во время операции резервного копирования могут возникнуть конфликты.
  4. Получите базовый файл сегмента и файл метаданных сегмента индексов, включенных в снимок. Записывайте файлы данных в удаленный репозиторий в потоковом режиме.
  5. Во время выполнения задачи резервного копирования моментальных снимков ActionListener будет постоянно отслеживать состояние запроса на резервное копирование.

Стадия завершения запроса

  1. Когда резервное копирование сегмента индекса будет завершено, процесс моментального снимка вызовет метод, подобный главному узлу в кластере, для синхронизации информации о завершенном сегменте и отправки статуса завершения. При этом статус резервного копирования шарда узла, на котором расположен шард, синхронизируется с мастер-узлом.
  2. Обновите статус резервного копирования определенного моментального снимка на узле, на котором расположен сегмент.
  3. Получите информацию обратного вызова, возвращаемую ActionListener, и обработайте ее.
  4. Когда все сегменты в моментальном снимке будут скопированы и все обновления статуса завершены, процесс резервного копирования моментального снимка освободит занятые ресурсы сегментов. Пул потоков TransportService также завершит работу по обслуживанию потоков, связанных со снимками.

Детали логики выполнения задачи резервного копирования моментальных снимков в Elasticsearch показаны на рисунке ниже:

SnapshotПринцип резервного копированияблок-схема
SnapshotПринцип резервного копированияблок-схема

Основной анализ исходного кода

1.CreateSnapshotRequest.java

В основном используется для создания снимокпросить,сдержанныйrepositoryуникальный параметр,и соглашения об именах.

Язык кода:java
копировать
     public CreateSnapshotRequest(String repository, String snapshot) {
        this.snapshot = snapshot;
        this.repository = repository;
    }

    public CreateSnapshotRequest(StreamInput in) throws IOException {
        super(in);
        snapshot = in.readString();
        repository = in.readString();
        indices = in.readStringArray();
        indicesOptions = IndicesOptions.readIndicesOptions(in);
        if (in.getTransportVersion().before(SETTINGS_IN_REQUEST_VERSION)) {
            readSettingsFromStream(in);
        }
        featureStates = in.readStringArray();
        includeGlobalState = in.readBoolean();
        waitForCompletion = in.readBoolean();
        partial = in.readBoolean();
        userMetadata = in.readMap();
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        out.writeString(snapshot);
        out.writeString(repository);
        out.writeStringArray(indices);
        indicesOptions.writeIndicesOptions(out);
        if (out.getTransportVersion().before(SETTINGS_IN_REQUEST_VERSION)) {
            Settings.EMPTY.writeTo(out);
        }
        out.writeStringArray(featureStates);
        out.writeBoolean(includeGlobalState);
        out.writeBoolean(waitForCompletion);
        out.writeBoolean(partial);
        out.writeGenericMap(userMetadata);
    }

    @Override
    public ActionRequestValidationException validate() {
        ActionRequestValidationException validationException = null;
        if (snapshot == null) {
            validationException = addValidationError("snapshot is missing", validationException);
        }
        if (repository == null) {
            validationException = addValidationError("repository is missing", validationException);
        }
        if (indices == null) {
            validationException = addValidationError("indices is null", validationException);
        } else {
            for (String index : indices) {
                if (index == null) {
                    validationException = addValidationError("index is null", validationException);
                    break;
                }
            }
        }
        if (indicesOptions == null) {
            validationException = addValidationError("indicesOptions is null", validationException);
        }
        if (featureStates == null) {
            validationException = addValidationError("featureStates is null", validationException);
        }
        final int metadataSize = metadataSize(userMetadata);
        if (metadataSize > MAXIMUM_METADATA_BYTES) {
            validationException = addValidationError(
                "metadata must be smaller than 1024 bytes, but was [" + metadataSize + "]",
                validationException
            );
        }
        return validationException;
    }
    
    @Override
    public String toString() {
        return "CreateSnapshotRequest{"
            + "snapshot='"
            + snapshot
            + '\''
            + ", repository='"
            + repository
            + '\''
            + ", indices="
            + (indices == null ? null : Arrays.asList(indices))
            + ", indicesOptions="
            + indicesOptions
            + ", featureStates="
            + Arrays.asList(featureStates)
            + ", partial="
            + partial
            + ", includeGlobalState="
            + includeGlobalState
            + ", waitForCompletion="
            + waitForCompletion
            + ", masterNodeTimeout="
            + masterNodeTimeout
            + ", metadata="
            + userMetadata
            + '}';
    }

CreateSnapshotRequestКлассы наследуются отMasterNodeRequestдобрый,В то же время при наследованииCreateSnapshotRequestПередается как общий параметр。имя таблицы Создать Запросы снимков должны обрабатываться главным узлом. Вызов родительского класса MasterNodeRequest Метод строительства,Чтение из входного потокаиинициализировать родительский элементдобрыйполя。проходитьпозвонить родителюдобрыйизwriteTo()метод Снимокпроситьв Поля записываются в выходной поток。в то же времяпроходитьvalidate()Правильный путьsnapshot,repository,indices,indicesOptions,featureStates,metadataSizeПровести соответствующую проверку。проходитьtoString()метод завершенrequestСтроительство。

Описание поля:

  • snapshot: Считывает имя снимка из входного потока.
  • repository: Считывает имя репозитория из входного потока.
  • indices: Считайте индексированный массив из входного потока.
  • indicesOptions: Считайте параметры индекса из входного потока.
  • if (in.getTransportVersion().before(SETTINGS_IN_REQUEST_VERSION)) { readSettingsFromStream(in); }: Если транспортная версия предшествует указанной версии, прочтите настройки из входного потока.
  • featureStates: Считывает массив статусов объектов из входного потока.
  • includeGlobalState: Считывает логическое значение из входного потока, следует ли включать глобальное состояние.
  • waitForCompletion: Логическое значение, указывающее, следует ли ждать завершения при чтении из входного потока.
  • partial: Считывает логическое значение из входного потока, указывающее, является ли снимок частичным.
  • userMetadata: Считайте метаданные пользователя из входного потока.

2.SnapshotsService.java

Этот класс в основном отвечает за создание. сопутствующие услуги. Эта услуга работает через Create снимок,Удалить makeВсе шаги, выполняемые на главном узле.

Язык кода:java
копировать
public static final Setting<Integer> MAX_CONCURRENT_SNAPSHOT_OPERATIONS_SETTING = Setting.intSetting(
        "snapshot.max_concurrent_operations",
        1000,
        1,
        Setting.Property.NodeScope,
        Setting.Property.Dynamic
    );
 
 private volatile int maxConcurrentOperations;

в этом классе,Сначала мы видим, что создали файл с именемMAX_CONCURRENT_SNAPSHOT_OPERATIONS_SETTINGстатическая константа,Диапазон от 1 до 1000. Значение по умолчанию — 1000. Этот параметр используется для управления максимальным количеством одновременных задач создания снимков, которые могут выполняться одновременно. Если мы не укажем значение этого параметра в операторе моментального снимка,Тогда снимок загрузит значение этого параметра по умолчанию при выполнении построения.

Язык кода:java
копировать
public SnapshotsService(
        Settings settings,
        ClusterService clusterService,
        IndexNameExpressionResolver indexNameExpressionResolver,
        RepositoriesService repositoriesService,
        TransportService transportService,
        ActionFilters actionFilters,
        SystemIndices systemIndices
    ) {
        this.clusterService = clusterService;
        this.indexNameExpressionResolver = indexNameExpressionResolver;
        this.repositoriesService = repositoriesService;
        this.threadPool = transportService.getThreadPool();
        this.transportService = transportService;

        // The constructor of UpdateSnapshotStatusAction will register itself to the TransportService.
        this.updateSnapshotStatusHandler = new UpdateSnapshotStatusAction(
            transportService,
            clusterService,
            threadPool,
            actionFilters,
            indexNameExpressionResolver
        );
        if (DiscoveryNode.isMasterNode(settings)) {
            // addLowPriorityApplier to make sure that Repository will be created before snapshot
            clusterService.addLowPriorityApplier(this);
            maxConcurrentOperations = MAX_CONCURRENT_SNAPSHOT_OPERATIONS_SETTING.get(settings);
            clusterService.getClusterSettings()
                .addSettingsUpdateConsumer(MAX_CONCURRENT_SNAPSHOT_OPERATIONS_SETTING, i -> maxConcurrentOperations = i);
        }
        this.systemIndices = systemIndices;

        this.masterServiceTaskQueue = clusterService.createTaskQueue("snapshots-service", Priority.NORMAL, new SnapshotTaskExecutor());
    }
    

В этом конструкторе инициализируются некоторые ключевые параметры, участвующие в процессах создания, резервного копирования и удаления моментальных снимков.

Анализ параметров

Settings settings: Элементы конфигурации, включая параметры, связанные с конфигурацией кластера.

ClusterService clusterService: Службы кластеров обеспечивают состояние кластера и операции на уровне кластера.

IndexNameExpressionResolver indexNameExpressionResolver: Анализатор выражений имен индексов, используемый для анализа выражений имен индексов.

RepositoriesService repositoriesService: Служба складов управляет созданием хранилищ моментальных снимков и доступом к ним.

TransportService transportService: Транспортная служба, отвечающая за связь между узлами.

ActionFilters actionFilters: Фильтр действий управляет логикой фильтрации запросов операций.

SystemIndices systemIndices: Системный индекс, управляет индексами системного уровня.

Язык кода:java
копировать
public void createSnapshot(final CreateSnapshotRequest request, final ActionListener<Snapshot> listener) {
        final String repositoryName = request.repository();
        final String snapshotName = IndexNameExpressionResolver.resolveDateMathExpression(request.snapshot());
        validate(repositoryName, snapshotName);
        // TODO: create snapshot UUID in CreateSnapshotRequest and make this operation idempotent to cleanly deal with transport layer
        // retries
        final SnapshotId snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID()); // new UUID for the snapshot
        Repository repository = repositoriesService.repository(request.repository());
        if (repository.isReadOnly()) {
            listener.onFailure(new RepositoryException(repository.getMetadata().name(), "cannot create snapshot in a readonly repository"));
            return;
        }
        submitCreateSnapshotRequest(request, listener, repository, new Snapshot(repositoryName, snapshotId), repository.getMetadata());
    }
    
    

проходитьcreateSnapshot()Метод, используемый для инициализации процесса моментального снимка。В этом методе мы видим, что дляrepositoryName,snapshotNameПолучить и проанализировать,Оценка разрешений для хранилища снимков,Если разрешения на складReadOnlyзатем вернитесь напрямую。Звонок после завершения всех проверокsubmitCreateSnapshotRequest()методзафиксировать снимокпросить。

Язык кода:java
копировать
private void submitCreateSnapshotRequest(
        CreateSnapshotRequest request,
        ActionListener<Snapshot> listener,
        Repository repository,
        Snapshot snapshot,
        RepositoryMetadata initialRepositoryMetadata
    ) {
        repository.getRepositoryData(
            listener.delegateFailure(
                (l, repositoryData) -> masterServiceTaskQueue.submitTask(
                    "create_snapshot [" + snapshot.getSnapshotId().getName() + ']',
                    new CreateSnapshotTask(repository, repositoryData, l, snapshot, request, initialRepositoryMetadata),
                    request.masterNodeTimeout()
                )
            )
        );
    }

submitCreateSnapshotRequest()Основное использование методов заключается в отправке Создать просьба Сэта. После правильного получения всех параметров будет создан новый резервный снимок SnapshotTask.

В основном он включает в себя следующие ключевые этапы:

1. Получить данные склада: Сначала все начинается с вызова repository.getRepositoryData для получения данных из хранилища снимков.

2. Реагировать на неудачные ситуации: он использует listener.delegateFailure метод обработки любых возможных условий отказа.

3. Отправить задачу создания снимка: Если данные склада успешно получены, будет создано Задание фотографа было отправлено masterServiceTaskQueue。

Анализ параметров

CreateSnapshotRequest request: Создать Объект запроса съемки содержит все параметры, необходимые для операции создания моментального снимка.

ActionListener listener: Прослушиватель обратного вызова после завершения операции для обработки успеха или неудачи.

Repository repository: Целевой репозиторий для операций моментальных снимков.

Snapshot snapshot: Объект моментального снимка, который необходимо создать.

RepositoryMetadata initialRepositoryMetadata: Начальные метаданные хранилища.

После отправки задачи моментального снимка нам необходимо проанализировать и выполнить запрос моментального снимка. Здесь мы продолжаем смотреть вниз.

Язык кода:java
копировать
private class SnapshotTaskExecutor implements ClusterStateTaskExecutor<SnapshotTask> {
        @Override
        public ClusterState execute(BatchExecutionContext<SnapshotTask> batchExecutionContext) throws Exception {
            final ClusterState state = batchExecutionContext.initialState();
            final SnapshotShardsUpdateContext shardsUpdateContext = new SnapshotShardsUpdateContext(batchExecutionContext);
            final SnapshotsInProgress initialSnapshots = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
            SnapshotsInProgress snapshotsInProgress = shardsUpdateContext.computeUpdatedState();
            for (final var taskContext : batchExecutionContext.taskContexts()) {
                if (taskContext.getTask()instanceof CreateSnapshotTask task) {
                    try {
                        final var repoMeta = state.metadata()
                            .custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY)
                            .repository(task.snapshot.getRepository());
                        if (Objects.equals(task.initialRepositoryMetadata, repoMeta)) {
                            snapshotsInProgress = createSnapshot(task, taskContext, state, snapshotsInProgress);
                        } else {
                            // repository data changed in between starting the task and executing this cluster state update so try again
                            taskContext.success(
                                () -> submitCreateSnapshotRequest(
                                    task.createSnapshotRequest,
                                    task.listener,
                                    task.repository,
                                    task.snapshot,
                                    repoMeta
                                )
                            );
                        }
                    } catch (Exception e) {
                        taskContext.onFailure(e);
                    }
                }
            }
            shardsUpdateContext.completeWithUpdatedState(snapshotsInProgress);
            if (snapshotsInProgress == initialSnapshots) {
                return state;
            }
            return ClusterState.builder(state).putCustom(SnapshotsInProgress.TYPE, snapshotsInProgress).build();
        }

        private SnapshotsInProgress createSnapshot(
            CreateSnapshotTask createSnapshotTask,
            TaskContext<SnapshotTask> taskContext,
            ClusterState currentState,
            SnapshotsInProgress snapshotsInProgress
        ) {
            final RepositoryData repositoryData = createSnapshotTask.repositoryData;
            final Snapshot snapshot = createSnapshotTask.snapshot;
            final String repositoryName = snapshot.getRepository();
            final String snapshotName = snapshot.getSnapshotId().getName();
            ensureRepositoryExists(repositoryName, currentState);
            final Repository repository = createSnapshotTask.repository;
            ensureSnapshotNameAvailableInRepo(repositoryData, snapshotName, repository);
            ensureSnapshotNameNotRunning(snapshotsInProgress, repositoryName, snapshotName);
            validate(repositoryName, snapshotName, currentState);
            final SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(
                SnapshotDeletionsInProgress.TYPE,
                SnapshotDeletionsInProgress.EMPTY
            );
            ensureNoCleanupInProgress(currentState, repositoryName, snapshotName, "create snapshot");
            ensureBelowConcurrencyLimit(repositoryName, snapshotName, snapshotsInProgress, deletionsInProgress);

            final CreateSnapshotRequest request = createSnapshotTask.createSnapshotRequest;
            // Store newSnapshot here to be processed in clusterStateProcessed
            Map<Boolean, List<String>> requestedIndices = Arrays.stream(
                indexNameExpressionResolver.concreteIndexNames(currentState, request)
            ).collect(Collectors.partitioningBy(systemIndices::isSystemIndex));
            
            List<String> requestedSystemIndices = requestedIndices.get(true);
            //Проверка системного индекса
            if (requestedSystemIndices.isEmpty() == false) {
                Set<String> explicitlyRequestedSystemIndices = new HashSet<>(requestedSystemIndices);
                explicitlyRequestedSystemIndices.retainAll(Arrays.asList(request.indices()));
                if (explicitlyRequestedSystemIndices.isEmpty() == false) {
                    throw new IllegalArgumentException(
                        format(
                            "the [indices] parameter includes system indices %s; to include or exclude system indices from a "
                                + "snapshot, use the [include_global_state] or [feature_states] parameters",
                            explicitlyRequestedSystemIndices
                        )
                    );
                }
            }

            List<String> indices = requestedIndices.get(false);

            final List<String> requestedStates = Arrays.asList(request.featureStates());
            final Set<String> featureStatesSet;
            //Проверка статуса запроса
            if (request.includeGlobalState() || requestedStates.isEmpty() == false) {
                if (request.includeGlobalState() && requestedStates.isEmpty()) {
                    // If we're including global state and feature states aren't specified, include all of them
                    featureStatesSet = systemIndices.getFeatureNames();
                } else if (requestedStates.size() == 1 && NO_FEATURE_STATES_VALUE.equalsIgnoreCase(requestedStates.get(0))) {
                    // If there's exactly one value and it's "none", include no states
                    featureStatesSet = Collections.emptySet();
                } else {
                    // Otherwise, check for "none" then use the list of requested states
                    if (requestedStates.contains(NO_FEATURE_STATES_VALUE)) {
                        throw new IllegalArgumentException(
                            "the feature_states value ["
                                + SnapshotsService.NO_FEATURE_STATES_VALUE
                                + "] indicates that no feature states should be snapshotted, "
                                + "but other feature states were requested: "
                                + requestedStates
                        );
                    }
                    featureStatesSet = new HashSet<>(requestedStates);
                    featureStatesSet.retainAll(systemIndices.getFeatureNames());
                }
            } else {
                featureStatesSet = Collections.emptySet();
            }

            final Set<SnapshotFeatureInfo> featureStates = new HashSet<>();
            final Set<String> systemDataStreamNames = new HashSet<>();
            final Set<String> indexNames = new HashSet<>(indices);
            for (String featureName : featureStatesSet) {
                SystemIndices.Feature feature = systemIndices.getFeature(featureName);

                Set<String> featureSystemIndices = feature.getIndexDescriptors()
                    .stream()
                    .flatMap(descriptor -> descriptor.getMatchingIndices(currentState.metadata()).stream())
                    .collect(Collectors.toSet());
                Set<String> featureAssociatedIndices = feature.getAssociatedIndexDescriptors()
                    .stream()
                    .flatMap(descriptor -> descriptor.getMatchingIndices(currentState.metadata()).stream())
                    .collect(Collectors.toSet());

                Set<String> featureSystemDataStreams = new HashSet<>();
                Set<String> featureDataStreamBackingIndices = new HashSet<>();
                for (SystemDataStreamDescriptor sdd : feature.getDataStreamDescriptors()) {
                    List<String> backingIndexNames = sdd.getBackingIndexNames(currentState.metadata());
                    if (backingIndexNames.size() > 0) {
                        featureDataStreamBackingIndices.addAll(backingIndexNames);
                        featureSystemDataStreams.add(sdd.getDataStreamName());
                    }
                }

                if (featureSystemIndices.size() > 0 || featureAssociatedIndices.size() > 0 || featureDataStreamBackingIndices.size() > 0) {

                    featureStates.add(new SnapshotFeatureInfo(featureName, List.copyOf(featureSystemIndices)));
                    indexNames.addAll(featureSystemIndices);
                    indexNames.addAll(featureAssociatedIndices);
                    indexNames.addAll(featureDataStreamBackingIndices);
                    systemDataStreamNames.addAll(featureSystemDataStreams);
                }
                indices = List.copyOf(indexNames);
            }

            logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices);

            final Map<String, IndexId> allIndices = new HashMap<>();
            for (SnapshotsInProgress.Entry runningSnapshot : snapshotsInProgress.forRepo(repositoryName)) {
                allIndices.putAll(runningSnapshot.indices());
            }
            final Map<String, IndexId> indexIds = repositoryData.resolveNewIndices(indices, allIndices);
            final Version version = minCompatibleVersion(currentState.nodes().getMinNodeVersion(), repositoryData, null);
            ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards = shards(
                snapshotsInProgress,
                deletionsInProgress,
                currentState,
                indexIds.values(),
                useShardGenerations(version),
                repositoryData,
                repositoryName
            );
            if (request.partial() == false) {
                Set<String> missing = new HashSet<>();
                //Проверка информации, связанной с шардингом
                for (Map.Entry<ShardId, ShardSnapshotStatus> entry : shards.entrySet()) {
                    if (entry.getValue().state() == ShardState.MISSING) {
                        missing.add(entry.getKey().getIndex().getName());
                    }
                }
                if (missing.isEmpty() == false) {
                    throw new SnapshotException(snapshot, "Indices don't have primary shards " + missing);
                }
            }
            // Создать новую запись снимка
            final var newEntry = SnapshotsInProgress.startedEntry(
                snapshot,
                request.includeGlobalState(),
                request.partial(),
                indexIds,
                CollectionUtils.concatLists(
                    indexNameExpressionResolver.dataStreamNames(currentState, request.indicesOptions(), request.indices()),
                    systemDataStreamNames
                ),
                threadPool.absoluteTimeInMillis(),
                repositoryData.getGenId(),
                shards,
                request.userMetadata(),
                version,
                List.copyOf(featureStates)
            );
            // Обновить контекст задачии Вернуть новый прогресс снимка
            final var res = snapshotsInProgress.withAddedEntry(newEntry);
            taskContext.success(() -> {
                logger.info("snapshot [{}] started", snapshot);
                createSnapshotTask.listener.onResponse(snapshot);
                if (newEntry.state().completed()) {
                    endSnapshot(newEntry, currentState.metadata(), createSnapshotTask.repositoryData);
                }
            });
            return res;
        }
    }

в это времяElasticsearchЗдесь файл под названиемSnapshotTaskExecutorиз内部добрый,ОсуществленныйClusterStateTaskExecutor<SnapshotTask> Интерфейс для выполнения задач создания снимков.

существовать Это внутреннеедобрыйсерединаpublic ClusterState execute(BatchExecutionContext<SnapshotTask> batchExecutionContext)Метод отвечает за обработку отправленных задач снимка.,И выполнить соответствующие операции в зависимости от типа задачи.

В основном он включает в себя следующие этапы:

1. Инициализировать состояние кластера:获取初始из集群состояние。

2. Создать контекст обновления:используется для更新Снимокизсостояние。

3. Получить текущий снимок в процессе:из кластерасостояниесередина Получить текущий снимок в процессеинформация。

4. задачи обработки:Обход контекста задачи,Проверьте, является ли тип задачи CreateSnapshotTask и выполнена ли операция.

• Если метаданные хранилища не изменились, позвоните createSnapshot метод Создать снимок。

• Если метаданные хранилища изменяются, повторите команду Создать. снимокпросить。

5. Обновить статус снимка:Обновить статус снимка,если нет изменений,Вернуться в исходное состояние кластера,В противном случае верните обновленное состояние кластера.

private SnapshotsInProgress createSnapshot(CreateSnapshotTask createSnapshotTask,TaskContext<SnapshotTask> taskContext,ClusterState currentState,SnapshotsInProgress snapshotsInProgress) Метод, отвечающий за конкретное создание сделать шаги.

В этом методе детально проверяются хранилище моментальных снимков, моментальный снимок, индекс и статус подключаемого модуля. В основном это включает в себя следующие шаги

1. Различные проверки данных, связанных со снимками:Убедитесь, что склад существует、Доступно имя снимка、На средней очистке не происходит.

2. Обработка запрошенного индекса и статуса функции:в соответствии спросить处理需要Снимокиз索引ихарактеристикасостояние。

3. Создать новую запись снимка:Создать новую запись снимка。

4. Обновить контекст задачи:Обновить контекст задачу и вызвать метод ответа прослушивателя при запуске моментального снимка.

5. Вернуть новый прогресс снимка:Возвращает обновленный прогресс снимка。

3.SnapshotShardsService.java

SnapshotShardsService.javaВ основном работает на узлах данных,и контролировать снимки осколков, которые выполняются на этих узлах,Отвечает за управление этими снимками на уровне сегментов.,включая запуск,Хватит ждать действий.

Язык кода:java
копировать
public SnapshotShardsService(
        Settings settings,
        ClusterService clusterService,
        RepositoriesService repositoriesService,
        TransportService transportService,
        IndicesService indicesService
    ) {
        this.indicesService = indicesService;
        this.repositoriesService = repositoriesService;
        this.transportService = transportService;
        this.clusterService = clusterService;
        this.threadPool = transportService.getThreadPool();
        this.remoteFailedRequestDeduplicator = new ResultDeduplicator<>(threadPool.getThreadContext());
        if (DiscoveryNode.canContainData(settings)) {
            // this is only useful on the nodes that can hold data
            clusterService.addListener(this);
        }
    }

Сначала инициализируется в конструктореindices,repository,cluster,threadPoolСвязанные свойства。в то же времянужно вниманиеclusterService.addListener(this);Действует только на том узле, где хранятся данные.。

Язык кода:java
копировать
private void snapshot(
        final ShardId shardId,
        final Snapshot snapshot,
        final IndexId indexId,
        final IndexShardSnapshotStatus snapshotStatus,
        Version version,
        final long entryStartTime,
        ActionListener<ShardSnapshotResult> resultListener
    ) {
        ActionListener.run(resultListener, listener -> {
            snapshotStatus.ensureNotAborted();
            final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShard(shardId.id());
            if (indexShard.routingEntry().primary() == false) {
                throw new IndexShardSnapshotFailedException(shardId, "snapshot should be performed only on primary");
            }
            if (indexShard.routingEntry().relocating()) {
                // do not snapshot when in the process of relocation of primaries so we won't get conflicts
                throw new IndexShardSnapshotFailedException(shardId, "cannot snapshot while relocating");
            }

            final IndexShardState indexShardState = indexShard.state();
            if (indexShardState == IndexShardState.CREATED || indexShardState == IndexShardState.RECOVERING) {
                // shard has just been created, or still recovering
                throw new IndexShardSnapshotFailedException(shardId, "shard didn't fully recover yet");
            }

            final Repository repository = repositoriesService.repository(snapshot.getRepository());
            Engine.IndexCommitRef snapshotRef = null;
            try {
                snapshotRef = indexShard.acquireIndexCommitForSnapshot();
                snapshotStatus.ensureNotAborted();
                repository.snapshotShard(
                    new SnapshotShardContext(
                        indexShard.store(),
                        indexShard.mapperService(),
                        snapshot.getSnapshotId(),
                        indexId,
                        snapshotRef,
                        getShardStateId(indexShard, snapshotRef.getIndexCommit()),
                        snapshotStatus,
                        version,
                        entryStartTime,
                        listener
                    )
                );
            } catch (Exception e) {
                IOUtils.close(snapshotRef);
                throw e;
            }
        });
    }

snapshot()В основном отвечает за создание снимков осколков.。существовать Долженметодсерединанас可以看到существовать Получить индексинформацияс шардингомidназад,Этот метод определяет, является ли текущий сегмент основным.,Он находится в статусе переезда?,Он находится в состоянии инициализации?,Все было строго проверено. после позднего завершения,создаст снимок осколка。Создание снимка завершеноназадвызовIndexShard.javaвacquireIndexCommitForSnapshot()метод Отправьте индексный файл。нассуществоватьназад续将对Долженметодпровести анализ。

Здесь мы видим только проверку шардов,Если что-то пошло не так с индексом перед проверкой шарда,Тогда здесь будут лазейки в логике,Итак, мы нашли,Предоставление объектаindexShardПолучить индексс шардингомиз相关информация时вызов了indexServiceSafe()метод,Возможно, он сможет ответить на наши вопросы.

Язык кода:java
копировать
final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShard(shardId.id());
Язык кода:java
копировать
public IndexService indexServiceSafe(Index index) {
        IndexService indexService = indices.get(index.getUUID());
        if (indexService == null) {
            throw new IndexNotFoundException(index);
        }
        assert indexService.indexUUID().equals(index.getUUID())
            : "uuid mismatch local: " + indexService.indexUUID() + " incoming: " + index.getUUID();
        return indexService;
    }

существоватьindexServiceSafe(Index index)методсерединанас可以看到,Существование входящего индекса проверено. Это позволяет избежать исключений создания, вызванных проблемами индекса при создании снимков сегментов.

При создании моментального снимка сегмента,Чтобы гарантировать, что созданный нами снимок сегмента соответствует информации о сегменте, хранящейся в текущем кластере Elasticsearch.,здесь мыпроходитьgetShardStateId(IndexShard indexShard,IndexCommit snapshotIndexCommit)Выполнить проверку согласованности。

Язык кода:java
копировать
@Nullable
    public static String getShardStateId(IndexShard indexShard, IndexCommit snapshotIndexCommit) throws IOException {
        final Map<String, String> userCommitData = snapshotIndexCommit.getUserData();
        final SequenceNumbers.CommitInfo seqNumInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(userCommitData.entrySet());
        final long maxSeqNo = seqNumInfo.maxSeqNo;
        if (maxSeqNo != seqNumInfo.localCheckpoint || maxSeqNo != indexShard.getLastSyncedGlobalCheckpoint()) {
            return null;
        }
        return userCommitData.get(Engine.HISTORY_UUID_KEY)
            + "-"
            + userCommitData.getOrDefault(Engine.FORCE_MERGE_UUID_KEY, "na")
            + "-"
            + maxSeqNo;
    }

В методе getShardStateId() для сегмента генерируется идентификатор на основе его текущего состояния. Этот идентификатор можно использовать для определения того, изменилось ли содержимое сегмента между двумя снимками. Если глобальная и локальная контрольные точки шарда равны, предполагается, что содержимое шарда не изменилось, его максимальный порядковый номер не изменился и его history- и force-merge-uuid Без изменений. Если глобальные и локальные контрольные точки шарда различны, этот метод возвращает {@code null}, потому что безопасное уникальное состояние шарда в этом случае использовать нельзя. ID, так как основной аварийный переход может привести к разному содержимому сегмента для одного и того же порядкового номера в последующих снимках.

Язык кода:java
копировать
/** Notify the master node that the given shard has been successfully snapshotted **/
    private void notifySuccessfulSnapshotShard(final Snapshot snapshot, final ShardId shardId, ShardSnapshotResult shardSnapshotResult) {
        assert shardSnapshotResult != null;
        assert shardSnapshotResult.getGeneration() != null;
        sendSnapshotShardUpdate(snapshot, shardId, ShardSnapshotStatus.success(clusterService.localNode().getId(), shardSnapshotResult));
    }

    /** Notify the master node that the given shard failed to be snapshotted **/
    private void notifyFailedSnapshotShard(
        final Snapshot snapshot,
        final ShardId shardId,
        final String failure,
        final ShardGeneration generation
    ) {
        sendSnapshotShardUpdate(
            snapshot,
            shardId,
            new ShardSnapshotStatus(clusterService.localNode().getId(), ShardState.FAILED, failure, generation)
        );
    }

После завершения резервного копирования моментального снимка на уровне сегмента нам также необходимо синхронизировать сегмент с главным узлом. Информация, связанная со снимком, и статус запроса. На данный момент нам необходимо определить статус снимка. вызов不同из回调метод К главному узлуи Шардинг синхронизации узлов данныхинформация,Снимоксостояние。Ранназад才会进行相关资源из释放;Вызывается, если резервное копирование прошло успешноnotifySuccessfulSnapshotShard(),Если не получится, нужно позвонитьnotifyFailedSnapshotShard()。Независимо от того, какой этометод,Все необходимоесуществовать Передано при обратном вызовеsnapshot,shardId,ShardSnapshotStatus,ShardGenerationЭти четыре необходимых параметра。

Язык кода:java
копировать
private void sendSnapshotShardUpdate(final Snapshot snapshot, final ShardId shardId, final ShardSnapshotStatus status) {
        remoteFailedRequestDeduplicator.executeOnce(
            new UpdateIndexShardSnapshotStatusRequest(snapshot, shardId, status),
            new ActionListener<>() {
                @Override
                public void onResponse(Void aVoid) {
                    logger.trace("[{}][{}] updated snapshot state to [{}]", shardId, snapshot, status);
                }

                @Override
                public void onFailure(Exception e) {
                    logger.warn(() -> format("[%s][%s] failed to update snapshot state to [%s]", shardId, snapshot, status), e);
                }
            },
            (req, reqListener) -> transportService.sendRequest(
                transportService.getLocalNode(),
                SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME,
                req,
                new ActionListenerResponseHandler<>(reqListener.map(res -> null), in -> ActionResponse.Empty.INSTANCE)
            )
        );
    }

Этот метод отвечает за обновление статуса снимка на главном узле.

4.IndexShard.java

Язык кода:java
копировать
public Engine.IndexCommitRef acquireIndexCommitForSnapshot() throws EngineException {
        final IndexShardState state = this.state; // one time volatile read
        if (state == IndexShardState.STARTED) {
            // unlike acquireLastIndexCommit(), there's no need to acquire a snapshot on a shard that is shutting down
            return getEngine().acquireIndexCommitForSnapshot();
        } else {
            throw new IllegalIndexShardStateException(shardId, state, "snapshot is not allowed");
        }
    }

этотметодиспользуется длясуществовать Снимок过程середина获取当前索引分片из索引提交(Index Commit)Цитировать,以确保Снимок操作能够существовать一致из视图上执行。Долженметодсуществовать Получить ссылку на фиксацию индекса проверит состояние шардов индекса и разрешит Tweet только в том случае, если шарды находятся в запущенном состоянии. ссылку на фиксацию индекса. Пройти проверяет состояние сегмента и гарантирует, что Tweet разрешен только в том случае, если сегмент включен, а не выключен. ссылку на фиксацию индекса。Если шардинг не началсясостояние,выдает исключение,Предотвратите незаконные операции со снимками。существовать Долженметодиз返回серединаgetEngine().acquireIndexCommitForSnapshot();вызывающий двигатель acquireIndexCommitForSnapshot Метод для получения ссылки на фиксацию индекса текущего фрагмента индекса. Это гарантирует, что полученная фиксация представляет собой согласованное представление, которое можно использовать для операций моментального снимка.

Анализируя логику этого метода, мы можем найти:

1. Чтение статуса шарда

• Final IndexShardState state = this.state;: прочитать текущее состояние сегмента за один раз. Здесь this.state — изменчивая переменная, поэтому чтение является потокобезопасным.

2. Проверить статус шарда

• if (state == IndexShardState.STARTED): Проверяет, является ли шард статусом STARTED (запущен). Получение ссылок на фиксацию индекса разрешено только при включенном сегментировании.

3. Получить ссылку на фиксацию индекса

• return getEngine().acquireIndexCommitForSnapshot();: вызывающий механизм acquireIndexCommitForSnapshot Метод для получения ссылки на фиксацию индекса текущего фрагмента индекса. Это гарантирует, что полученная фиксация представляет собой согласованное представление, которое можно использовать для операций моментального снимка.

4. Обработка исключений

• Ветка else: если статус сегмента не STARTED, выдается исключение IllegalIndexShardStateException, указывающее, что операции с моментальными снимками не разрешены в текущем состоянии.

getEngine().acquireIndexCommitForSnapshot()Представление индекса, полученное здесь, зависит отLuceneвIndexCommit.java

5.IndexCommit.java

IndexCommit.java Это абстрактный класс в базовом пакете Lucene, представляющий точку фиксации индекса (commit точка). Он играет ключевую роль в процессе управления индексами, особенно при обработке таких операций, как фиксация индекса, удаление и создание снимков.

Язык кода:java
копировать
public abstract class IndexCommit implements Comparable<IndexCommit> {

  /** Получите файл сегмента, связанный с точкой фиксации. */
  public abstract String getSegmentsFileName();

  /** Возвращает все связанные файлы, на которые ссылается эта точка фиксации. */
  public abstract Collection<String> getFileNames() throws IOException;

  /** Возвращает индексированный каталог. */
  public abstract Directory getDirectory();

  /**
   * Удаляет точку фиксации только для точек фиксации, указанных в контексте.
   * После вызова этого метода вызывающая сторона будет уведомлена о необходимости удалить точку отправки. Конкретная политика удаления определяется IndexDeleationPolicy.
   * Его можно вызвать только методом onInit() или onCommit() в соответствии с его политикой IndexDeleationPolicy;
   */
  public abstract void delete();

  /**
   * Возвращает true, если вызывается удаление. По умолчанию вызывается IndexWriter.
   */
  public abstract boolean isDeleted();

  /** Возвращает количество ссылочных сегментов. */
  public abstract int getSegmentCount();

  /**Единственный метод отправки, обычно вызываемый неявно*/
  protected IndexCommit() {}

  /** Используется для определения равенства содержимого и каталогов, отправленных двумя IndexCommit. */
  @Override
  public boolean equals(Object other) {
    if (other instanceof IndexCommit) {
      IndexCommit otherCommit = (IndexCommit) other;
      return otherCommit.getDirectory() == getDirectory()
          && otherCommit.getGeneration() == getGeneration();
    } else {
      return false;
    }
  }

  @Override
  public int hashCode() {
    return getDirectory().hashCode() + Long.valueOf(getGeneration()).hashCode();
  }

  /** Возвращает сегмент, созданный текущей отправкой */
  public abstract long getGeneration();

  /**
   * Возвращает пользовательские данные, переданные в IndexWriter.
   */
  public abstract Map<String, String> getUserData() throws IOException;

  @Override
  public int compareTo(IndexCommit commit) {
    if (getDirectory() != commit.getDirectory()) {
      throw new UnsupportedOperationException(
          "cannot compare IndexCommits from different Directory instances");
    }

    long gen = getGeneration();
    long comgen = commit.getGeneration();
    return Long.compare(gen, comgen);
  }

  /**
   * Получить инициализацию точки фиксации от NRT или не от NRT
   */
  StandardDirectoryReader getReader() {
    return null;
  }
}

Этот абстрактный класс в основном предоставляет следующие функции:

1. Управление и точка подачи справочного индекса

• Предоставляет интерфейс для получения файлов сегментов, индексных файлов и каталогов для определенной точки фиксации.

2. Удаление и проверка точек фиксации

• Позволяет отмечать точки фиксации для удаления с помощью метода delete и проверять статус удаления с помощью метода isDeleted.

3. Отправить сравнение баллов

• Сравните поколения двух точек фиксации с помощью метода CompareTo, чтобы убедиться в правильном порядке операций.

4. Управление пользовательскими данными

• Поддерживает хранение и извлечение пользовательских данных, связанных с точками отправки.

5. Обеспечьте последовательность

• предоставил equals и hashCode Метод, обеспечивающий согласованность и уникальность точек подачи.

boy illustration
Неразрушающее увеличение изображений одним щелчком мыши, чтобы сделать их более четкими артефактами искусственного интеллекта, включая руководства по установке и использованию.
boy illustration
Копикодер: этот инструмент отлично работает с Cursor, Bolt и V0! Предоставьте более качественные подсказки для разработки интерфейса (создание навигационного веб-сайта с использованием искусственного интеллекта).
boy illustration
Новый бесплатный RooCline превосходит Cline v3.1? ! Быстрее, умнее и лучше вилка Cline! (Независимое программирование AI, порог 0)
boy illustration
Разработав более 10 проектов с помощью Cursor, я собрал 10 примеров и 60 подсказок.
boy illustration
Я потратил 72 часа на изучение курсорных агентов, и вот неоспоримые факты, которыми я должен поделиться!
boy illustration
Идеальная интеграция Cursor и DeepSeek API
boy illustration
DeepSeek V3 снижает затраты на обучение больших моделей
boy illustration
Артефакт, увеличивающий количество очков: на основе улучшения характеристик препятствия малым целям Yolov8 (SEAM, MultiSEAM).
boy illustration
DeepSeek V3 раскручивался уже три дня. Сегодня я попробовал самопровозглашенную модель «ChatGPT».
boy illustration
Open Devin — инженер-программист искусственного интеллекта с открытым исходным кодом, который меньше программирует и больше создает.
boy illustration
Эксклюзивное оригинальное улучшение YOLOv8: собственная разработка SPPF | SPPF сочетается с воспринимаемой большой сверткой ядра UniRepLK, а свертка с большим ядром + без расширения улучшает восприимчивое поле
boy illustration
Популярное и подробное объяснение DeepSeek-V3: от его появления до преимуществ и сравнения с GPT-4o.
boy illustration
9 основных словесных инструкций по доработке академических работ с помощью ChatGPT, эффективных и практичных, которые стоит собрать
boy illustration
Вызовите deepseek в vscode для реализации программирования с помощью искусственного интеллекта.
boy illustration
Познакомьтесь с принципами сверточных нейронных сетей (CNN) в одной статье (суперподробно)
boy illustration
50,3 тыс. звезд! Immich: автономное решение для резервного копирования фотографий и видео, которое экономит деньги и избавляет от беспокойства.
boy illustration
Cloud Native|Практика: установка Dashbaord для K8s, графика неплохая
boy illustration
Краткий обзор статьи — использование синтетических данных при обучении больших моделей и оптимизации производительности
boy illustration
MiniPerplx: новая поисковая система искусственного интеллекта с открытым исходным кодом, спонсируемая xAI и Vercel.
boy illustration
Конструкция сервиса Synology Drive сочетает проникновение в интрасеть и синхронизацию папок заметок Obsidian в облаке.
boy illustration
Центр конфигурации————Накос
boy illustration
Начинаем с нуля при разработке в облаке Copilot: начать разработку с минимальным использованием кода стало проще
boy illustration
[Серия Docker] Docker создает мультиплатформенные образы: практика архитектуры Arm64
boy illustration
Обновление новых возможностей coze | Я использовал coze для создания апплета помощника по исправлению домашних заданий по математике
boy illustration
Советы по развертыванию Nginx: практическое создание статических веб-сайтов на облачных серверах
boy illustration
Feiniu fnos использует Docker для развертывания личного блокнота Notepad
boy illustration
Сверточная нейронная сеть VGG реализует классификацию изображений Cifar10 — практический опыт Pytorch
boy illustration
Начало работы с EdgeonePages — новым недорогим решением для хостинга веб-сайтов
boy illustration
[Зона легкого облачного игрового сервера] Управление игровыми архивами
boy illustration
Развертывание SpringCloud-проекта на базе Docker и Docker-Compose