Browse Source

add elastic query delete update

kerryzhang 5 years ago
parent
commit
1774d93e06

+ 119 - 0
common/src/main/java/com/vcarecity/elastic/util/SnowFlake.java View File

@@ -0,0 +1,119 @@
1
+package com.vcarecity.elastic.util;
2
+
3
+/**
4
+ * @author VcKerry
5
+ */
6
+public class SnowFlake {
7
+
8
+    /**
9
+     * 起始的时间戳:这个时间戳自己随意获取,比如自己代码的时间戳
10
+     */
11
+    private final static long START_STMP = 1543903501000L;
12
+
13
+    /**
14
+     * 每一部分占用的位数
15
+     */
16
+    private final static long SEQUENCE_BIT = 12; //序列号占用的位数
17
+    private final static long MACHINE_BIT = 5;  //机器标识占用的位数
18
+    private final static long DATACENTER_BIT = 5;//数据中心占用的位数
19
+
20
+    /**
21
+     * 每一部分的最大值:先进行左移运算,再同-1进行异或运算;异或:相同位置相同结果为0,不同结果为1
22
+     */
23
+    /**
24
+     * 用位运算计算出最大支持的数据中心数量:31
25
+     */
26
+    private final static long MAX_DATACENTER_NUM = -1L ^ (-1L << DATACENTER_BIT);
27
+
28
+    /**
29
+     * 用位运算计算出最大支持的机器数量:31
30
+     */
31
+    private final static long MAX_MACHINE_NUM = -1L ^ (-1L << MACHINE_BIT);
32
+
33
+    /**
34
+     * 用位运算计算出12位能存储的最大正整数:4095
35
+     */
36
+    private final static long MAX_SEQUENCE = -1L ^ (-1L << SEQUENCE_BIT);
37
+
38
+    /**
39
+     * 每一部分向左的位移
40
+     */
41
+
42
+    /**
43
+     * 机器标志较序列号的偏移量
44
+     */
45
+    private final static long MACHINE_LEFT = SEQUENCE_BIT;
46
+
47
+    /**
48
+     * 数据中心较机器标志的偏移量
49
+     */
50
+    private final static long DATACENTER_LEFT = SEQUENCE_BIT + MACHINE_BIT;
51
+
52
+    /**
53
+     * 时间戳较数据中心的偏移量
54
+     */
55
+    private final static long TIMESTMP_LEFT = DATACENTER_LEFT + DATACENTER_BIT;
56
+
57
+    private static long datacenterId;  //数据中心
58
+    private static long machineId;    //机器标识
59
+    private static long sequence = 0L; //序列号
60
+    private static long lastStmp = -1L;//上一次时间戳
61
+
62
+    /**
63
+     * 此处无参构造私有,同时没有给出有参构造,在于避免以下两点问题:
64
+     * 1、私有化避免了通过new的方式进行调用,主要是解决了在for循环中通过new的方式调用产生的id不一定唯一问题问题,因为用于			 记录上一次时间戳的lastStmp永远无法得到比对;
65
+     * 2、没有给出有参构造在第一点的基础上考虑了一套分布式系统产生的唯一序列号应该是基于相同的参数
66
+     */
67
+    private SnowFlake() {
68
+    }
69
+
70
+    /**
71
+     * 产生下一个ID
72
+     *
73
+     * @return
74
+     */
75
+    public static synchronized long nextId() {
76
+        /** 获取当前时间戳 */
77
+        long currStmp = getNewstmp();
78
+
79
+        /** 如果当前时间戳小于上次时间戳则抛出异常 */
80
+        if (currStmp < lastStmp) {
81
+            throw new RuntimeException("Clock moved backwards.  Refusing to generate id");
82
+        }
83
+        /** 相同毫秒内 */
84
+        if (currStmp == lastStmp) {
85
+            //相同毫秒内,序列号自增
86
+            sequence = (sequence + 1) & MAX_SEQUENCE;
87
+            //同一毫秒的序列数已经达到最大
88
+            if (sequence == 0L) {
89
+
90
+                /** 获取下一时间的时间戳并赋值给当前时间戳 */
91
+                currStmp = getNextMill();
92
+            }
93
+        } else {
94
+            //不同毫秒内,序列号置为0
95
+            sequence = 0L;
96
+        }
97
+        /** 当前时间戳存档记录,用于下次产生id时对比是否为相同时间戳 */
98
+        lastStmp = currStmp;
99
+
100
+
101
+        return (currStmp - START_STMP) << TIMESTMP_LEFT //时间戳部分
102
+                | datacenterId << DATACENTER_LEFT      //数据中心部分
103
+                | machineId << MACHINE_LEFT            //机器标识部分
104
+                | sequence;                            //序列号部分
105
+    }
106
+
107
+    private static long getNextMill() {
108
+        long mill = getNewstmp();
109
+        while (mill <= lastStmp) {
110
+            mill = getNewstmp();
111
+        }
112
+        return mill;
113
+    }
114
+
115
+    private static long getNewstmp() {
116
+        return System.currentTimeMillis();
117
+    }
118
+
119
+}

+ 14 - 0
elastic-publish-service/src/main/java/com/vcarecity/publish/elastic/service/ElasticIndexService.java View File

@@ -0,0 +1,14 @@
1
+package com.vcarecity.publish.elastic.service;
2
+
3
+/**
4
+ * @author kerryzhang on 2019/12/17
5
+ */
6
+
7
+public interface ElasticIndexService {
8
+
9
+    void createIndex(String ...indices);
10
+
11
+    void deleteIndex(String... indices);
12
+
13
+
14
+}

+ 0 - 3
elastic-publish-service/src/main/java/com/vcarecity/publish/elastic/service/ElasticQueryService.java View File

@@ -1,7 +1,5 @@
1 1
 package com.vcarecity.publish.elastic.service;
2 2
 
3
-import java.util.List;
4
-
5 3
 /**
6 4
  * @author VcKerry on 12/17/19
7 5
  */
@@ -9,5 +7,4 @@ import java.util.List;
9 7
 public interface ElasticQueryService {
10 8
 
11 9
 
12
-    List<Long> queryById();
13 10
 }

+ 21 - 3
elastic-publish-service/src/main/java/com/vcarecity/publish/elastic/service/ElasticUpdateService.java View File

@@ -13,10 +13,28 @@ public interface ElasticUpdateService {
13 13
 
14 14
     void updateTest() throws IOException;
15 15
 
16
-    void updateUnit(List<UnitAgencyMergeEntity> list) throws IOException;
17 16
 
18
-
19
-    void updateAgency(List<UnitAgencyMergeEntity> list) throws IOException;
17
+    void handlerDataChange(String key, List<UnitAgencyMergeEntity> data) throws IOException;
18
+
19
+    /**
20
+     * 添加数据
21
+     *
22
+     * @param data
23
+     * @throws IOException
24
+     */
25
+    void addElasticData(List<UnitAgencyMergeEntity> data) throws IOException;
26
+
27
+    void deleteByQuery(String key, List<Long> ids) throws IOException;
28
+
29
+    /**
30
+     * update unit/agency... info,return incr data list
31
+     *
32
+     * @param key
33
+     * @param data
34
+     * @return
35
+     * @throws IOException
36
+     */
37
+    List<UnitAgencyMergeEntity> updateDataByQuery(String key, List<UnitAgencyMergeEntity> data) throws IOException;
20 38
 
21 39
 
22 40
 }

+ 23 - 0
elastic-publish-service/src/main/java/com/vcarecity/publish/elastic/service/impl/ElasticIndexServiceImpl.java View File

@@ -0,0 +1,23 @@
1
+package com.vcarecity.publish.elastic.service.impl;
2
+
3
+import com.vcarecity.publish.elastic.service.ElasticIndexService;
4
+import org.springframework.stereotype.Service;
5
+
6
+/**
7
+ * @author kerryzhang on 2019/12/17
8
+ */
9
+
10
+@Service
11
+public class ElasticIndexServiceImpl implements ElasticIndexService {
12
+
13
+
14
+    @Override
15
+    public void createIndex(String... indices) {
16
+
17
+    }
18
+
19
+    @Override
20
+    public void deleteIndex(String... indices) {
21
+
22
+    }
23
+}

+ 1 - 4
elastic-publish-service/src/main/java/com/vcarecity/publish/elastic/service/impl/ElasticQueryServiceImpl.java View File

@@ -11,8 +11,5 @@ import java.util.List;
11 11
 
12 12
 @Service
13 13
 public class ElasticQueryServiceImpl implements ElasticQueryService {
14
-    @Override
15
-    public List<Long> queryById() {
16
-        return null;
17
-    }
14
+
18 15
 }

+ 113 - 48
elastic-publish-service/src/main/java/com/vcarecity/publish/elastic/service/impl/ElasticUpdateServiceImpl.java View File

@@ -2,16 +2,26 @@ package com.vcarecity.publish.elastic.service.impl;
2 2
 
3 3
 import com.fasterxml.jackson.core.type.TypeReference;
4 4
 import com.fasterxml.jackson.databind.ObjectMapper;
5
-import com.vcarecity.publish.entity.UnitAgencyMergeEntity;
5
+import com.vcarecity.elastic.util.AgencyPathUtil;
6
+import com.vcarecity.elastic.util.SnowFlake;
7
+import com.vcarecity.publish.elastic.entity.UnitAgencyDetailEntity;
6 8
 import com.vcarecity.publish.elastic.service.ElasticUpdateService;
9
+import com.vcarecity.publish.entity.UnitAgencyMergeEntity;
10
+import com.vcarecity.publish.sql.service.AgencyService;
11
+import lombok.extern.slf4j.Slf4j;
12
+import org.elasticsearch.action.bulk.BulkRequest;
13
+import org.elasticsearch.action.bulk.BulkResponse;
14
+import org.elasticsearch.action.index.IndexRequest;
7 15
 import org.elasticsearch.client.RequestOptions;
8 16
 import org.elasticsearch.client.RestHighLevelClient;
17
+import org.elasticsearch.common.xcontent.XContentType;
9 18
 import org.elasticsearch.index.query.QueryBuilders;
10 19
 import org.elasticsearch.index.reindex.BulkByScrollResponse;
11 20
 import org.elasticsearch.index.reindex.DeleteByQueryRequest;
12 21
 import org.elasticsearch.index.reindex.UpdateByQueryRequest;
13 22
 import org.elasticsearch.script.Script;
14 23
 import org.elasticsearch.script.ScriptType;
24
+import org.springframework.cglib.beans.BeanCopier;
15 25
 import org.springframework.stereotype.Service;
16 26
 
17 27
 import java.io.IOException;
@@ -25,6 +35,7 @@ import java.util.Map;
25 35
  * @author Kerry on 12/17/19
26 36
  */
27 37
 
38
+@Slf4j
28 39
 @Service
29 40
 public class ElasticUpdateServiceImpl implements ElasticUpdateService {
30 41
 
@@ -32,11 +43,15 @@ public class ElasticUpdateServiceImpl implements ElasticUpdateService {
32 43
 
33 44
     private final RestHighLevelClient restHighLevelClient;
34 45
     private final ObjectMapper objectMapper;
46
+    private final AgencyService agencyService;
35 47
 
36 48
     public ElasticUpdateServiceImpl(RestHighLevelClient restHighLevelClient,
37
-                                    ObjectMapper objectMapper) {
49
+                                    ObjectMapper objectMapper,
50
+                                    AgencyService agencyService) {
38 51
         this.restHighLevelClient = restHighLevelClient;
39 52
         this.objectMapper = objectMapper;
53
+        this.agencyService = agencyService;
54
+
40 55
     }
41 56
 
42 57
 
@@ -62,85 +77,135 @@ public class ElasticUpdateServiceImpl implements ElasticUpdateService {
62 77
 
63 78
     }
64 79
 
65
-
66
-    private String getPainlessCode(Map<String, Object> params) {
67
-        String[] items = new String[params.size()];
68
-        int index = 0;
69
-        for (String key : params.keySet()) {
70
-            items[index++] = "ctx._source." + key + "=params." + key;
71
-        }
72
-        return String.join(";", items);
73
-    }
74
-
75
-
76 80
     @Override
77
-    public void updateUnit(List<UnitAgencyMergeEntity> list) throws IOException {
81
+    public void handlerDataChange(String key, List<UnitAgencyMergeEntity> data) throws IOException {
78 82
         List<UnitAgencyMergeEntity> updateList = new ArrayList<>();
79 83
         List<Long> deleteId = new ArrayList<>();
80
-        for (UnitAgencyMergeEntity uae : list) {
84
+        for (UnitAgencyMergeEntity uae : data) {
81 85
             if (uae.getIsDeleted() == 1) {
82 86
                 deleteId.add(uae.getUnitId());
83 87
             } else if (uae.getIsDeleted() == 0) {
84 88
                 updateList.add(uae);
85 89
             }
86 90
         }
87
-        deleteById("unitId", deleteId);
91
+        // 先删除
92
+        deleteByQuery(key, deleteId);
88 93
         if (updateList.isEmpty()) {
89 94
             return;
90 95
         }
91
-        for (UnitAgencyMergeEntity entity : updateList) {
96
+        // 更新数据
97
+        List<UnitAgencyMergeEntity> addData = updateDataByQuery(key, updateList);
92 98
 
93
-            Map<String, Object> map = objectMapper.convertValue(entity, new TypeReference<Map<String, Object>>() {
99
+        // 添加数据
100
+        addElasticData(addData);
101
+    }
102
+
103
+
104
+    @Override
105
+    public void deleteByQuery(String key, List<Long> ids) throws IOException {
106
+        if (ids == null || ids.isEmpty()) {
107
+            return;
108
+        }
109
+        for (Long id : ids) {
110
+            DeleteByQueryRequest request = new DeleteByQueryRequest(INDEX);
111
+            request.setQuery(QueryBuilders.termQuery(key, id));
112
+            restHighLevelClient.deleteByQuery(request, RequestOptions.DEFAULT);
113
+        }
114
+    }
115
+
116
+    @Override
117
+    public List<UnitAgencyMergeEntity> updateDataByQuery(String key, List<UnitAgencyMergeEntity> data) throws IOException {
118
+        List<UnitAgencyMergeEntity> addList = new ArrayList<>();
119
+
120
+        for (UnitAgencyMergeEntity item : data) {
121
+
122
+            Map<String, Object> params = objectMapper.convertValue(item, new TypeReference<Map<String, Object>>() {
94 123
                 @Override
95 124
                 public Type getType() {
96 125
                     return super.getType();
97 126
                 }
98 127
             });
99 128
 
100
-            map.remove("unitId");
101
-
102 129
             UpdateByQueryRequest request = new UpdateByQueryRequest(INDEX);
103
-            request.setQuery(QueryBuilders.termQuery("unitId", entity.getUnitId()));
130
+            request.setQuery(QueryBuilders.termQuery(key, params.get(key)));
131
+
132
+            request.setScript(new Script(ScriptType.INLINE, "painless", getPainlessCode(params), params));
133
+
134
+            BulkByScrollResponse response = restHighLevelClient.updateByQuery(request, RequestOptions.DEFAULT);
135
+
136
+            logger.info("response:{}", response);
104 137
 
138
+            if (response.getUpdated() != 0L) {
139
+                addList.add(item);
140
+            }
105 141
         }
142
+
143
+        return addList;
106 144
     }
107 145
 
108 146
     @Override
109
-    public void updateAgency(List<UnitAgencyMergeEntity> list) throws IOException {
110
-        List<UnitAgencyMergeEntity> updateList = new ArrayList<>();
111
-        List<Long> deleteId = new ArrayList<>();
112
-        for (UnitAgencyMergeEntity uae : list) {
113
-            if (uae.getIsDeleted() == 1) {
114
-                deleteId.add(uae.getAgencyId());
115
-            } else if (uae.getIsDeleted() == 0) {
116
-                updateList.add(uae);
117
-            }
147
+    public void addElasticData(List<UnitAgencyMergeEntity> data) throws IOException {
148
+        if (data.isEmpty()) {
149
+            return;
118 150
         }
119
-        deleteById("agencyId", deleteId);
120
-        list.clear();
121
-        updateElasticData("agencyId", updateList);
122
-    }
151
+        final BeanCopier beanCopier = BeanCopier.create(UnitAgencyMergeEntity.class, UnitAgencyDetailEntity.class, false);
152
+
153
+        List<Map<String, Object>> saveResult = new ArrayList<>(data.size());
154
+
155
+        for (UnitAgencyMergeEntity item : data) {
156
+            List<Integer> agencyIds = AgencyPathUtil.splitAgencyId(item.getAgencyPath());
157
+            List<String> agencyPathDetail = agencyService.getAgencyPathDetail(agencyIds);
123 158
 
159
+            UnitAgencyDetailEntity target = new UnitAgencyDetailEntity();
160
+            beanCopier.copy(item, target, null);
124 161
 
125
-    private void updateElasticData(String key, List<UnitAgencyMergeEntity> list) throws IOException {
126
-        if (list == null || list.isEmpty()) {
162
+            target.setAgencyDetail(String.join("", agencyPathDetail));
163
+
164
+            Map<String, Object> map = objectMapper.convertValue(target, new TypeReference<Map<String, Object>>() {
165
+                @Override
166
+                public Type getType() {
167
+                    return super.getType();
168
+                }
169
+            });
170
+            final Map<String, Integer> queryMap = AgencyPathUtil.agencyPathQueryMap(agencyIds);
171
+            map.putAll(queryMap);
172
+
173
+            saveResult.add(map);
174
+
175
+        }
176
+        if (saveResult.isEmpty()) {
127 177
             return;
128 178
         }
129
-        for (UnitAgencyMergeEntity item : list) {
130
-            UpdateByQueryRequest request = new UpdateByQueryRequest(INDEX);
131
-            request.setQuery(QueryBuilders.termQuery(key, item.getAgencyId()));
179
+        try {
180
+            BulkRequest bulkRequest = new BulkRequest();
181
+
182
+            for (Map<String, Object> val : saveResult) {
183
+                IndexRequest indexRequest = createIndexRequest(INDEX, val);
184
+                bulkRequest.add(indexRequest);
185
+            }
186
+            BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
187
+            logger.info("bulk insert status {}", bulkResponse.status());
188
+        } catch (IOException e) {
189
+            e.printStackTrace();
132 190
         }
191
+
133 192
     }
134 193
 
135
-    private void deleteById(String key, List<Long> deleteId) throws IOException {
136
-        if (deleteId == null || deleteId.isEmpty()) {
137
-            return;
138
-        }
139
-        for (Long id : deleteId) {
140
-            DeleteByQueryRequest request = new DeleteByQueryRequest(INDEX);
141
-            request.setQuery(QueryBuilders.termQuery(key, id));
142
-            restHighLevelClient.deleteByQuery(request, RequestOptions.DEFAULT);
143
-        }
194
+
195
+    private IndexRequest createIndexRequest(String index, Map<String, Object> value) {
196
+        IndexRequest indexRequest = new IndexRequest(index);
197
+        indexRequest.id(SnowFlake.nextId() + "");
198
+        indexRequest.source(value, XContentType.JSON);
199
+        return indexRequest;
144 200
     }
145 201
 
202
+
203
+    private String getPainlessCode(Map<String, Object> params) {
204
+        String[] items = new String[params.size()];
205
+        int index = 0;
206
+        for (String key : params.keySet()) {
207
+            items[index++] = "ctx._source." + key + "=params." + key;
208
+        }
209
+        return String.join(";", items);
210
+    }
146 211
 }