文章目录
- 1. 版本
- 2. 连接es
- 3. 索引操作
- 3.1 创建索引
- 3.2 判断索引是否存在
- 3.3 更新索引
- 3.4 删除索引
- 3.5 数据迁移
- 3.6 设置别名
- 4.1 新增或覆盖数据(单条)
- 5.1 一个示例
- 6.1 变量
- 7.1 更新
1. 版本
示例运行环境:
go版本: 1.14
es版本: 6.7.0
工具及版本: github.com/olivere/elastic v6.2.35
此文章适合有go基础的人阅读, 因为代码示例中都是部分代码片段而不是完整可执行代码, 需要自行完善
2. 连接es
options := []elastic.ClientOptionFunc{elastic.SetURL("http://xxxxxxx:9200"),elastic.SetSniff(true), //是否开启集群嗅探elastic.SetHealthcheckInterval(10 * time.Second), //设置两次运行状况检查之间的间隔, 默认60selastic.SetGzip(false), //启用或禁用gzip压缩elastic.SetErrorLog(log.New(os.Stderr, "ELASTIC ", log.LstdFlags)), //ERROR日志输出配置elastic.SetInfoLog(log.New(os.Stdout, "", log.LstdFlags)), //INFO级别日志输出配置}options = append(options, elastic.SetBasicAuth("xxxx", //账号"xxxxxxxxxxxxxx", //密码))con, err := elastic.NewClient(options...)
下述代码中 conf.ES() 表示con
3. 索引操作
3.1 创建索引
type mi = map[string]interface{}mapping := mi{"settings": mi{"number_of_shards": 3,"number_of_replicas": 2,},"mappings": mi{"_doc": mi{ //type名"properties": mi{"id": mi{ //整形字段, 允许精确匹配"type": "integer",},"name": mi{"type": "text", //字符串类型且进行分词, 允许模糊匹配"analyzer": "ik_smart", //设置分词工具"search_analyzer": "ik_smart","fields": mi{ //当需要对模糊匹配的字符串也允许进行精确匹配时假如此配置"keyword": mi{"type": "keyword","ignore_above": 256,},},},"date_field": mi{ //时间类型, 允许精确匹配"type": "date",},"keyword_field": mi{ //字符串类型, 允许精确匹配"type": "keyword",},"nested_field": mi{ //嵌套类型"type": "nested","properties": mi{"id": mi{"type": "integer",},"start_time": mi{ //长整型, 允许精确匹配"type": "long",},"end_time": mi{"type": "long",},},},},},},}indexName := "xxxxxxxxxxxxxxxxxxxxx" //要创建的索引名_, err = conf.ES().CreateIndex(indexName).BodyJson(mapping).Do(context.Background())
3.2 判断索引是否存在
//exists true 表示索引已存在exists, err := conf.ES().IndexExists(indexName).Do(context.Background())
3.3 更新索引
仅支持添加字段, 已有字段无法修改
type mi = map[string]interface{}mapping := mi{"properties": mi{"id": mi{ //整形字段, 允许精确匹配"type": "integer",},},}_, err = conf.ES().PutMapping().Index(indexName).Type("_doc").BodyJson(mapping).Do(context.Background())
3.4 删除索引
_, err = conf.ES().DeleteIndex(indexName).Do(context.Background())
3.5 数据迁移
将一个索引的数据迁移到另一个索引中, 一般用于索引结构发生改变时使用新索引存储数据
type mi = map[string]interface{}_, err = conf.ES().Reindex().Body(mi{"source": mi{"index": oldIndexName,},"dest": mi{"index": newIndexName,},}).Do(context.Background())
3.6 设置别名
设置别名后就可以使用别名查询数据
_, err = conf.ES().Alias().Action(elastic.NewAliasAddAction(oldIndexName).Index(newIndexName),).Do(context.Background())
4. 数据操作
4.1 新增或覆盖数据(单条)
此操作相同id的数据会被覆盖
_, err = conf.ES().Index().Index(indexName).Type("_doc").// id为字符串, 创建一条此id的数据或覆盖已有此id的记录// data为结构体或map, 当然结构需要跟索引的mapping类型保持一致Id(id).BodyJson(data).Do(context.Background())
4.2 根据id新增或更新数据(单条)
仅更新传入的字段, 而不是像 4.1 进行整条记录覆盖
_, err = conf.ES().Update().Index(t.index()).Type("_doc").Id(id).// data为结构体或map, 需注意的是如果使用结构体零值也会去更新原记录Upsert(data).// true 无则插入, 有则更新, 设置为false时记录不存在将报错DocAsUpsert(true).Do(context.Background())
4.3 根据id新增或更新数据(批量)
bulkRequest := conf.ES().Bulk()// data map[int]interface{}, key为id, value为要更新的数据for id, v := range data {doc := elastic.NewBulkUpdateRequest().Index(t.index()).Type("_doc").Id(strconv.Itoa(id)).Doc(v).// true 无则插入, 有则更新, 设置为false时记录不存在将报错DocAsUpsert(true)bulkRequest.Add(doc)}bulkResponse, err := bulkRequest.Do(context.Background())if err != nil {return}// 获取操作失败的记录bad := bulkResponse.Failed()if len(bad) > 0 {s, _ := jsoniter.MarshalToString(bad)err = errors.New("部分记录更新失败 " + s)}
4.4 根据条件更新数据
_, err = conf.ES().UpdateByQuery().Index(indexName).Type("_doc").//查询条件, 详细配置查询条件请查看章节 5Query(query).//要执行的更新操作, 详细配置请查看章节 6及7.1Script(script).Do(context.Background())
4.5 查询
_, err = conf.ES().Search().Index(indexName).//偏移量From(0).//返回数据的条数Size(10).//指定返回数据的字段(此处指定返回id和name), 全部返回则无需设置FetchSourceContext(elastic.NewFetchSourceContext(true).Include("id", "name")).//查询条件, 详细配置查询条件请查看章节 5Query(query).//按照id升序排序, 无需排序则可跳过此设置, 多个Sort会按先后顺序依次生效Sort("id", true).//自定义排序规则, 详细写法请查看章节 6及7.2SortBy(sorter).Do(context.Background())
5. 查询条件query设置
5.1 一个示例
{"bool": {"filter": [{"nested": {"path": "nested_field","query": {"range": {"nested_field.start_time": {"from": 1581475200,"include_lower": true,"include_upper": true,"to": null}}}}},{"nested": {"path": "nested_field","query": {"range": {"nested_field.end_time": {"from": null,"include_lower": true,"include_upper": true,"to": 1581481440}}}}}],"must": {"terms": {"id": [4181,4175]}}}}
实现上述查询条件的go代码如下
query := elastic.NewBoolQuery()query.Must(elastic.NewTermsQuery("id", []int{4181, 4175}))query.Filter(elastic.NewNestedQuery("nested_field",// nested_field.start_time >= 1581475200elastic.NewRangeQuery("nested_field.start_time").Gte(1581475200),))query.Filter(elastic.NewNestedQuery("nested_field",// nested_field.start_time <= 1581481440elastic.NewRangeQuery("nested_field.end_time").Lte(1581481440),))
5.2 match 模糊匹配
// name字段模糊匹配elastic.NewMatchQuery("name", val)
5.3 terms 精确匹配
// name字段精确匹配elastic.NewTermsQuery("name.keyword", val...)
5.4 range 范围匹配
// id >= 10, id <= 100elastic.NewRangeQuery("id").Gte(10).Lte(100)
5.5 nested 嵌套结构查询
elastic.NewNestedQuery("nested_field",query, //此处query中的字段 都需要加上nested_field前缀, 比如 nested_field.id)
5.6 更多
es query中的每一种查询结构都会有一个对应的go结构体, 此处不再过多赘述
6. script painless语法
官方文档
6.1 变量
int index = 1;
6.2 if else
if (index == -1) {// 逻辑} else {//}
6.3 for循环
for (om_cms_id in params.om_cms_ids) {continue}
6.4 类型转换
// 数字转字符串String.valueOf(11)
7. script 实际应用
7.1 更新
数据库中goods_id与omcms的关联关系存储如下
om_cms_id | goods_id |
---|---|
1 | 1, 2 |
2 | 1, 2 |
3 | 1 |
4 | 4 |
es中goods_id与omcms关联关系存储如下
goods_id | om_cms_id |
---|---|
1 | [1, 2, 3] |
2 | [1, 2, 4] |
当数据库中关联关系发生变换时要对应更新es中数据
- 获取omcms发生变化的记录
- 删除这些omcms在es中与goods的关联关系
- 添加omcms与goods的新关联关系
var goodsOmCmsIds = make(map[int][]int)var goodsIds []interface{}var omCmsIds []interface{}for _, v := range list {omCmsIds = append(omCmsIds, v.Id)reg := regexp.MustCompile("^\\\\d+(,\\\\d+)*$")v.Ids = strings.ReplaceAll(v.Ids, "NaN", "0")if !reg.MatchString(v.Ids) {fmt.Println("不支持的格式:", v.Ids)continue}err = jsoniter.UnmarshalFromString("["+v.Ids+"]", &v.GoodsIds)if err != nil {return}for _, goodsId := range v.GoodsIds {if goodsId == 0 {continue}if _, ok := goodsOmCmsIds[goodsId]; !ok {goodsIds = append(goodsIds, goodsId)}goodsOmCmsIds[goodsId] = append(goodsOmCmsIds[goodsId], v.Id)}}// 查询出omcsm变更导致可能发生变更的所有商品query := NewBoolQuery().Should(NewBoolQuery().Must(elastic.NewTermsQuery("id", goodsIds...)),NewBoolQuery().Must(elastic.NewTermsQuery("om_cms_id", omCmsIds...)),)script := elastic.NewScript(`for (om_cms_id in params.om_cms_ids) {if (ctx._source.om_cms_id == null) {continue}int index = ctx._source.om_cms_id.indexOf(om_cms_id);if (index == -1) {continue}ctx._source.om_cms_id.remove(index);}if (params.goods_om_cms_ids == null || params.goods_om_cms_ids[ctx._id] == null) {return}for (om_cms_id in params.goods_om_cms_ids[ctx._id]) {if (ctx._source.om_cms_id == null) {ctx._source.om_cms_id = [om_cms_id]} else {ctx._source.om_cms_id.add(om_cms_id)}}`).Params(map[string]interface{}{"goods_om_cms_ids": goodsOmCmsIds,"goods_ids": goodsIds,"om_cms_ids": omCmsIds,})// 使用4.4更行方法
7.2 自定义排序
根据传入的id顺序排序
m := make(map[string]interface{})// 循环要查询的id, 并转成map, key是id值, value是在原数组种的索引for index, goodsId := range ids {m[strconv.Itoa(goodsId)] = index}sort := elastic.NewScriptSort(// params 存储自定义数据// doc 表示当前记录elastic.NewScript(`params.id[String.valueOf(doc['id'].value)]`).Param("id", m),"number",).Order(vv[0] == 1)