xusonglin 3 éve
szülő
commit
072ebd0a28

+ 28 - 40
src/main/java/com/jkcredit/record/storage/service/impl/RecordStorageServiceImpl.java

@@ -1,36 +1,18 @@
 package com.jkcredit.record.storage.service.impl;
 
 import com.alibaba.fastjson.JSON;
-import com.jkcredit.record.storage.config.EsConfig;
 import com.jkcredit.record.storage.constants.CommonConstant;
 import com.jkcredit.record.storage.model.CommonResponseObject;
 import com.jkcredit.record.storage.model.MonthResult;
 import com.jkcredit.record.storage.service.RecordStorageService;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.http.HttpHost;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestClient;
-import org.elasticsearch.client.RestClientBuilder;
 import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.common.xcontent.XContentType;
-import org.elasticsearch.index.query.*;
-import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.SearchHits;
-import org.elasticsearch.search.aggregations.Aggregation;
-import org.elasticsearch.search.aggregations.AggregationBuilder;
-import org.elasticsearch.search.aggregations.AggregationBuilders;
-import org.elasticsearch.search.aggregations.Aggregations;
-import org.elasticsearch.search.aggregations.bucket.terms.ParsedTerms;
-import org.elasticsearch.search.aggregations.bucket.terms.Terms;
-import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
@@ -50,10 +32,9 @@ public class RecordStorageServiceImpl implements RecordStorageService {
 
     @Override
     public CommonResponseObject recordStorage(List<MonthResult> monthResultList) {
-        log.info("待存入ES:{}", monthResultList.size());
-        //设置保存的索引和id
-        String indexName = "index-record-" + monthResultList.get(0).getMonth();
-        BulkRequest bulkRequest=new BulkRequest();
+        log.info("recordStorageCount:{}", monthResultList.size());
+
+        BulkRequest bulkRequest = new BulkRequest();
 
         // 发送es失败id集合
         List<String> errorIds = new ArrayList<>();
@@ -61,34 +42,41 @@ public class RecordStorageServiceImpl implements RecordStorageService {
         //保存的数据 这里指定json类型
         for (MonthResult monthResult : monthResultList) {
             String id = new SimpleDateFormat("yyyyMMddHHmmssSSS").format(new Date()) + UUID.randomUUID().toString();
+            //设置保存的索引和id
+            String indexName = "index-record-" + monthResult.getMonth();
             IndexRequest indexRequest = new IndexRequest(indexName).id(id);
             indexRequest.source(JSON.toJSONString(monthResult), XContentType.JSON);
             bulkRequest.add(indexRequest);
             monthResult.setId(id);
         }
-
+        BulkResponse response = null;
         try {
-//            esRestClient.bulk()
             //执行保存 index同步 indexAsync异步
-            BulkResponse response = esRestClient.bulk(bulkRequest, RequestOptions.DEFAULT);
-            log.info("推送结果:{}", !response.hasFailures());
-            // 判断发送es是否存在失败
-            if (response.hasFailures()) {
-                log.error("pushEsError-推送失败条数:{}", response.getItems().length);
-                log.error("pushEsError-推送失败条数:{}", response.getItems()[0].getFailureMessage());
-                log.error("pushEsError-推送失败条数:{}", response.getItems()[0].getFailure());
-                // 将发送es失败数据的id存入集合
-                while (response.iterator().hasNext()) {
-                    BulkItemResponse bulkItemResponse = response.iterator().next();
-                    if (bulkItemResponse.isFailed()) {
-                        errorIds.add(bulkItemResponse.getId());
-                    }
+            response = esRestClient.bulk(bulkRequest, RequestOptions.DEFAULT);
+        } catch (IOException ioe) {
+            log.error("pushEsException:", ioe);
+        }
+        // 发送es异常
+        if (response == null) {
+            CommonResponseObject responseObject = new CommonResponseObject();
+            responseObject.setCode(CommonConstant.ERROR_CODE_PUSH_ERROR);
+            responseObject.setMessage(JSON.toJSONString(monthResultList));
+            return responseObject;
+        }
 
+        log.info("pushEsResult:{}", !response.hasFailures());
+        // 判断发送es是否存在失败
+        if (response.hasFailures()) {
+            log.error("pushEsError-errorCount:{}, errorMessage:{}",
+                    response.getItems().length, response.getItems()[0].getFailureMessage());
+            // 将发送es失败数据的id存入集合
+            while (response.iterator().hasNext()) {
+                BulkItemResponse bulkItemResponse = response.iterator().next();
+                if (bulkItemResponse.isFailed()) {
+                    errorIds.add(bulkItemResponse.getId());
                 }
-            }
 
-        } catch (Exception e) {
-            log.error("pushEsException:", e);
+            }
         }
 
         // 存在发送失败数据,将发送失败数据找出并返回