ElasticSearchDao.java 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422
  1. package com.pcitc.imp.bizlog.dal.dao.base;
  2. import java.io.IOException;
  3. import java.util.ArrayList;
  4. import java.util.HashMap;
  5. import java.util.List;
  6. import java.util.Map;
  7. import java.util.Set;
  8. import org.apache.http.HttpEntity;
  9. import org.apache.http.StatusLine;
  10. import org.apache.http.entity.ContentType;
  11. import org.apache.http.nio.entity.NStringEntity;
  12. import org.apache.http.util.EntityUtils;
  13. import org.elasticsearch.client.Response;
  14. import org.elasticsearch.client.RestClient;
  15. import org.springframework.stereotype.Repository;
  16. import com.pcitc.imp.bizlog.dal.dao.impl.Param;
  17. import com.pcitc.imp.bizlog.dal.pojo.App;
  18. import com.pcitc.imp.bizlog.dal.pojo.Log;
  19. import com.pcitc.imp.bizlog.exception.BusiException;
  20. import com.pcitc.imp.bizlog.service.model.Condition;
  21. import com.pcitc.imp.bizlog.util.CheckPrompt;
  22. import com.pcitc.imp.bizlog.util.CheckUtil;
  23. import com.pcitc.imp.bizlog.util.ClientFactory;
  24. import com.pcitc.imp.bizlog.util.ErrorCodeEnum;
  25. import io.vertx.core.json.Json;
  26. import io.vertx.core.json.JsonArray;
  27. import io.vertx.core.json.JsonObject;
  28. /**
  29. * Persistence layer (DAO) implemented on top of Elasticsearch (ES).
  30. *
  31. * @author haiwen.wang
  32. */
  33. @Repository
  34. public class ElasticSearchDao {
  35. public ClientFactory getClientFactory() {
  36. return ClientFactory.getInstance();
  37. }
  38. /**
  39. * 创建appregister索引
  40. *
  41. * @throws BusiException
  42. */
  43. public void creatIndex(String url, String body) throws BusiException {
  44. RestClient client = null;
  45. try {
  46. client = getClientFactory().getClient();
  47. Map<String, String> params = new HashMap<String, String>();
  48. HttpEntity entity = new NStringEntity(body, ContentType.APPLICATION_JSON);
  49. client.performRequest("PUT", url, params, entity);
  50. } catch (Exception e) {
  51. throw new BusiException(ErrorCodeEnum.M001, e.getMessage(), e);
  52. }
  53. }
  54. /**
  55. * @param indexName
  56. * @return
  57. * @throws IOException
  58. */
  59. @SuppressWarnings({ "unchecked", "rawtypes" })
  60. public List<String> queryAll(String indexName) throws IOException {
  61. RestClient client = getClientFactory().getClient();
  62. String url = "/" + indexName + "/_search";
  63. Map params = new HashMap<>();
  64. params.put("size", String.valueOf(getClientFactory().getConfig().getInteger("buzilog.size")));
  65. List<String> result = performGet(client, url, params, null);
  66. return result;
  67. }
  68. /**
  69. * 从GET方法的响应中取到数据
  70. * @param client
  71. * @param url
  72. * @param params
  73. * @param entity
  74. * @return
  75. * @throws IOException
  76. */
  77. @SuppressWarnings({ "unchecked", "rawtypes" })
  78. private List<String> performGet(RestClient client, String url, Map params, HttpEntity entity) throws IOException {
  79. List<String> result = new ArrayList<String>();
  80. Response response;
  81. if (entity == null) {
  82. response = client.performRequest("GET", url, params);
  83. } else {
  84. response = client.performRequest("GET", url, params, entity);
  85. }
  86. String retStr = EntityUtils.toString(response.getEntity());
  87. JsonObject retJson = new JsonObject(retStr);
  88. Integer count = retJson.getJsonObject("hits").getInteger("total");
  89. JsonArray hits = retJson.getJsonObject("hits").getJsonArray("hits");
  90. if (hits == null || hits.isEmpty()) {
  91. return result;
  92. }
  93. result.add(count.toString());
  94. for (int i = 0; i < hits.size(); i++) {
  95. result.add(hits.getJsonObject(i).getJsonObject("_source").toString());
  96. }
  97. return result;
  98. }
  99. @SuppressWarnings({ "unchecked", "rawtypes" })
  100. private List<String> performGetUpdate(RestClient client, String url, Map params, HttpEntity entity) throws IOException {
  101. List<String> result = new ArrayList<String>();
  102. Response response;
  103. if (entity == null) {
  104. response = client.performRequest("GET", url, params);
  105. } else {
  106. response = client.performRequest("GET", url, params, entity);
  107. }
  108. String retStr = EntityUtils.toString(response.getEntity());
  109. JsonObject retJson = new JsonObject(retStr);
  110. JsonArray hits = retJson.getJsonObject("hits").getJsonArray("hits");
  111. if (hits == null || hits.isEmpty()) {
  112. return result;
  113. }
  114. for (int i = 0; i < hits.size(); i++) {
  115. result.add(hits.getJsonObject(i).getJsonObject("_source").toString());
  116. result.add(hits.getJsonObject(i).getString("_id"));
  117. }
  118. return result;
  119. }
  120. /**
  121. * 根据条件查询数据
  122. * @param indexName
  123. * @param paramMap
  124. * @param skip
  125. * @param top
  126. * @return
  127. */
  128. public List<String> queryByCon(String indexName, Map<String, Param> paramMap, String skip, String top, Condition condition)
  129. throws BusiException {
  130. RestClient client = getClientFactory().getClient();
  131. String url = "/" + indexName + "/_search";
  132. Map<String,String> params = new HashMap<String,String>();
  133. if (!CheckUtil.checkStringIsNull(skip)) {
  134. params.put("from", skip);
  135. }
  136. if (!CheckUtil.checkStringIsNull(top)) {
  137. params.put("size", top);
  138. }
  139. JsonArray should = new JsonArray();
  140. JsonArray must = new JsonArray();
  141. JsonArray filter = new JsonArray();
  142. JsonObject range = new JsonObject();
  143. JsonObject item_p = new JsonObject();
  144. Set<String> keys = paramMap.keySet();
  145. String keyV = null;
  146. for ( String key : keys) {
  147. keyV = key;
  148. Param param = paramMap.get(key);
  149. if(key != "timestamp"){
  150. if (param.getQueryType().equals("should")) {
  151. setParam(should, key, param);
  152. } else if (param.getQueryType().equals("must")) {
  153. setParam(must, key, param);
  154. } else if (param.getQueryType().equals("filter")) {
  155. setParam(filter, key, param);
  156. }
  157. }else{
  158. param.getMatchType().equals("range");
  159. JsonObject item = new JsonObject();
  160. item.put("gte", condition.getStartTime());
  161. item.put("lte", condition.getEndTime());
  162. range.put("timestamp", item);
  163. item_p.put(param.getMatchType(), range);
  164. }
  165. }
  166. JsonObject query = new JsonObject();
  167. if(keyV != "timestamp"){
  168. JsonObject con_p = new JsonObject();
  169. con_p.put("must", must);
  170. con_p.put("should", should);
  171. con_p.put("filter", filter);
  172. JsonObject bool = new JsonObject();
  173. bool.put("bool", con_p);
  174. query.put("query", bool);
  175. }else{
  176. query.put("query", item_p);
  177. }
  178. HttpEntity entity = new NStringEntity(query.toString(), ContentType.APPLICATION_JSON);
  179. List<String> result = null;
  180. try {
  181. result = performGet(client, url, params, entity);
  182. } catch (IOException e) {
  183. throw new BusiException(ErrorCodeEnum.M001, CheckPrompt.LOG_NOT_EXIST, e);
  184. } catch (Exception e) {
  185. throw new BusiException(ErrorCodeEnum.M001, e.getMessage(), e);
  186. }
  187. return result;
  188. }
  189. @SuppressWarnings("rawtypes")
  190. private void setParam(JsonArray queryJson, String key, Param param) {
  191. List values = param.getValue();
  192. for (int i = 0; i < values.size(); i++) {
  193. JsonObject item = new JsonObject();
  194. item.put(key, values.get(i));
  195. JsonObject item_p = new JsonObject();
  196. item_p.put(param.getMatchType(), item);
  197. queryJson.add(item_p);
  198. }
  199. }
  200. /**
  201. * 根据某个字段查询数据
  202. */
  203. @SuppressWarnings({ "unchecked", "rawtypes" })
  204. public List<String> queryByField(String indexName, String fieldName, String value) throws BusiException {
  205. try {
  206. RestClient client = getClientFactory().getClient();
  207. String url = "/" + indexName + "/_search";
  208. Map params = new HashMap<>();
  209. params.put("q", fieldName + ":" + value);
  210. List<String> result = performGet(client, url, params, null);
  211. return result;
  212. } catch (Exception e) {
  213. throw new BusiException(ErrorCodeEnum.M001, CheckPrompt.QUERY + ":" + e.getMessage(), e);
  214. }
  215. }
  216. @SuppressWarnings({ "unchecked", "rawtypes" })
  217. public List<String> queryByFieldUpdate(String indexName, String fieldName, String value) throws BusiException {
  218. try {
  219. RestClient client = getClientFactory().getClient();
  220. String url = "/" + indexName + "/_search";
  221. Map params = new HashMap<>();
  222. params.put("q", fieldName + ":" + value);
  223. List<String> result = performGetUpdate(client, url, params, null);
  224. return result;
  225. } catch (Exception e) {
  226. throw new BusiException(ErrorCodeEnum.M001, CheckPrompt.QUERY + ":" + e.getMessage(), e);
  227. }
  228. }
  229. /**
  230. * @return void 返回类型
  231. * @throws BusiException
  232. * @Title: insert
  233. * @Description: 添加
  234. */
  235. @SuppressWarnings("unused")
  236. public <E> void insert(String tableName, String typeName, List<E> pojo) throws BusiException {
  237. try {
  238. RestClient client = getClientFactory().getClient();
  239. StringBuilder bodyStr = new StringBuilder();
  240. bodyStr.append("{ \"index\" : { \"_index\" : \"" + tableName + "\", \"_type\" : \"" + typeName + "\"} }"+"\r\n");
  241. for (int i = 0; i < pojo.size(); i++) {
  242. bodyStr.append(Json.encode(pojo.get(i))+"\r\n");
  243. }
  244. Map<String, String> params = new HashMap<String, String>();
  245. HttpEntity entity = new NStringEntity(bodyStr.toString(), ContentType.APPLICATION_JSON);
  246. Response indexResponse = client.performRequest("POST", "/_bulk",params,entity);
  247. } catch (IOException e) {
  248. throw new BusiException(ErrorCodeEnum.M001, CheckPrompt.INSERT + ":" + e.getMessage(), e);
  249. }
  250. }
  251. /**
  252. * @Title: insertApp
  253. * @Description: 添加应用
  254. * @return void 返回类型
  255. * @throws BusiException
  256. */
  257. @SuppressWarnings("unused")
  258. public int insertApp(String tableName, String typeName, List<App> pojo) throws BusiException {
  259. RestClient client = getClientFactory().getClient();
  260. int i = 0;
  261. try {
  262. List<String> lists = new ArrayList<>();
  263. StringBuilder bodyStr = new StringBuilder();
  264. Map<String, String> params = new HashMap<String, String>();
  265. for (App app : pojo) {
  266. bodyStr.append("{ \"index\" : { \"_index\" : \"" + tableName + "\", \"_type\" : \"" + typeName + "\"} }"
  267. + "\r\n");
  268. bodyStr.append(Json.encode(app) + "\r\n");
  269. i++;
  270. }
  271. HttpEntity entity = new NStringEntity(bodyStr.toString(), ContentType.APPLICATION_JSON);
  272. Response indexResponse = client.performRequest("POST", "/_bulk", params, entity);
  273. } catch (Exception e) {
  274. throw new BusiException(ErrorCodeEnum.M001, CheckPrompt.INSERT + ":" + e.getMessage(), e);
  275. }
  276. return i;
  277. }
  278. /**
  279. * @Title: insertLog
  280. * @Description: 添加日志
  281. * @return void 返回类型
  282. * @throws BusiException
  283. */
  284. @SuppressWarnings("unused")
  285. public int insertLog(String tableName, String typeName, List<Log> pojoList) throws BusiException {
  286. RestClient client = getClientFactory().getClient();
  287. int i = 0 ;
  288. try {
  289. List<String> lists = new ArrayList<>();
  290. Map<String, String> params = new HashMap<String, String>();
  291. StringBuilder bodyStr = new StringBuilder();
  292. for (Log log : pojoList) {
  293. bodyStr.append("{ \"index\" : { \"_index\" : \"" + tableName + "\", \"_type\" : \"" + typeName + "\"} }"
  294. + "\r\n");
  295. bodyStr.append(Json.encode(log) + "\r\n");
  296. i++;
  297. }
  298. HttpEntity entity = new NStringEntity(bodyStr.toString(), ContentType.APPLICATION_JSON);
  299. Response indexResponse = client.performRequest("POST", "/_bulk",params,entity);
  300. } catch (Exception e) {
  301. throw new BusiException(ErrorCodeEnum.M001, CheckPrompt.INSERT + ":" + e.getMessage(), e);
  302. }
  303. return i;
  304. }
  305. /**
  306. * @Title:
  307. * @Description: 操作记录-删除-根据条件删除
  308. * @param param
  309. * 删除的条件
  310. * @return void 返回类型
  311. * @throws BusiException
  312. */
  313. @SuppressWarnings({ "unused", "rawtypes" })
  314. public void deleteByParam(String tableName, String typeName,String id, Map param) throws BusiException {
  315. try {
  316. RestClient client = getClientFactory().getClient();
  317. //由于版本internal控制不支持将值0作为有效的版本号,因此版本等于零的文档无法使用删除, _delete_by_query并且将会使请求失败。
  318. // String url = "/" + tableName + "/" + typeName + "/_delete_by_query";
  319. String url = "/" + tableName + "/" + typeName + "/" + id;
  320. String paramStr = Json.encode(param);
  321. JsonObject match = new JsonObject();
  322. match.put("match", paramStr).toString();
  323. JsonObject query = new JsonObject();
  324. query.put("query", match);
  325. Map<String, String> params = new HashMap<String, String>();
  326. HttpEntity entity = new NStringEntity(query.toString(), ContentType.APPLICATION_JSON);
  327. Response response = client.performRequest("DELETE", url, params, entity);
  328. } catch (Exception e) {
  329. throw new BusiException(ErrorCodeEnum.M001, CheckPrompt.DELETE + ":" + e.getMessage(), e);
  330. }
  331. }
  332. /**
  333. * 删除索引
  334. *
  335. * @param indexName
  336. * @throws BusiException
  337. */
  338. @SuppressWarnings("unused")
  339. public void deleteTable(String indexName) throws BusiException {
  340. if (!isIndexExists(indexName)) {
  341. } else {
  342. RestClient client = getClientFactory().getClient();
  343. String url = "/" + indexName;
  344. try {
  345. Response response = client.performRequest("DELETE", url);
  346. } catch (IOException e) {
  347. throw new BusiException(ErrorCodeEnum.M001, CheckPrompt.DELETE + ":" + e.getMessage(), e);
  348. }
  349. }
  350. }
  351. /**
  352. * 判断索引是否存在 传入参数为索引库名称
  353. *
  354. * @param indexName
  355. * @return
  356. * @throws BusiException
  357. */
  358. public boolean isIndexExists(String indexName) throws BusiException {
  359. boolean flag = false;
  360. try {
  361. RestClient client = getClientFactory().getClient();
  362. String url = "/" + indexName;
  363. Response response = client.performRequest("GET", url);
  364. StatusLine status = response.getStatusLine();
  365. if (status.getStatusCode() == 200) {
  366. flag = true;
  367. } else if (status.getStatusCode() == 404) {
  368. flag = false;
  369. }
  370. } catch (Exception e) {
  371. throw new BusiException(ErrorCodeEnum.M001, CheckPrompt.QUERY + ":" + e.getMessage(), e);
  372. }
  373. return flag;
  374. }
  375. /**
  376. * @param tableName
  377. * @param typeName
  378. * @param id
  379. * @param pojo
  380. * @return String 返回类型
  381. * @throws BusiException
  382. * @throws BusiException
  383. * @Title: updateAppName
  384. * @Description: 日志操作记录-修改
  385. */
  386. @SuppressWarnings("unused")
  387. public <E> void update(String tableName, String typeName, String id, E pojo) throws BusiException {
  388. try {
  389. RestClient client = getClientFactory().getClient();
  390. String url = "/" + tableName + "/" + typeName + "/" + id;
  391. String bodyStr = Json.encode(pojo);
  392. HttpEntity entity = new NStringEntity(bodyStr, ContentType.APPLICATION_JSON);
  393. Map<String, String> params = new HashMap<String, String>();
  394. Response indexResponse = client.performRequest("PUT", url, params, entity);
  395. } catch (Exception e) {
  396. throw new BusiException(ErrorCodeEnum.M001, CheckPrompt.UPDATE + ":" + e.getMessage(), e);
  397. }
  398. }
  399. }