Prechádzať zdrojové kódy

每次处理完成,写入错误数据

15810770710@163.com 3 rokov pred
rodič
commit
6c6ce92062

+ 33 - 2
src/main/java/com/jkcredit/traffic/record/service/TrafficRecordsAnalyseServiceImpl.java

@@ -104,6 +104,10 @@ public class TrafficRecordsAnalyseServiceImpl implements TrafficRecordsAnalyseSe
             boolean deleteResult = FileUtil.INSTANCE.deleteDirectory(tempFolder);
             log.info("deleteTempDir:{}, deleteResult:{}", tempFolder, deleteResult);
         }
+        if (FileUtil.copyOnWriteArrayList.size() != 0) {
+            log.info("trafficRecordPushErrorSize:{}", FileUtil.copyOnWriteArrayList.size());
+            FileUtil.INSTANCE.write(errorRecordsPath + System.currentTimeMillis() + ".csv");
+        }
         log.info("pushTrafficRecordEnd, costTime:{}", System.currentTimeMillis() - startTime);
         return new CommonResponseObject().success();
     }
@@ -127,13 +131,41 @@ public class TrafficRecordsAnalyseServiceImpl implements TrafficRecordsAnalyseSe
         }
 
         for (int i = 0; i < files.size(); i++) {
-            List<MonthResult> errorRecords = getMonthResult("ErrorRecord" + i, files.get(i).getAbsolutePath());
+            List<MonthResult> errorRecords = getErrorMonthResult(i + "", files.get(i).getAbsolutePath());
+            log.info("errorRecordsSize:{}", errorRecords.size());
             push(errorRecords);
+            boolean deleteResult = FileUtil.INSTANCE.deleteFile(files.get(i).getAbsolutePath());
+            log.info("deleteErrorRecordsFile:{}, deleteResult:{}", files.get(i).getAbsolutePath(), deleteResult);
+        }
+        if (FileUtil.copyOnWriteArrayList.size() != 0) {
+            log.info("trafficRecordPushErrorSize:{}", FileUtil.copyOnWriteArrayList.size());
+            FileUtil.INSTANCE.write(errorRecordsPath + System.currentTimeMillis() + ".csv");
         }
         log.info("pushErrorRecordsEnd, costTime:{}", System.currentTimeMillis() - startTime);
         return new CommonResponseObject().success();
     }
 
+    private List<MonthResult> getErrorMonthResult(String i, String filePath) {
+        Dataset<MonthResult> monthResultDataset = sparkSession
+                .read()
+                .option("header", false)
+                .csv(filePath)
+                .toDF("month", "vehicleid", "max_exTime", "sum_travel_time", "max_travel_time", "sum_feemileage",
+                        "sum_fee", "sum_weight_mileage", "exTimes_count", "axlecount", "vehicletype",
+                        "travel_provinces_count", "transtime_count")
+                .as(Encoders.bean(MonthResult.class))
+                .coalesce(36);
+        String tempViewName = "ErrorRecord" + i;
+        monthResultDataset.createOrReplaceTempView(tempViewName);
+        Dataset<MonthResult> rows = sparkSession.sql("select * from " + tempViewName)
+                .as(Encoders.bean(MonthResult.class));
+//        rows.write().csv("/Users/jkxy/Desktop/outPut/test" + System.currentTimeMillis());
+        List<MonthResult> results = new ArrayList<>(3000000);
+        results = rows.javaRDD().collect();
+        sparkSession.sqlContext().dropTempTable(tempViewName);
+        return results;
+    }
+
     private List<MonthResult> getMonthResult(String i, String filePath) {
         Dataset<MonthResult> monthResultDataset = sparkSession
                 .read()
@@ -294,7 +326,6 @@ public class TrafficRecordsAnalyseServiceImpl implements TrafficRecordsAnalyseSe
         if (errorData.size() != 0) {
             // 异常数据写入本地文件
             FileUtil.INSTANCE.addAll(errorData);
-            FileUtil.INSTANCE.write(errorRecordsPath + System.currentTimeMillis() + ".csv");
         }
     }
 }

+ 1 - 0
src/main/java/com/jkcredit/traffic/record/util/FileUtil.java

@@ -19,6 +19,7 @@ public enum FileUtil {
 
     public void write(String path) {
         CsvUtil.writeCsv(copyOnWriteArrayList, path);
+        copyOnWriteArrayList = new CopyOnWriteArrayList<>();
     }
 
     public boolean deleteFile(String sPath) {