Bulk操作
Refresh flush流程
Bulk API
如上图,使用Java Rest API时,BulkRequest可以添加的Request包含
- IndexRequest;
- DeleteRequest;
- UpdateRequest;
Delte + Index 测试
第一次测试
Java
client.indices().create(new org.elasticsearch.client.indices.CreateIndexRequest("xiaolongtest").settings(Settings.builder()
.put("refresh_interval", "30s")
.build()), RequestOptions.DEFAULT);
Map<String, Object> sourceMap = new HashMap<String, Object>();
sourceMap.put("key1", new Date());
sourceMap.put("key2", "asdfasdf");
client.bulk(Requests.bulkRequest().add(Requests.indexRequest("xiaolongtest").id("1").source(sourceMap))
.add(Requests.deleteRequest("xiaolongtest").id("1"))
.add(Requests.indexRequest("xiaolongtest").id("2").source(sourceMap))
, RequestOptions.DEFAULT);
- 设置index的refresh_interval为30秒。
- 在一个BulkRequest中,按顺序先添加文档A的IndexRequest并指定id,再添加文档A的DeleteRequest,再添加文档B的IndexRequest并指定id。
- 执行结束后,index中只有文档B。
第二次测试
Java
client.indices().create(new org.elasticsearch.client.indices.CreateIndexRequest("xiaolongtest").settings(Settings.builder()
.put("refresh_interval", "30s")
.build()), RequestOptions.DEFAULT);
Map<String, Object> sourceMap = new HashMap<String, Object>();
sourceMap.put("key1", new Date());
sourceMap.put("key2", "asdfasdf");
client.bulk(Requests.bulkRequest().add(Requests.indexRequest("xiaolongtest").id("1").source(sourceMap))
.add(Requests.deleteRequest("xiaolongtest").id("1"))
.add(Requests.indexRequest("xiaolongtest").id("2").source(sourceMap))
, RequestOptions.DEFAULT);
client.bulk(Requests.bulkRequest()
.add(Requests.deleteRequest("xiaolongtest").id("2"))
.add(Requests.indexRequest("xiaolongtest").id("3").source(sourceMap))
, RequestOptions.DEFAULT);
- 设置index的refresh_interval为30秒。
- 在第一个BulkRequest中,按顺序先添加文档A的IndexRequest并指定id,再添加文档A的DeleteRequest,再添加文档B的IndexRequest并指定id;
- 在第二个BulkRequest中,先添加文档B的DeleteRequest,再添加文档C的IndexRequest。
- 执行结束后,index中无文档,等待refresh后,文档中只有文档C。
测试结论
以上两次测试得出猜测:BulkRequest可以操作index buffer中的数据。
_delete_by_query + index 测试
index中已含有200w数据,使用_delete_by_query删除全部数据,在删除完成前,开始插入200w新数据。待删除完成、插入完成后,index中的数据为新插入的200w数据。
测试结论
以上操作得出猜测:_delete_by_query操作只会删除执行时刻前可查询的数据(segment中的数据)
批量发送IndexRequest测试
代码
Java
BulkRequest bulkRequest1 = new BulkRequest();
for (int i = 0; i < 100; i++) {
Map<String, Object> sourceMap = new HashMap<>();
sourceMap.put("key1", i);
sourceMap.put("key2", "key2-" + i);
IndexRequest indexRequest = Requests.indexRequest("testindex1");
indexRequest.source(sourceMap);
bulkRequest1.add(indexRequest);
}
CompletableFuture future1 = CompletableFuture.runAsync(() -> {
try {
client.bulk(bulkRequest1, RequestOptions.DEFAULT);
} catch (Exception e) {
e.printStackTrace();
}
});
BulkRequest bulkRequest2 = new BulkRequest();
for (int i = 0; i < 100; i++) {
Map<String, Object> sourceMap = new HashMap<>();
sourceMap.put("key1", i);
sourceMap.put("key2", "key2-" + i);
IndexRequest indexRequest = Requests.indexRequest("testindex2");
indexRequest.source(sourceMap);
bulkRequest2.add(indexRequest);
}
CompletableFuture future2 = CompletableFuture.runAsync(() -> {
try {
client.bulk(bulkRequest2, RequestOptions.DEFAULT);
} catch (Exception e) {
e.printStackTrace();
}
});
BulkRequest bulkRequest3 = new BulkRequest();
for (int i = 0; i < 100; i++) {
Map<String, Object> sourceMap = new HashMap<>();
sourceMap.put("key1", i);
sourceMap.put("key2", "key2-" + i);
IndexRequest indexRequest = Requests.indexRequest("testindex3");
indexRequest.source(sourceMap);
bulkRequest3.add(indexRequest);
}
CompletableFuture future3 = CompletableFuture.runAsync(() -> {
try {
client.bulk(bulkRequest3, RequestOptions.DEFAULT);
} catch (Exception e) {
e.printStackTrace();
}
});
BulkRequest bulkRequest4 = new BulkRequest();
for (int i = 0; i < 100; i++) {
Map<String, Object> sourceMap = new HashMap<>();
sourceMap.put("key1", i);
sourceMap.put("key2", "key2-" + i);
IndexRequest indexRequest = Requests.indexRequest("testindex4");
indexRequest.source(sourceMap);
bulkRequest4.add(indexRequest);
}
CompletableFuture future4 = CompletableFuture.runAsync(() -> {
try {
client.bulk(bulkRequest4, RequestOptions.DEFAULT);
} catch (Exception e) {
e.printStackTrace();
}
});
long now = System.currentTimeMillis();
CompletableFuture.allOf(future1, future2, future3, future4).join();
System.out.println(System.currentTimeMillis() - now);
BulkRequest singleBulk = new BulkRequest();
for (int i = 0; i < 100; i++) {
Map<String, Object> sourceMap = new HashMap<>();
sourceMap.put("key1", i);
sourceMap.put("key2", "key2-" + i);
singleBulk.add(Requests.indexRequest("testindex1").source(sourceMap))
.add(Requests.indexRequest("testindex2").source(sourceMap))
.add(Requests.indexRequest("testindex3").source(sourceMap))
.add(Requests.indexRequest("testindex4").source(sourceMap));
}
now = System.currentTimeMillis();
client.bulk(singleBulk, RequestOptions.DEFAULT);
System.out.println(System.currentTimeMillis() - now);
以上代码有两种操作:
- 创建4个BulkRequest,每个BulkRequest中有100个同index的IndexRequest,同时执行4个BulkRequest计算执行时间;
- 创建1个BulkRequest,一个BulkReqeust中有4个index,每个index100个IndexRequest,执行1个BulkRequest计算执行时间;
测试结论
以上IndexRequest的数据完全相同,但操作1的执行时间高于操作2,尤其是第一次测试,之后的每轮测试,操作1会比操作2多耗时50 ~ 100 ms。
猜测是因为BulkRequest会先在index buffer中进行处理。
根据以上得出结论:索引操作尽量放到BulkRequest中,即使是不同index的数据。