Предисловие: Elasticsearch — это распределенная система поиска и анализа с открытым исходным кодом, которая обеспечивает хорошие возможности вставки данных и гибкий метод обновления данных. Далее следует большое количество документов, удаленных в результате операций обновления. В то же время многим пользователям необходимо удалить данные индекса elasticsearch по разным причинам при использовании elasticsearch. Также будет создано большое количество документов doc.deleted.
Bulk: метод обновления массовой вставки.
POST _bulk
{ "index" : { "_index" : "test", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_id" : "2" } }
{ "create" : { "_index" : "test", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_index" : "test"} }
{ "doc" : {"field2" : "value2"} }
обновление: точно обновить указанные данные на основе идентификатора.
POST /<index>/_update/<_id>
update_by_query: Сопоставьте и обновите данные в соответствии с указанными условиями запроса.
POST my-index-000001/_update_by_query?conflicts=proceed
Когда клиент инициирует операцию обновления, elasticsearch сначала найдет соответствующий документ на основе условий обновления (например: _id, переданный API обновления, или оператор сопоставления, переданный update_by_query). Elasticsearch использует уникальный идентификатор документа (_id) для поиска документа. Когда документ, который нужно обновить, найден, elasticsearch сначала пометит исходный старый документ как удаленный. Новый документ будет вставлен в индекс. Новый документ имеет тот же уникальный идентификатор (_id) для реализации операции обновления документа.
@Override
protected void searchToString(StringBuilder b) {
super.searchToString(b);
if (script != null) {
b.append(" updated with ").append(script);
}
}
Кроме того, elasticsearch также предоставляет метод обновления сценария, когда клиент выполняет операции обновления.
Инициирование операции update_by_query будет реализовано через этот класс.
public class UpdateByQueryAction extends ActionType<BulkByScrollResponse> {
public static final UpdateByQueryAction INSTANCE = new UpdateByQueryAction();
public static final String NAME = "indices:data/write/update/byquery";
private UpdateByQueryAction() {
super(NAME, BulkByScrollResponse::new);
}
}
В этом классе определен статический объект INSTANCE, представляющий одноэлементный экземпляр UpdateByQueryAction. В то же время определяется константа NAME, указывающая, что имя действия — «indices:data/write/update/byquery». В конструкторе вызывается конструктор super(NAME, BulkByScrollResponse::new) родительского класса. Здесь передаются имя действия и функция обратного вызова для создания объекта BulkByScrollResponse.
BulkByScrollResponse — это объект, возвращаемый после выполнения операции обновления. Включая время выполнения, статус выполнения и другую информацию.
Используйте этот конструктор в BulkByScrollResponse для пакетной прокрутки объектов операции.
Получает следующие параметры:
- took : Указывает время, затраченное на операцию.
- status : Указывает состояние задачи пакетной прокатки, которая BulkByScrollTask.Status объект.
- bulkFailures : представляет список неудачных пакетных операций. List<Failure> объект.
- searchFailures : представляет список неудачных поисков, List<ScrollableHitSource.SearchFailure> объект.
- timedOut : Указывает, истекло ли время ожидания операции.
private TimeValue took;
private BulkByScrollTask.Status status;
private List<Failure> bulkFailures;
private List<ScrollableHitSource.SearchFailure> searchFailures;
private boolean timedOut;
private static final String TOOK_FIELD = "took";
private static final String TIMED_OUT_FIELD = "timed_out";
private static final String FAILURES_FIELD = "failures";
public BulkByScrollResponse(
TimeValue took,
BulkByScrollTask.Status status,
List<Failure> bulkFailures,
List<ScrollableHitSource.SearchFailure> searchFailures,
boolean timedOut
) {
this.took = took;
this.status = requireNonNull(status, "Null status not supported");
this.bulkFailures = bulkFailures;
this.searchFailures = searchFailures;
this.timedOut = timedOut;
}
public BulkByScrollResponse(Iterable<BulkByScrollResponse> toMerge, @Nullable String reasonCancelled) {
long mergedTook = 0;
List<BulkByScrollTask.StatusOrException> statuses = new ArrayList<>();
bulkFailures = new ArrayList<>();
searchFailures = new ArrayList<>();
for (BulkByScrollResponse response : toMerge) {
mergedTook = max(mergedTook, response.getTook().nanos());
statuses.add(new BulkByScrollTask.StatusOrException(response.status));
bulkFailures.addAll(response.getBulkFailures());
searchFailures.addAll(response.getSearchFailures());
timedOut |= response.isTimedOut();
}
took = timeValueNanos(mergedTook);
status = new BulkByScrollTask.Status(statuses, reasonCancelled);
}
Конкретные шаги обновления:
Преимущества: Можно сразу освободить место на диске.
Недостаток: все данные во всем индексе будут удалены. Необходимость удаления только части данных удовлетворить невозможно.
Преимущества: Гибкая работа, возможность удаления определенных данных в зависимости от входящих условий.
Недостатки: Процесс удаления метки занимает много времени и дисковое пространство освобождается медленно. Этот метод можно использовать для удаления данных, когда на диске достаточно места.
При выполнении операции удаления elasticsearch найдет документ, который нам нужно удалить, на основе переданных нами условий (например: _id, переданный с помощью API-интерфейса удаления, или оператор сопоставления, переданный в delete_by_query). Документы для удаления затем помечаются как удаленные, и документы не удаляются с диска сразу после пометки. Это необходимо для повышения производительности и предотвращения потери данных. Документы, помеченные как удаленные, все еще существуют в индексе, но отфильтровываются при поиске и запросах. Впоследствии elasticsearch автоматически объединит сегменты документов, помеченные для удаления.
public class DeleteByQueryAction extends ActionType<BulkByScrollResponse> {
public static final DeleteByQueryAction INSTANCE = new DeleteByQueryAction();
public static final String NAME = "indices:data/write/delete/byquery";
private DeleteByQueryAction() {
super(NAME, BulkByScrollResponse::new);
}
}
Когда elasticsearch выполняет действие удаления, он также определяет статический объект INSTANCE, представляющий одноэлементный экземпляр «DeleteByQueryAction». В то же время определяется константа `NAME`, указывающая, что имя действия — «indexes:data/write/delete/byquery».
В конструкторе вызывается конструктор родительского класса super(NAME, BulkByScrollResponse::new)`. Здесь передаются имя действия и функция обратного вызова для создания объекта BulkByScrollResponse.
Общая информация, возвращаемая BulkByScrollResponse, в основном такая же, как описано ранее.
Стоит отметить, что операции обновления и удаления генерируют большое количество doc.deleted. Мы обнаружим интересное явление.
Когда выполняется большое количество операций обновления, использование диска нашим кластером эластичного поиска в определенной степени увеличится. Через определенный период времени использование диска уменьшится и станет соответствовать использованию диска до операции обновления данных.
Аналогичным образом, часто, когда мы удаляем данные с помощью delete_by_query, мы наблюдаем за использованием диска кластером и обнаруживаем, что использование диска не падает сразу, а постепенно снижается очень медленно.
Это связано с тем, что в elasticsearch, когда документ помечается как удаленный, elasticsearch выполняет операцию слияния (также называемую объединением сегментов).
О том, как elasticsearch объединит сгенерированный документ doc.deleted после операции обновления и какие проблемы могут возникнуть в процессе слияния, мы обсудим в последующих статьях.