ElasticSearch6.X版本Java Api中文详解(七)之Update By Query API解析

《ElasticSearch6.X版本Java Api中文详解(七)之Update By Query API解析》

updateByQuery

最简单的用法是更新索引中的每个文档,而无需更改源。这种用法允许拾取新属性或另一个在线映射更改。

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index").abortOnVersionConflict(false);
BulkByScrollResponse response = updateByQuery.get();

对updateByQuery API的调用首先获取索引的快照,对使用内部版本控制的任何文档进行索引。

注意:当一个文档在快照的时间和索引请求过程的时间之间发生变化时,就会发生版本冲突。.

当版本匹配时,updateByQuery会更新文档并增加版本号。

所有更新和查询失败都会导致updateByQuery中止。这些故障可以从bulk by scroll response getinde方法中获得。任何成功的更新都是保留的,不会回滚。当第一个故障导致中止时,响应包含失败的批量请求所产生的所有故障。

为了防止版本冲突导致updateByQuery中止,设置终止冲突(false)。第一个例子是这样做的,因为它试图获取一个在线映射更改,而版本冲突意味着在updateByQuery的开始和试图更新文档的时间之间更新了相互冲突的文档。这很好,因为更新将会获得在线地图更新。

UpdateByQueryRequestBuilder API支持过滤更新后的文档,限制更新的文档总数,并使用脚本更新文档:

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index")
    .filter(QueryBuilders.termQuery("level", "awesome"))
    .size(1000)
    .script(new Script(ScriptType.INLINE, "ctx._source.awesome = 'absolutely'", "painless", Collections.emptyMap()));
BulkByScrollResponse response = updateByQuery.get();

UpdateByQueryRequestBuilder还允许直接访问用来选择文档的查询。您可以使用这个访问来改变默认的滚动大小,或者对匹配文档的请求进行修改。

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index")
    .source().setSize(500);
BulkByScrollResponse response = updateByQuery.get();

您还可以将大小与排序合并,以限制更新的文档:

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index").size(100)
    .source().addSort("cat", SortOrder.DESC);
BulkByScrollResponse response = updateByQuery.get();

除了改变文档的源代码字段之外,您还可以使用一个脚本来更改动作,类似于更新API:

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index")
    .script(new Script(
        ScriptType.INLINE,
        "if (ctx._source.awesome == 'absolutely) {"
            + "  ctx.op='noop'"
            + "} else if (ctx._source.awesome == 'lame') {"
            + "  ctx.op='delete'"
            + "} else {"
            + "ctx._source.awesome = 'absolutely'}",
        "painless",
        Collections.emptyMap()));
BulkByScrollResponse response = updateByQuery.get();

正如在更新API中一样,您可以设置ctx的值。op改变执行的操作:

无操作

ctx设置。如果你的脚本没有任何变化,op=“noop”。updateByQuery operaton会从更新中删除该文档。这种行为增加了响应体中的noop计数器。

删除

ctx设置。如果您的脚本决定必须删除该文档,那么op=“delete”。删除将在响应主体中被删除的计数器中报告。

设置ctx。op对任何其他值都产生一个错误。在ctx中设置任何其他字段会产生一个错误。

这个API不允许您移动它所触及的文档,只是修改它们的源代码。这是故意的!我们没有规定将文件从原来的位置删除。

您还可以一次在多个索引和类型上执行这些操作,类似于搜索API:

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("foo", "bar").source().setTypes("a", "b");
BulkByScrollResponse response = updateByQuery.get();

如果您提供了一个路由值,那么进程将路由值复制到滚动查询,将进程限制为与该路由值相匹配的碎片:

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source().setRouting("cat");
BulkByScrollResponse response = updateByQuery.get();

updateByQuery还可以通过指定这样一条管道来使用摄取节点:

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.setPipeline("hurray");
BulkByScrollResponse response = updateByQuery.get();

Works with the Task APIedit

您可以使用Task API获取所有运行更新查询请求的状态:

ListTasksResponse tasksList = client.admin().cluster().prepareListTasks()
    .setActions(UpdateByQueryAction.NAME).setDetailed(true).get();
for (TaskInfo info: tasksList.getTasks()) {
    TaskId taskId = info.getTaskId();
    BulkByScrollTask.Status status = (BulkByScrollTask.Status) info.getStatus();
    // do stuff
}

上面显示的TaskId可以直接查找任务:

GetTaskResponse get = client.admin().cluster().prepareGetTask(taskId).get();

Works with the Cancel Task APIedit

查询的任何更新都可以使用Task Cancel API取消:

// Cancel all update-by-query requests
client.admin().cluster().prepareCancelTasks().setActions(UpdateByQueryAction.NAME).get().getTasks();
// Cancel a specific update-by-query request
client.admin().cluster().prepareCancelTasks().setTaskId(taskId).get().getTasks();

使用列表任务API来查找taskId的值。

取消请求通常是一个非常快速的过程,但可能需要几秒钟。任务状态API继续列出任务,直到取消完成为止。

Rethrottlingedit

在运行的更新中使用Rethrottlingedit API来更改requestspersecond的值:

RethrottleAction.INSTANCE.newRequestBuilder(client)
.setTaskId(taskId)
.setRequestsPerSecond(2.0f)
.get();

使用列表任务API来查找taskId的值。

与updateByQuery API一样,requestspersecond的值可以是任何正浮动值来设置或者用POSITIVE_INFINITY禁用节流。requests_per_second的值将立即生效。为了防止滚动超时,requests_per_second在完成当前批处理后,会减慢查询的速度。

点赞

发表评论

电子邮件地址不会被公开。 必填项已用*标注