Selaa lähdekoodia

修改定时任务1小时执行一次,每次执行,判断文件是否被处理过

15810770710@163.com 3 vuotta sitten
vanhempi
commit
47b192e984

+ 47 - 14
src/main/java/com/jkcredit/traffic/record/service/TrafficRecordsAnalyseServiceImpl.java

@@ -27,7 +27,9 @@ import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.net.SocketTimeoutException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -53,27 +55,18 @@ public class TrafficRecordsAnalyseServiceImpl implements TrafficRecordsAnalyseSe
     private String errorRecordsPath;
     @Value("${tempFolder.path}")
     private String tempFolderPath;
+    @Value("${pushProcess.path}")
+    private String pushProcessPath;
+
     private static final String MEDIA_TYPE = "application/json;charset=UTF-8";
     private OkHttpClient okHttpClient = new OkHttpClient.Builder().build();
 
     @Override
     public CommonResponseObject pushTrafficRecords(String filePath) {
+
         log.info("pushTrafficRecordBegin");
         long startTime = System.currentTimeMillis();
-        List<File> files = new ArrayList<>();
-        File folder = new File(filePath);
-        File[] tempList = folder.listFiles();
-        if (tempList == null || tempList.length == 0) {
-            return new CommonResponseObject(CommonConstant.ERROR_CODE_FILE_NOT_EXIST,
-                    CommonConstant.ERROR_MESSAGE_FILE_NOT_EXIST);
-        }
-
-        // 只处理 .csv结尾文件
-        for (int i = 0; i < tempList.length; i++) {
-            if (tempList[i].isFile() && tempList[i].getName().split("\\.")[1].equals("csv")) {
-                files.add(tempList[i]);
-            }
-        }
+        List<File> files = getUnprocessedFiles(filePath);
 
         // 遍历每月的数据文件(5个)
         for (int i = 0; i < files.size(); i++) {
@@ -137,6 +130,46 @@ public class TrafficRecordsAnalyseServiceImpl implements TrafficRecordsAnalyseSe
         return new CommonResponseObject().success();
     }
 
+    private Map<String, String> isInProcess() {
+        // 文件名,时间
+        List<String> inProcessFile = CsvUtil.readCsv(pushProcessPath);
+        Map<String, String> inProcessFileMap = new HashMap<>();
+        for (String processFile : inProcessFile) {
+            String[] process = processFile.split(",");
+            inProcessFileMap.put(process[0], process[1]);
+        }
+        return inProcessFileMap;
+    }
+
+    private List<File> getUnprocessedFiles(String filePath) {
+        List<File> files = new ArrayList<>();
+        File folder = new File(filePath);
+        File[] tempList = folder.listFiles();
+        if (tempList == null || tempList.length == 0) {
+            return files;
+        }
+
+        // 判断文件夹下文件是否被处理过,如果处理过,不再反复处理
+        Map<String, String> inProcessFileMap = isInProcess();
+        ArrayList<String> processList = new ArrayList<>();
+        // 只处理 .csv结尾文件
+        for (int i = 0; i < tempList.length; i++) {
+            // 判断文件是否是以.csv结尾
+            if (tempList[i].isFile() && tempList[i].getName().split("\\.")[1].equals("csv")) {
+                // 判断文件是否被处理过
+                if (inProcessFileMap.get(tempList[i].getName()) == null) {
+                    files.add(tempList[i]);
+                    String processFile = tempList[i].getName() + "," + System.currentTimeMillis();
+                    processList.add(processFile);
+                } else {
+                    log.info("The file has been processed. Skip this file:{}", tempList[i].getName());
+                }
+            }
+        }
+        CsvUtil.writeCsv(processList, pushProcessPath);
+        return files;
+    }
+
     private List<MonthResult> getErrorMonthResult(String i, String filePath) {
         Dataset<MonthResult> monthResultDataset = sparkSession
                 .read()

+ 1 - 1
src/main/java/com/jkcredit/traffic/record/task/PushRecordsTask.java

@@ -19,7 +19,7 @@ public class PushRecordsTask {
     @Autowired
     TrafficRecordsAnalyseService trafficRecordsAnalyseService;
 
-    @Scheduled(cron = "0 0 18 * * ?")
+    @Scheduled(cron = "0 0 0/1 * * ?")
     public void push() {
         trafficRecordsAnalyseService.pushTrafficRecords(originalRecordsPath);
     }

+ 1 - 1
src/main/java/com/jkcredit/traffic/record/util/FileUtil.java

@@ -20,7 +20,7 @@ public enum FileUtil {
 
     public void write(String path) {
         CsvUtil.writeCsv(copyOnWriteArrayList, path);
-        copyOnWriteArrayList = Collections.emptyList();
+        copyOnWriteArrayList.clear();
     }
 
     public boolean deleteFile(String sPath) {

+ 3 - 1
src/main/resources/application-dev.yml

@@ -11,8 +11,10 @@ recordsEncrypt:
   ivStr: dbdca8e8316fdee2
   algorithm: SM4_CBC
 errorRecords:
-  path: /Users/jkxy/Desktop/test/output/
+  path: /Users/jkxy/Desktop/test/pushErrorRecords/
 tempFolder:
   path: /Users/jkxy/Desktop/test/temp
 originalRecords:
   path: /home/jkxy-01/data/naruto
+pushProcess:
+  path: /Users/jkxy/Desktop/test/process.csv

+ 2 - 0
src/main/resources/application-prod.yml

@@ -16,3 +16,5 @@ tempFolder:
   path: /data/tmp
 originalRecords:
   path: /home/jkxy-01/data/naruto
+pushProcess:
+  path: /home/jkxy-01/jar/process.csv

+ 1 - 1
src/main/resources/logback-spring.xml

@@ -1,7 +1,7 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <configuration debug="false" scan="false">
     <springProperty scop="context" name="spring.application.name" source="spring.application.name" defaultValue=""/>
-    <property name="log.path" value="logs"/>
+    <property name="log.path" value="/home/jkxy-01/jar/logs"/>
     <!-- 彩色日志格式 -->
     <property name="CONSOLE_LOG_PATTERN"
               value="${CONSOLE_LOG_PATTERN:-%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}"/>