Java实现对ES数据的新增,删除,修改,及合并
新增数据
代码:
@Autowiredprivate RestHighLevelClient client;/*** @description ES写入数据* @author zae* @date 2022/1/13 14:40* @param index 索引库* @param dataList 数据集合(size为插入数据的条数)*/public void insertEsData(String index,List<Map<String,Object>> dataList) {BulkProcessor bulkProcessor = null;try {bulkProcessor = getBulkProcessor(client);for(Map<String,Object> dataMap:dataList){bulkProcessor.add(new IndexRequest(index).source(dataMap));}// 将数据刷新到ES中bulkProcessor.flush();} catch (Exception e) {e.printStackTrace();} finally {try {boolean terminatedFlag = bulkProcessor.awaitClose(150L,TimeUnit.SECONDS);} catch (InterruptedException e) {e.printStackTrace();}}}private BulkProcessor getBulkProcessor(RestHighLevelClient client) {BulkProcessor bulkProcessor = null;try {BulkProcessor.Listener listener = new BulkProcessor.Listener() {@Overridepublic void beforeBulk(long executionId, BulkRequest request) {logger.info("Try to insert data number : "+ request.numberOfActions());}@Overridepublic void afterBulk(long executionId, BulkRequest request,BulkResponse response) {logger.info("************** Success insert data number : "+ request.numberOfActions() + " , id: " +executionId);}@Overridepublic void afterBulk(long executionId, BulkRequest request,Throwable failure) {logger.error("Bulk is unsuccess : " + failure + ", executionId: " + executionId);}};BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =(request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer,listener);builder.setBulkActions(5000);//刷新条数builder.setBulkSize(new ByteSizeValue(100L, ByteSizeUnit.MB));// 刷新大小builder.setConcurrentRequests(10);//并发线程数builder.setFlushInterval(TimeValue.timeValueSeconds(100L));// 时间频率builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3));//重试补偿策略bulkProcessor = builder.build();} catch (Exception e) {e.printStackTrace();try {bulkProcessor.awaitClose(100L, TimeUnit.SECONDS);} catch (Exception e1) {logger.error(e1.getMessage());}}return bulkProcessor;}
注意点:
-
传入数据中的List中的每个Map为一条数据,Map中的key为字段Field,value为值。
-
假如在插入数据前index库中没有定义Field及映射关系,在插入数据时会自动新增字段Field,字段的类型会默认映射为value值的类型。
-
假如新插入的数据和之前第一次插入的数据类型不一致,(相同的key但是value的类型不一样),这样在执行代码时可能不会报错失败,但是实际上并没有插入数据成功。
删除数据
代码
/*** @description ES数据删除* @author zae* @date 2022/1/13 17:14* @param index 索引库* @param id 数据id*/public void delete(String index,String id){DeleteRequest deleteRequest = new DeleteRequest(index,id);DeleteResponse response = null;try {response = client.delete(deleteRequest,RequestOptions.DEFAULT);} catch (IOException e) {e.printStackTrace();}System.out.println(response);}
删除的话直接指定索引库和要删除的数据的id进行删除,至于数据的id怎么获取,参考下面更新数据中的代码。
更新数据
直接修改
/*** @description ES数据更新* @author zae* @date 2022/1/13 16:10* @param index 索引库* @param key 字段名* @param value 更新后的值* @param id 需要修改的那条数据的id*/public void update(String index,String key,Object value,String id){try {UpdateRequest request = new UpdateRequest();request.index(index) //索引名.id(id)//id.doc(XContentFactory.jsonBuilder().startObject().field(key, value)//要修改的字段 及字段值.endObject());UpdateResponse response= client.update(request,RequestOptions.DEFAULT);System.out.println(response.status());} catch (Exception e) {e.printStackTrace();}}
-
更新数据需要传入指定的索引库,需要修改的字段名称,修改后的value值,以及代表那条数据的唯一id
-
关于唯一id怎么获得,可以参考以下代码。
public void selectAndUpdate(){int count = 0;//step1:根据条件获取需要修改的数据的idSearchHit[] searchHits = esDeal.selectIdByKey(PRODUCT_DEV_INDEX, "name");// step2:遍历更新查询出来的数据for (SearchHit searchHit : searchHits) {// 获取到单条记录的idString id = searchHit.getId();//调用更新数据的核心方法esDeal.update(PRODUCT_DEV_INDEX,"name","张三",id);count++;}System.out.println("一共更新了"+count+"条数据");}public SearchHit[] selectIdByKey(String index,String key){BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();boolQueryBuilder.must(QueryBuilders.existsQuery(key));SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();sourceBuilder.query(boolQueryBuilder).trackTotalHits(true).size(22);SearchRequest searchRequest = new SearchRequest().indices(index).source(sourceBuilder);try {SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);SearchHit[] hits = searchResponse.getHits().getHits();return hits;} catch (IOException e) {e.printStackTrace();}return null;}
数据合并
数据合并其实也是更新的一种,关键在于怎么提取及封装需要更新的数据,根据业务场景的不同达到合并的效果,以上面的图片为例,我需要将以上数据的20004.value里面的值合并上新的数据却也保留着原有的数据。
@Testpublic void selectAndUpdate(){int count = 0;//step1:根据条件获取需要修改的数据的idSearchHit[] searchHits = esDeal.selectIdByKey(PRODUCT_DEV_INDEX, "20004.value");for (SearchHit searchHit : searchHits) {String id = searchHit.getId();// step2:组装数据Map<String, Object> sourceAsMap = searchHit.getSourceAsMap();List<Map<String,Object>> mapList = (List<Map<String, Object>>) sourceAsMap.get("20004");Map<String, Object> mapData = mapList.get(0);// 获取之前20004.value的数据,并添加上新的数据List value = (List) mapData.get("value");value.add("120021");value.add("3990993");mapData.put("value",value);// step3:根据id更新数据esDeal.update(PRODUCT_DEV_INDEX,"20004",mapList,id);count++;}System.out.println("一共更新了"+count+"条数据");}
备注:
- 代码中调用的selectIdByKey()以及update()在更新数据的第一部分都有代码,直接使用就好。根据以上代码更新后的数据为:
- 在更新20004.value的值时,不需要保持和20004.value的原有数据类型一致,更新成其他类型也是可以的,这点和插入key:value的数据是有区别的。