فهرست منبع

提交保存一版

15810770710@163.com 3 سال پیش
والد
کامیت
0acc7d9b9d

+ 2 - 3
cg-casb.properties

@@ -9,10 +9,10 @@ VERSION=V1.1.14
 PROTOCOL=https
 
 # 服务端IP
-HOST=10.254.8.16
+HOST=121.37.153.131
 
 # 服务端端口
-PORT=443
+PORT=446
 
 # TOKEN获取地址
 UIM_TOKEN_PATH=/uim/v1/token
@@ -95,4 +95,3 @@ SDK_BASE_PATH=/home/jkxy-01/services/casb/CipherSuiteSdk_linux
 
 # 插件根目录
 AOE_BASE_PATH=
-

+ 0 - 8
pom.xml

@@ -117,18 +117,10 @@
                 </exclusion>
             </exclusions>
         </dependency>
-<!--        <dependency>-->
-<!--            <groupId>org.springframework.boot</groupId>-->
-<!--            <artifactId>spring-boot-starter-data-redis</artifactId>-->
-<!--            <version>2.4.4</version>-->
-<!--        </dependency>-->
         <dependency>
             <groupId>com.ciphergateway</groupId>
             <artifactId>libCSCipher</artifactId>
             <version>2.2.18</version>
-            <!--            <scope>system</scope>-->
-            <!--            &lt;!&ndash; systemPath配置为cipher-suite-jni-release-2.2.3.jar文件的绝对路径 &ndash;&gt;-->
-            <!--            <systemPath>${project.basedir}/src/main/resources/lib/cipher-suite-jni-release-2.2.18.jar</systemPath>-->
         </dependency>
         <dependency>
             <groupId>io.netty</groupId>

+ 39 - 0
src/main/java/com/jkcredit/traffic/record/action/TrafficRecordsAction.java

@@ -1,5 +1,8 @@
 package com.jkcredit.traffic.record.action;
 
+import com.ciphergateway.ciphersuite.CipherSuiteException;
+import com.ciphergateway.ciphersuite.CipherSuiteMacException;
+import com.ciphergateway.ciphersuite.CipherSuiteUtils;
 import com.jkcredit.traffic.record.model.CommonResponseObject;
 import com.jkcredit.traffic.record.service.TrafficRecordsAnalyseService;
 import lombok.extern.slf4j.Slf4j;
@@ -9,6 +12,9 @@ import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.RestController;
 
+import javax.xml.bind.DatatypeConverter;
+import java.io.UnsupportedEncodingException;
+
 /**
  * @author xusonglin
  * @version V1.0
@@ -29,4 +35,37 @@ public class TrafficRecordsAction {
     public CommonResponseObject pushErrorRecords(@RequestParam String path) {
         return service.pushErrorRecords(path);
     }
+
+    @PostMapping("/encrypt") // POST mapping "/encrypt": encrypts one vehicle id with caller-supplied cipher parameters
+    public String vehicleIdEncrypt(@RequestParam String vehicleId, @RequestParam String algorithm,
+                                    @RequestParam String keyId, @RequestParam String metadata, @RequestParam String ivStr) {
+        try {
+            byte[] plainData = vehicleId.getBytes("UTF-8");
+            byte[] iv = ivStr.getBytes("UTF-8");
+            Long startTime = System.currentTimeMillis(); // NOTE(review): never read in this method
+            // Encrypt the plain bytes with the requested algorithm, key id, metadata and IV
+            byte[] cipherData = CipherSuiteUtils.encrypt(plainData, algorithm, keyId, metadata, iv);
+            return DatatypeConverter.printHexBinary(cipherData); // cipher bytes are returned hex-encoded
+        } catch (UnsupportedEncodingException ue) { // every failure path below logs and returns an empty string
+            ue.printStackTrace();
+            log.error("UnsupportedEncodingException:", ue);
+            return "";
+        } catch (CipherSuiteException cse) {
+            cse.printStackTrace();
+            log.error("CipherSuiteException:", cse);
+            return "";
+        } catch (CipherSuiteMacException csme) {
+            csme.printStackTrace();
+            log.error("CipherSuiteMacException:", csme);
+            return "";
+        } catch (Exception e) {
+            e.printStackTrace();
+            log.error("Exception:", e);
+            return "";
+        } catch (Error error) { // NOTE(review): this also swallows JVM Errors and answers with ""
+            error.printStackTrace();
+            log.error("Error:", error);
+            return "";
+        }
+    }
 }

+ 4 - 0
src/main/java/com/jkcredit/traffic/record/config/SparkSessionConfig.java

@@ -20,14 +20,18 @@ public class SparkSessionConfig {
         sparkConf.set("spark.driver.allowMultipleContexts", "true");
         sparkConf.set("spark.eventLog.enabled", "true");
         sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+        sparkConf.set("spark.kryoserializer.buffer.max", "1024mb");
+        sparkConf.set("spark.kryoserializer.buffer.max.mb", "1024");
         sparkConf.set("spark.hadoop.validateOutputSpecs", "false");
         sparkConf.set("hive.mapred.supports.subdirectories", "true");
         sparkConf.set("mapreduce.input.fileinputformat.input.dir.recursive", "true");
 
+
         return SparkSession.builder()
                 .appName(APP_NAME)
                 .master("local")
                 .config(sparkConf)
                 .getOrCreate();
     }
+
 }

+ 19 - 17
src/main/java/com/jkcredit/traffic/record/config/ThreadPoolConfig.java

@@ -1,5 +1,6 @@
 package com.jkcredit.traffic.record.config;
 
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -14,23 +15,24 @@ import java.util.concurrent.*;
 @Configuration
 @EnableAsync
 public class ThreadPoolConfig {
-//    @Value("${thread.pool.coreSize}")
-//    private Integer coreSize;
-//
-//    @Value("${thread.pool.maximumPoolSize}")
-//    private Integer maximumPoolSize;
-//
-//    @Value("${thread.pool.queueSize}")
-//    private Integer queueSize;
-
-//    @Bean("mainThreadPoolExecutor")
-//    public ThreadPoolExecutor mainThreadPoolExecutor() {
-//        LinkedBlockingDeque<Runnable> deque = new LinkedBlockingDeque<>(queueSize);
-//
-//        return new ThreadPoolExecutor(coreSize,
-//                maximumPoolSize, 10,
-//                TimeUnit.MINUTES, deque);
-//    }
+    @Value("${thread.pool.coreSize}")
+    private Integer coreSize;
+
+    @Value("${thread.pool.maximumPoolSize}")
+    private Integer maximumPoolSize;
+
+    @Value("${thread.pool.queueSize}")
+    private Integer queueSize;
+
+    @Bean("mainThreadPoolExecutor") // exposes the pool as a bean named "mainThreadPoolExecutor"
+    @Qualifier("executor") // AsyncPushTask.pushRecords in this commit uses @Async("executor") — presumably resolved via this qualifier; confirm
+    public ThreadPoolExecutor mainThreadPoolExecutor() {
+        LinkedBlockingDeque<Runnable> deque = new LinkedBlockingDeque<>(queueSize); // bounded queue, capacity from thread.pool.queueSize
+        // Pool sizes come from thread.pool.coreSize / thread.pool.maximumPoolSize; keep-alive is 10 minutes.
+        return new ThreadPoolExecutor(coreSize,
+                maximumPoolSize, 10,
+                TimeUnit.MINUTES, deque);
+    }
 
     @Bean
     public ExecutorService executorService() {

+ 224 - 12
src/main/java/com/jkcredit/traffic/record/service/TrafficRecordsAnalyseServiceImpl.java

@@ -1,20 +1,36 @@
 package com.jkcredit.traffic.record.service;
 
 import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONArray;
+import com.ciphergateway.ciphersuite.CipherSuiteException;
+import com.ciphergateway.ciphersuite.CipherSuiteMacException;
+import com.ciphergateway.ciphersuite.CipherSuiteUtils;
 import com.jkcredit.traffic.record.constant.CommonConstant;
 import com.jkcredit.traffic.record.model.CommonResponseObject;
 import com.jkcredit.traffic.record.model.MonthResult;
 import com.jkcredit.traffic.record.test.AsyncPushTask;
 import com.jkcredit.traffic.record.util.CsvUtil;
+import com.jkcredit.traffic.record.util.DivideDataUtil;
 import com.jkcredit.traffic.record.util.FileUtil;
 import lombok.extern.slf4j.Slf4j;
+import okhttp3.*;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SparkSession;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.scheduling.annotation.AsyncResult;
 import org.springframework.stereotype.Service;
 
+import javax.xml.bind.DatatypeConverter;
 import java.io.File;
+import java.io.UnsupportedEncodingException;
+import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 /**
  * @author xusonglin
@@ -27,9 +43,26 @@ public class TrafficRecordsAnalyseServiceImpl implements TrafficRecordsAnalyseSe
     SparkSession sparkSession;
     @Autowired
     AsyncPushTask asyncPushTask;
+    @Value("${recordsPush.url}")
+    private String recordsPushUrl;
+    @Value("${recordsEncrypt.keyId}")
+    private String keyId;
+    @Value("${recordsEncrypt.metadata}")
+    private String metadata;
+    @Value("${recordsEncrypt.ivStr}")
+    private String ivStr;
+    @Value("${recordsEncrypt.algorithm}")
+    private String algorithm;
+    @Value("${errorRecords.path}")
+    private String errorRecordsPath;
+    @Value("${tempFolder.path}")
+    private String tempFolderPath;
+    private static final String MEDIA_TYPE = "application/json;charset=UTF-8";
+    private OkHttpClient okHttpClient = new OkHttpClient.Builder().build();
 
     @Override
     public CommonResponseObject pushTrafficRecords(String filePath) {
+        log.info("开始处理");
         List<File> files = new ArrayList<>();
         File folder = new File(filePath);
         File[] tempList = folder.listFiles();
@@ -38,17 +71,41 @@ public class TrafficRecordsAnalyseServiceImpl implements TrafficRecordsAnalyseSe
                     CommonConstant.ERROR_MESSAGE_FILE_NOT_EXIST);
         }
 
+        // 只处理 .csv结尾文件
         for (int i = 0; i < tempList.length; i++) {
             if (tempList[i].isFile() && tempList[i].getName().split("\\.")[1].equals("csv")) {
                 files.add(tempList[i]);
             }
         }
 
+        // 遍历每月的数据文件(5个)
         for (int i = 0; i < files.size(); i++) {
-            String fileName = folder.getName() + "_" + i;
-            asyncPushTask.push(fileName, files.get(i).getAbsolutePath());
-        }
+            File file = new File(tempFolderPath);
+            if (!file.exists()) {
+                file.mkdir();
+            }
+            // 临时文件夹,每份文件一个文件夹
+            String tempFolder = tempFolderPath + File.separator + files.get(i).getName().split("\\.")[0];
+            try {
+                // 大文件拆分,放入临时文件夹
+                CsvUtil.splitFile(files.get(i).getAbsolutePath(), 30, tempFolder);
+                for (int j = 1; j <= 30; j++) {
+                    // 数据脱敏
+                    List<MonthResult> maskResults = desensitize(j + "", tempFolder + "/temp_" + j + ".csv");
 
+                    // 数据分割,异步推送
+                    push(maskResults);
+                    boolean deleteResult = FileUtil.INSTANCE.deleteFile(tempFolder + "/temp_" + j + ".csv");
+                    log.info("删除临时文件:{}, 删除结果:{}", tempFolder + "/temp_" + j + ".csv", deleteResult);
+                    maskResults = null;
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+            boolean deleteResult = FileUtil.INSTANCE.deleteDirectory(tempFolder);
+            log.info("删除临时目录:{}, 删除结果:{}", tempFolder, deleteResult);
+        }
+        log.info("结束处理");
         return new CommonResponseObject().success();
     }
 
@@ -69,19 +126,174 @@ public class TrafficRecordsAnalyseServiceImpl implements TrafficRecordsAnalyseSe
         }
 
         for (int i = 0; i < files.size(); i++) {
-            List<String> errorRecords = CsvUtil.readCsv(files.get(i).getAbsolutePath());
-            List<MonthResult> monthResults = new ArrayList<>();
-            for (String record : errorRecords) {
-                MonthResult monthResult = JSON.toJavaObject(JSON.parseObject(record), MonthResult.class);
-                monthResults.add(monthResult);
+            List<MonthResult> errorRecords = getMonthResult("ErrorRecord" +i, files.get(i).getAbsolutePath());
+            push(errorRecords);
+        }
+        return new CommonResponseObject().success();
+    }
+
+    private List<MonthResult> getMonthResult(String i, String filePath) { // i: suffix of the temp view name; filePath: CSV file to load
+        Dataset<MonthResult> monthResultDataset = sparkSession
+                .read()
+                .option("header", false) // the first CSV line is treated as data, not as a header
+                .csv(filePath)
+                .toDF("month", "vehicleid", "max_exTime", "sum_travel_time", "max_travel_time", "sum_feemileage",
+                        "sum_fee", "sum_weight_mileage","exTimes_count", "axlecount", "vehicletype",
+                        "travel_provinces_count", "transtime_count")
+                .as(Encoders.bean(MonthResult.class))
+                .coalesce(36);
+        String tempViewName = "monthDataset" + i;
+        monthResultDataset.createOrReplaceTempView(tempViewName); // registered so the rows can be filtered with SQL below
+        Dataset<MonthResult> rows = sparkSession.sql("select * from " + tempViewName + " where sum_travel_time >= 0 "+
+                "and max_travel_time >=0" )
+                .as(Encoders.bean(MonthResult.class));
+//        rows.write().csv("/Users/jkxy/Desktop/outPut/test" + System.currentTimeMillis());
+        // collect() returns its own list, so no pre-sized ArrayList is allocated first; all filtered rows are held in memory.
+        List<MonthResult> results = rows.javaRDD().collect();
+        sparkSession.sqlContext().dropTempTable(tempViewName); // the temp view is only needed for the query above
+        return results;
+    }
+
+    private List<MonthResult> desensitize(String value, String filePath) { // value and filePath are passed straight to getMonthResult
+        List<MonthResult> results = getMonthResult(value, filePath);
+        log.info("待推送数据数量::{}", results.size());
+        // Encrypt the plate number and bucket the numeric fields of every record
+        List<MonthResult> maskResults = new ArrayList<>();
+        for (int i = 0; i < results.size(); i++) {
+            // Encrypt the plate number; a blank result means encryption failed
+            String encryptPlateNumber = vehicleIdEncrypt(results.get(i).getVehicleid());
+            if (StringUtils.isBlank(encryptPlateNumber)) {
+                log.error("加密失败:{}", results.get(i).toString());
+                continue; // the record is dropped instead of being pushed with a plain-text plate number
+            }
+            results.get(i).setVehicleid(encryptPlateNumber);
+            maskResults.add(divideData(results.get(i)));
+        }
+        log.info("数据脱敏完成数量:{}", maskResults.size()); // report the records actually desensitized, not the input count
+        return maskResults;
+    }
+
+    public void push(List<MonthResult> maskResults) { // splits maskResults into chunks of at most 10000 records and pushes them in order
+        int times;
+        if (maskResults.size() % 10000 != 0 ) { // number of chunks = size / 10000, rounded up
+            times = (maskResults.size() / 10000) + 1;
+        } else {
+            times = maskResults.size() / 10000;
+        }
+        List<List<MonthResult>> partResultsList = new ArrayList<>();
+        for (int i = 0; i < times; i++) {
+            List<MonthResult> partResults;
+            if (i == times-1) {
+                partResults = maskResults.subList(i*10000, maskResults.size()); // last chunk: whatever remains
+            } else {
+                partResults = maskResults.subList(i*10000, (i+1)*10000);
             }
+            partResultsList.add(partResults);
+        }
+        for (List<MonthResult> partResults : partResultsList) {
             try {
-                asyncPushTask.pushRecords(monthResults);
+                pushRecords(partResults); // called directly on this service; the returned Future is ignored
             } catch (Exception e) {
-                log.error("推送异常数据失败:", e);
-                return new CommonResponseObject().failed();
+                e.printStackTrace(); // NOTE(review): a failed chunk is only printed; the remaining chunks are still pushed
             }
         }
-        return new CommonResponseObject().success();
+    }
+
+    /**
+     * Encrypts the vehicle_id field using the configured recordsEncrypt.* settings.
+     * @param vehicleId the vehicleId to encrypt
+     * @return the encrypted vehicleId, hex-encoded, or an empty string when encryption fails
+     */
+    private String vehicleIdEncrypt(String vehicleId) {
+        try {
+            byte[] plainData = vehicleId.getBytes("UTF-8");
+            byte[] iv = ivStr.getBytes("UTF-8");
+            Long startTime = System.currentTimeMillis(); // NOTE(review): never read in this method
+            // Encrypt with the algorithm, key id and metadata injected into this service
+            byte[] cipherData = CipherSuiteUtils.encrypt(plainData, algorithm, keyId, metadata, iv);
+            return DatatypeConverter.printHexBinary(cipherData);
+        } catch (UnsupportedEncodingException ue) { // every failure path below logs and returns an empty string
+            ue.printStackTrace();
+            log.error("UnsupportedEncodingException:", ue);
+            return "";
+        } catch (CipherSuiteException cse) {
+            cse.printStackTrace();
+            log.error("CipherSuiteException:", cse);
+            return "";
+        } catch (CipherSuiteMacException csme) {
+            csme.printStackTrace();
+            log.error("CipherSuiteMacException:", csme);
+            return "";
+        } catch (Exception e) {
+            e.printStackTrace();
+            log.error("Exception:", e);
+            return "";
+        } catch (Error error) { // NOTE(review): this also swallows JVM Errors and returns ""
+            error.printStackTrace();
+            log.error("Error:", error);
+            return "";
+        }
+    }
+
+    /**
+     * Desensitizes a record by passing six of its numeric fields through the matching DivideDataUtil.divide* helpers.
+     * @param monthResult the monthly result record to desensitize
+     * @return the same instance, with the six fields overwritten in place
+     */
+    private MonthResult divideData(MonthResult monthResult) {
+        monthResult.setExTimes_count(DivideDataUtil.divideExTimesCount(monthResult.getExTimes_count()));
+        monthResult.setSum_fee(DivideDataUtil.divideSumFee(monthResult.getSum_fee()));
+        monthResult.setSum_travel_time(DivideDataUtil.divideSumTravelTime(monthResult.getSum_travel_time()));
+        monthResult.setMax_travel_time(DivideDataUtil.divideMaxTravelTime(monthResult.getMax_travel_time()));
+        monthResult.setSum_feemileage(DivideDataUtil.divideSumFeeMileage(monthResult.getSum_feemileage()));
+        monthResult.setSum_weight_mileage(DivideDataUtil.divideSumWeightMileage(monthResult.getSum_weight_mileage()));
+        return monthResult;
+    }
+
+    public Future<String> pushRecords(List<MonthResult> resultList) throws InterruptedException {
+        MediaType mediaType = MediaType.parse(MEDIA_TYPE);
+        RequestBody requestBody = RequestBody.create(mediaType, JSON.toJSONString(resultList));
+
+        Request okRequest = new Request.Builder()
+                .post(requestBody)
+                .url(recordsPushUrl)
+                .build();
+        OkHttpClient client = okHttpClient.newBuilder()
+                .connectTimeout(5000, TimeUnit.MILLISECONDS)
+                .readTimeout(30000, TimeUnit.MILLISECONDS)
+                .writeTimeout(30000, TimeUnit.MILLISECONDS)
+                .build();
+        String responseContext = "";
+        List<MonthResult> errorList = new ArrayList<>();
+        try {
+            Long startTime = System.currentTimeMillis();
+            Response response = client.newCall(okRequest).execute();
+            log.info("pushRecords -- costTime:{}", System.currentTimeMillis()-startTime);
+            if (response.body() != null) {
+                responseContext = response.body().string();
+            }
+            log.info("pushRecords -- responseContext:{}", responseContext);
+            response.close();
+        } catch (SocketTimeoutException ste) {
+            ste.printStackTrace();
+            errorList = resultList;
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        CommonResponseObject responseObject = JSON.toJavaObject(JSON.parseObject(responseContext), CommonResponseObject.class);
+        if (!responseObject.getCode().equals(CommonConstant.SUCCESS_CODE)) {
+            JSONArray responseErrorArray = JSON.parseArray(responseObject.getMessage());
+            errorList = responseErrorArray.toJavaList(MonthResult.class);
+        }
+        List<String> errorData = new ArrayList<>();
+        for (MonthResult monthResult : errorList) {
+            errorData.add(monthResult.toString());
+        }
+        if (errorData.size() != 0) {
+            // 异常数据写入本地文件
+            FileUtil.INSTANCE.addAll(errorData);
+            FileUtil.INSTANCE.write(errorRecordsPath+System.currentTimeMillis()+".csv");
+        }
+        return new AsyncResult<>("推送完成");
     }
 }

+ 135 - 128
src/main/java/com/jkcredit/traffic/record/test/AsyncPushTask.java

@@ -62,8 +62,9 @@ public class AsyncPushTask {
     private static final String MEDIA_TYPE = "application/json;charset=UTF-8";
     private OkHttpClient okHttpClient = new OkHttpClient.Builder().build();
 
-    @Async
+    @Async("executor")
     public Future<String> pushRecords(List<MonthResult> resultList) throws InterruptedException {
+        log.info("&&&&&&&&&&&&&&&&&&&&&&&&&&开启线程&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&");
         MediaType mediaType = MediaType.parse(MEDIA_TYPE);
         RequestBody requestBody = RequestBody.create(mediaType, JSON.toJSONString(resultList));
 
@@ -72,9 +73,9 @@ public class AsyncPushTask {
                 .url(recordsPushUrl)
                 .build();
         OkHttpClient client = okHttpClient.newBuilder()
-                .connectTimeout(10000, TimeUnit.MILLISECONDS)
-                .readTimeout(10000, TimeUnit.MILLISECONDS)
-                .writeTimeout(10000, TimeUnit.MILLISECONDS)
+                .connectTimeout(5000, TimeUnit.MILLISECONDS)
+                .readTimeout(30000, TimeUnit.MILLISECONDS)
+                .writeTimeout(30000, TimeUnit.MILLISECONDS)
                 .build();
         String responseContext = "";
         List<MonthResult> errorList = new ArrayList<>();
@@ -93,136 +94,142 @@ public class AsyncPushTask {
         } catch (Exception e) {
             e.printStackTrace();
         }
-        CommonResponseObject responseObject = JSON.toJavaObject(JSON.parseObject(responseContext), CommonResponseObject.class);
-        if (!responseObject.getCode().equals(CommonConstant.SUCCESS_CODE)) {
-            JSONArray responseErrorArray = JSON.parseArray(responseObject.getMessage());
-            errorList = responseErrorArray.toJavaList(MonthResult.class);
+        if (StringUtils.isNotBlank(responseContext)) {
+            CommonResponseObject responseObject = JSON.toJavaObject(JSON.parseObject(responseContext), CommonResponseObject.class);
+            if (!responseObject.getCode().equals(CommonConstant.SUCCESS_CODE)) {
+                JSONArray responseErrorArray = JSON.parseArray(responseObject.getMessage());
+                errorList = responseErrorArray.toJavaList(MonthResult.class);
+            }
         }
+
         List<String> errorData = new ArrayList<>();
         for (MonthResult monthResult : errorList) {
             errorData.add(monthResult.toString());
         }
-        // 异常数据写入本地文件
-        FileUtil.INSTANCE.addAll(errorData);
-        FileUtil.INSTANCE.write(errorRecordsPath+System.currentTimeMillis()+".csv");
-        return new AsyncResult<>("推送完成");
-    }
-
-    @Async
-    public void push(String value, String filePath) {
-        List<MonthResult> results = getMonthResult(value, filePath);
-        // 数据加密分区间
-        List<MonthResult> maskResults = new ArrayList<>();
-        for (MonthResult monthResult : results) {
-            // 车牌号加密;
-            String encryptPlateNumber = vehicleIdEncrypt(monthResult.getVehicleid());
-            if (StringUtils.isBlank(encryptPlateNumber)) {
-                // todo
-                log.info(JSON.toJSONString(monthResult));
-                continue;
-            }
-            maskResults.add(divideData(monthResult));
-        }
-        results = null;
-
-        int times;
-        if (maskResults.size() % 10000 != 0 ) {
-            times = (maskResults.size() / 10000) + 1;
-        } else {
-            times = maskResults.size() / 10000;
-        }
-        List<List<MonthResult>> partResultsList = new ArrayList<>();
-        for (int i = 0; i < times; i++) {
-            List<MonthResult> partResults;
-            if (i == times-1) {
-                partResults = maskResults.subList(i*10000, maskResults.size());
-            } else {
-                partResults = maskResults.subList(i*10000, (i+1)*10000);
-            }
-            partResultsList.add(partResults);
-        }
-        // todo 此处需要考虑异步
-        for (List<MonthResult> partResults : partResultsList) {
-            try {
-                pushRecords(partResults);
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        }
-    }
-
-    private List<MonthResult> getMonthResult(String i, String filePath) {
-        Dataset<MonthResult> monthResultDataset = sparkSession
-                .read()
-                .option("header", true)
-                .csv(filePath)
-                .toDF("month", "vehicleid", "max_exTime", "sum_travel_time", "max_travel_time", "sum_feemileage",
-                        "sum_fee", "sum_weight_mileage","exTimes_count", "axlecount", "vehicletype",
-                        "travel_provinces_count", "transtime_count")
-                .as(Encoders.bean(MonthResult.class))
-                .coalesce(36);
-        String tempViewName = "monthDataset" + i;
-        monthResultDataset.createOrReplaceTempView(tempViewName);
-
-        Dataset<MonthResult> rows = sparkSession.sql("select * from " + tempViewName + " where sum_travel_time < 2678400 "+
-                "and max_travel_time < 604800 and sum_feemileage < 89280000 and sum_weight_mileage < 89280000 and exTimes_count < 1200" )
-                .as(Encoders.bean(MonthResult.class));
-//        rows.write().csv("/Users/jkxy/Desktop/outPut/test" + System.currentTimeMillis());
-        List<MonthResult> results = rows.javaRDD().collect();
-        sparkSession.sqlContext().dropTempTable(tempViewName);
-        return results;
-    }
-
-    /**
-     * 对vehicle_id字段进行加密
-     * @param vehicleId 待加密vehicleId
-     * @return 加密后vehicleId
-     */
-    private String vehicleIdEncrypt(String vehicleId) {
-        try {
-            byte[] plainData = vehicleId.getBytes("UTF-8");
-            byte[] iv = ivStr.getBytes("UTF-8");
-            Long startTime = System.currentTimeMillis();
-            // 加密
-            byte[] cipherData = CipherSuiteUtils.encrypt(plainData, algorithm, keyId, metadata, iv);
-            log.info("encrypt result: " + DatatypeConverter.printHexBinary(cipherData));
-            log.info("encrypt success. time:" + (System.currentTimeMillis() - startTime) + "ms.");
-            return DatatypeConverter.printHexBinary(cipherData);
-        } catch (UnsupportedEncodingException ue) {
-            ue.printStackTrace();
-            log.error("UnsupportedEncodingException:", ue);
-            return "";
-        } catch (CipherSuiteException cse) {
-            cse.printStackTrace();
-            log.error("CipherSuiteException:", cse);
-            return "";
-        } catch (CipherSuiteMacException csme) {
-            csme.printStackTrace();
-            log.error("CipherSuiteMacException:", csme);
-            return "";
-        } catch (Exception e) {
-            e.printStackTrace();
-            log.error("Exception:", e);
-            return "";
-        } catch (Error error) {
-            error.printStackTrace();
-            log.error("Error:", error);
-            return "";
+        if (errorData.size() != 0) {
+            // 异常数据写入本地文件
+            FileUtil.INSTANCE.addAll(errorData);
+            FileUtil.INSTANCE.write(errorRecordsPath+System.currentTimeMillis()+".csv");
         }
+        return new AsyncResult<>("推送完成");
     }
 
-    /**
-     * 对数据脱敏,分区间
-     * @param monthResult 待脱敏月结果数据
-     * @return 脱敏后月结果数据
-     */
-    private MonthResult divideData(MonthResult monthResult) {
-        monthResult.setExTimes_count(DivideDataUtil.divideExTimesCount(monthResult.getExTimes_count()));
-        monthResult.setSum_fee(DivideDataUtil.divideSumFee(monthResult.getSum_fee()));
-        monthResult.setSum_travel_time(DivideDataUtil.divideSumTravelTime(monthResult.getSum_travel_time()));
-        monthResult.setMax_travel_time(DivideDataUtil.divideMaxTravelTime(monthResult.getMax_travel_time()));
-        monthResult.setSum_feemileage(DivideDataUtil.divideSumFeeMileage(monthResult.getSum_feemileage()));
-        monthResult.setSum_weight_mileage(DivideDataUtil.divideSumWeightMileage(monthResult.getSum_weight_mileage()));
-        return monthResult;
-    }
+//    @Async
+//    public void push(String value, String filePath) {
+//        List<MonthResult> results = getMonthResult(value, filePath);
+//        log.info("************************{}##########################", results.size());
+//        // 数据加密分区间
+//        List<MonthResult> maskResults = new ArrayList<>();
+//        for (MonthResult monthResult : results) {
+//            // 车牌号加密;
+//            String encryptPlateNumber = vehicleIdEncrypt(monthResult.getVehicleid());
+//            if (StringUtils.isBlank(encryptPlateNumber)) {
+//                // todo
+//                log.info(JSON.toJSONString(monthResult));
+//                continue;
+//            }
+//            maskResults.add(divideData(monthResult));
+//        }
+//        results = null;
+//
+//        int times;
+//        if (maskResults.size() % 10000 != 0 ) {
+//            times = (maskResults.size() / 10000) + 1;
+//        } else {
+//            times = maskResults.size() / 10000;
+//        }
+//        List<List<MonthResult>> partResultsList = new ArrayList<>();
+//        for (int i = 0; i < times; i++) {
+//            List<MonthResult> partResults;
+//            if (i == times-1) {
+//                partResults = maskResults.subList(i*10000, maskResults.size());
+//            } else {
+//                partResults = maskResults.subList(i*10000, (i+1)*10000);
+//            }
+//            partResultsList.add(partResults);
+//        }
+//        // todo 此处需要考虑异步
+//        for (List<MonthResult> partResults : partResultsList) {
+//            try {
+//                pushRecords(partResults);
+//            } catch (Exception e) {
+//                e.printStackTrace();
+//            }
+//        }
+//    }
+//
+//    private List<MonthResult> getMonthResult(String i, String filePath) {
+//        Dataset<MonthResult> monthResultDataset = sparkSession
+//                .read()
+//                .option("header", true)
+//                .csv(filePath)
+//                .toDF("month", "vehicleid", "max_exTime", "sum_travel_time", "max_travel_time", "sum_feemileage",
+//                        "sum_fee", "sum_weight_mileage","exTimes_count", "axlecount", "vehicletype",
+//                        "travel_provinces_count", "transtime_count")
+//                .as(Encoders.bean(MonthResult.class))
+//                .coalesce(36);
+//        String tempViewName = "monthDataset" + i;
+//        monthResultDataset.createOrReplaceTempView(tempViewName);
+//
+//        Dataset<MonthResult> rows = sparkSession.sql("select * from " + tempViewName + " where sum_travel_time < 2678400 "+
+//                "and max_travel_time < 604800 and sum_feemileage < 89280000 and sum_weight_mileage < 89280000 and exTimes_count < 1200" )
+//                .as(Encoders.bean(MonthResult.class));
+////        rows.write().csv("/Users/jkxy/Desktop/outPut/test" + System.currentTimeMillis());
+//        List<MonthResult> results = rows.javaRDD().collect();
+//        sparkSession.sqlContext().dropTempTable(tempViewName);
+//        return results;
+//    }
+//
+//    /**
+//     * 对vehicle_id字段进行加密
+//     * @param vehicleId 待加密vehicleId
+//     * @return 加密后vehicleId
+//     */
+//    private String vehicleIdEncrypt(String vehicleId) {
+//        try {
+//            byte[] plainData = vehicleId.getBytes("UTF-8");
+//            byte[] iv = ivStr.getBytes("UTF-8");
+//            Long startTime = System.currentTimeMillis();
+//            // 加密
+//            byte[] cipherData = CipherSuiteUtils.encrypt(plainData, algorithm, keyId, metadata, iv);
+//            log.info("encrypt result: " + DatatypeConverter.printHexBinary(cipherData));
+//            log.info("encrypt success. time:" + (System.currentTimeMillis() - startTime) + "ms.");
+//            return DatatypeConverter.printHexBinary(cipherData);
+//        } catch (UnsupportedEncodingException ue) {
+//            ue.printStackTrace();
+//            log.error("UnsupportedEncodingException:", ue);
+//            return "";
+//        } catch (CipherSuiteException cse) {
+//            cse.printStackTrace();
+//            log.error("CipherSuiteException:", cse);
+//            return "";
+//        } catch (CipherSuiteMacException csme) {
+//            csme.printStackTrace();
+//            log.error("CipherSuiteMacException:", csme);
+//            return "";
+//        } catch (Exception e) {
+//            e.printStackTrace();
+//            log.error("Exception:", e);
+//            return "";
+//        } catch (Error error) {
+//            error.printStackTrace();
+//            log.error("Error:", error);
+//            return "";
+//        }
+//    }
+//
+//    /**
+//     * 对数据脱敏,分区间
+//     * @param monthResult 待脱敏月结果数据
+//     * @return 脱敏后月结果数据
+//     */
+//    private MonthResult divideData(MonthResult monthResult) {
+//        monthResult.setExTimes_count(DivideDataUtil.divideExTimesCount(monthResult.getExTimes_count()));
+//        monthResult.setSum_fee(DivideDataUtil.divideSumFee(monthResult.getSum_fee()));
+//        monthResult.setSum_travel_time(DivideDataUtil.divideSumTravelTime(monthResult.getSum_travel_time()));
+//        monthResult.setMax_travel_time(DivideDataUtil.divideMaxTravelTime(monthResult.getMax_travel_time()));
+//        monthResult.setSum_feemileage(DivideDataUtil.divideSumFeeMileage(monthResult.getSum_feemileage()));
+//        monthResult.setSum_weight_mileage(DivideDataUtil.divideSumWeightMileage(monthResult.getSum_weight_mileage()));
+//        return monthResult;
+//    }
 }

+ 105 - 30
src/main/java/com/jkcredit/traffic/record/test/Test.java

@@ -1,42 +1,117 @@
 package com.jkcredit.traffic.record.test;
 
-import com.ciphergateway.ciphersuite.CipherSuiteException;
-import com.ciphergateway.ciphersuite.CipherSuiteMacException;
-import com.ciphergateway.ciphersuite.CipherSuiteUtils;
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.List;
 
-import javax.xml.bind.DatatypeConverter;
-import java.io.UnsupportedEncodingException;
-
-/**
- * @author xusonglin
- * @version V1.0
- **/
 public class Test {
+
+    public static void splitFile(String filePath, int fileCount, String tempFilePath) throws IOException {
+        FileInputStream fis = new FileInputStream(filePath);
+
+        FileChannel inputChannel = fis.getChannel();
+        final long fileSize = inputChannel.size();
+        long average = fileSize / fileCount;//平均值
+        long bufferSize = 2048; //缓存块大小,自行调整
+        ByteBuffer byteBuffer = ByteBuffer.allocate(Integer.parseInt(bufferSize + "")); // 申请一个缓存区
+        long startPosition = 0; //子文件开始位置
+        long endPosition = average < bufferSize ? 0 : average - bufferSize;//子文件结束位置
+        for (int i = 0; i < fileCount; i++) {
+            if (i + 1 != fileCount) {
+                int read = inputChannel.read(byteBuffer, endPosition);// 读取数据
+                readW:
+                while (read != -1) {
+                    byteBuffer.flip();//切换读模式
+                    byte[] array = byteBuffer.array();
+                    for (int j = 0; j < array.length; j++) {
+                        byte b = array[j];
+                        if (b == 10 || b == 13) { //判断\n\r
+                            endPosition += j;
+                            break readW;
+                        }
+                    }
+                    endPosition += bufferSize;
+                    byteBuffer.clear(); //重置缓存块指针
+                    read = inputChannel.read(byteBuffer, endPosition);
+                }
+            }else{
+                endPosition = fileSize; //最后一个文件直接指向文件末尾
+            }
+
+            File tempFile = new File(tempFilePath);
+            if (!tempFile.exists()) {
+                tempFile.mkdir();
+            }
+            FileOutputStream fos = new FileOutputStream(tempFilePath + "\\temp_" + (i + 1) + ".csv");
+            FileChannel outputChannel = fos.getChannel();
+            inputChannel.transferTo(startPosition, endPosition - startPosition, outputChannel);//通道传输文件数据
+            outputChannel.close();
+            fos.close();
+            startPosition = endPosition + 1;
+            endPosition += average;
+        }
+        inputChannel.close();
+        fis.close();
+    }
+
+    /**
+     * Writes 10,000,000 copies of one sample CSV row to a hard-coded file, producing a large
+     * input for local experiments.
+     *
+     * @throws IOException if the file cannot be created or written
+     */
+    public static void createBigFile() throws IOException {
+        File file = new File("C:\\Users\\lin\\Desktop\\test\\originalFile.csv");
+        String str = "202104,京A123C48_1,8790.0,87392.0,2638403.0,2638403.0,2638403.0,2638403.0,2638403.0,2638403.0";
+        // try-with-resources flushes and closes the writer even when a write fails midway.
+        try (BufferedWriter bufferedWriter = new BufferedWriter(new FileWriter(file))) {
+            for (int i = 0; i < 10000000; i++) {
+                bufferedWriter.write(str);
+                bufferedWriter.newLine();
+            }
+        }
+    }
+
+    public static void readSplitFiles(String filePath) throws IOException{
+        File baseFile = new File(filePath);
+        File[] files = baseFile.listFiles();
+        List<String> result = new ArrayList<>(1200000);
+
+
+        for (File file : files) {
+            FileInputStream fis = new FileInputStream(file);
+            FileChannel inputChannel = fis.getChannel();
+            long bufferSize = 2048; //缓存块大小,自行调整
+            ByteBuffer byteBuffer = ByteBuffer.allocate(Integer.parseInt(bufferSize + "")); // 申请一个缓存区
+            int read = inputChannel.read(byteBuffer);// 读取数据
+            readW:
+            while (read != -1) {
+                System.out.println(byteBuffer.toString());
+//                result.add(byteBuffer.toString());
+//            byteBuffer.flip();//切换读模式
+//            byte[] array = byteBuffer.array();
+//            for (int j = 0; j < array.length; j++) {
+//                byte b = array[j];
+//                if (b == 10 || b == 13) { //判断\n\r
+//                    break readW;
+//                }
+//            }
+//            byteBuffer.clear(); //重置缓存块指针
+//            read = inputChannel.read(byteBuffer);
+            }
+            System.out.println(result.size());
+        }
+
+    }
+
     public static void main(String[] args) {
-        String keyId = "1349736097415958529";
-        String algorithm = "SM3";
-        String plainStr = "Hello World";
-        int precision = 5;
-        int scale = 2;
-        String metadata = "ABCDBF";
-        String ivStr = "123456789abcdefg";
         try {
-            byte[] plainData = plainStr.getBytes("UTF-8");
-            byte[] iv = ivStr.getBytes("UTF-8");
-            byte[] digestData = CipherSuiteUtils.digest(plainData, algorithm, keyId, metadata);
-//            byte[] cipherData = CipherSuiteUtils.encrypt(plainData, algorithm, keyId, metadata, iv);
-//            System.out.println("encrypt result: " + DatatypeConverter.printHexBinary(cipherData));
-        } catch (UnsupportedEncodingException ue) {
-            ue.printStackTrace();
-        } catch (CipherSuiteException cse) {
-            cse.printStackTrace();
-        } catch (CipherSuiteMacException csme) {
-            csme.printStackTrace();
+            splitFile("/Users/jkxy/Desktop/test/test/part-00001.csv", 10, "/Users/jkxy/Desktop/test/test/temp");
         } catch (Exception e) {
             e.printStackTrace();
         }
 
-
-
+//        try {
+//            readSplitFiles("C:\\Users\\lin\\Desktop\\test\\temp");
+//        } catch (Exception e) {
+//            e.printStackTrace();
+//        }
     }
 }

+ 88 - 0
src/main/java/com/jkcredit/traffic/record/util/CsvUtil.java

@@ -3,6 +3,8 @@ package com.jkcredit.traffic.record.util;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.*;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -60,4 +62,90 @@ public class CsvUtil {
         }
         return allString;
     }
+
+    /**
+     * Streams the file at {@code filename} to standard output and then prints the elapsed time
+     * in milliseconds.
+     *
+     * <p>Collecting the content into the returned list is currently disabled, so the result is
+     * always empty. I/O failures are printed to standard error and otherwise ignored.
+     *
+     * @param filename path of the file to read
+     * @return an empty, mutable list
+     */
+    public static List<String> readFromFile(String filename) {
+        long startTime = System.currentTimeMillis();
+        // Nothing is ever added, so do not reserve room for 25 million entries up front.
+        List<String> lines = new ArrayList<>();
+        char[] buffer = new char[1024];
+        // Decode through a Reader: converting raw byte chunks one by one garbles any multi-byte
+        // character that straddles a chunk boundary. The platform default charset is kept.
+        try (Reader reader = new BufferedReader(new InputStreamReader(new FileInputStream(filename)))) {
+            int charsRead;
+            // read() returns -1 once the end of the file is reached
+            while ((charsRead = reader.read(buffer)) != -1) {
+                System.out.print(new String(buffer, 0, charsRead));
+            }
+            System.out.println("处理完成:耗时:" + (System.currentTimeMillis() - startTime));
+        } catch (IOException ex) {
+            // Covers a missing file as well as read and close failures.
+            ex.printStackTrace();
+        }
+        return lines;
+    }
+
+    public static void splitFile(String filePath, int fileCount, String tempFilePath) throws IOException {
+        FileInputStream fis = new FileInputStream(filePath);
+
+        FileChannel inputChannel = fis.getChannel();
+        final long fileSize = inputChannel.size();
+        long average = fileSize / fileCount;//平均值
+        long bufferSize = 2048; //缓存块大小,自行调整
+        ByteBuffer byteBuffer = ByteBuffer.allocate(Integer.parseInt(bufferSize + "")); // 申请一个缓存区
+        long startPosition = 0; //子文件开始位置
+        long endPosition = average < bufferSize ? 0 : average - bufferSize;//子文件结束位置
+        for (int i = 0; i < fileCount; i++) {
+            if (i + 1 != fileCount) {
+                int read = inputChannel.read(byteBuffer, endPosition);// 读取数据
+                readW:
+                while (read != -1) {
+                    byteBuffer.flip();//切换读模式
+                    byte[] array = byteBuffer.array();
+                    for (int j = 0; j < array.length; j++) {
+                        byte b = array[j];
+                        if (b == 10 || b == 13) { //判断\n\r
+                            endPosition += j;
+                            break readW;
+                        }
+                    }
+                    endPosition += bufferSize;
+                    byteBuffer.clear(); //重置缓存块指针
+                    read = inputChannel.read(byteBuffer, endPosition);
+                }
+            }else{
+                endPosition = fileSize; //最后一个文件直接指向文件末尾
+            }
+
+            File tempFile = new File(tempFilePath);
+            if (!tempFile.exists()) {
+                tempFile.mkdir();
+            }
+            FileOutputStream fos = new FileOutputStream(tempFilePath + "/temp_" + (i + 1) + ".csv");
+            FileChannel outputChannel = fos.getChannel();
+            inputChannel.transferTo(startPosition, endPosition - startPosition, outputChannel);//通道传输文件数据
+            outputChannel.close();
+            fos.close();
+            startPosition = endPosition + 1;
+            endPosition += average;
+        }
+        inputChannel.close();
+        fis.close();
+    }
 }

+ 49 - 6
src/main/java/com/jkcredit/traffic/record/util/FileUtil.java

@@ -1,10 +1,10 @@
 package com.jkcredit.traffic.record.util;
 
-
-
-
+import java.io.File;
+import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 /**
  * @author xusonglin
@@ -13,13 +13,56 @@ import java.util.List;
 public enum FileUtil {
     INSTANCE;
 
-    public static LinkedList<String> linkedList = new LinkedList<>();
+    public static List<String> copyOnWriteArrayList = new CopyOnWriteArrayList<>();
 
     public void addAll(List<String> list) {
-        linkedList.addAll(list);
+        copyOnWriteArrayList.addAll(list);
     }
 
     public void write(String path) {
-        CsvUtil.writeCsv(linkedList, path);
+        CsvUtil.writeCsv(copyOnWriteArrayList, path);
+    }
+
+    /**
+     * Deletes the regular file at {@code sPath}.
+     *
+     * @param sPath path of the file to remove
+     * @return {@code true} only if the path is an existing regular file and the deletion succeeded
+     */
+    public boolean deleteFile(String sPath) {
+        File target = new File(sPath);
+        // Anything that is not an existing regular file is left alone.
+        if (!target.isFile() || !target.exists()) {
+            return false;
+        }
+        return target.delete();
+    }
+
+    public boolean deleteDirectory(String sPath) {
+        //如果sPath不以文件分隔符结尾,自动添加文件分隔符
+        if (!sPath.endsWith(File.separator)) {
+            sPath = sPath + File.separator;
+        }
+        File dirFile = new File(sPath);
+        //如果dir对应的文件不存在,或者不是一个目录,则退出
+        if (!dirFile.exists() || !dirFile.isDirectory()) {
+            return false;
+        }
+        boolean flag = true;
+        //删除文件夹下的所有文件(包括子目录)
+        File[] files = dirFile.listFiles();
+        for (int i = 0; i < files.length; i++) {
+            //删除子文件
+            if (files[i].isFile()) {
+                flag = deleteFile(files[i].getAbsolutePath());
+                if (!flag) break;
+            } //删除子目录
+            else {
+                flag = deleteDirectory(files[i].getAbsolutePath());
+                if (!flag) break;
+            }
+        }
+        if (!flag) return false;
+        //删除当前目录
+        if (dirFile.delete()) {
+            return true;
+        } else {
+            return false;
+        }
     }
 }

+ 16 - 0
src/main/resources/application-dev.yml

@@ -0,0 +1,16 @@
+thread:
+  pool:
+    coreSize: 3
+    maximumPoolSize: 4
+    queueSize: 1000
+recordsPush:
+  url: http://127.0.0.1:8086/recordStorage
+recordsEncrypt:
+  keyId: 1316275955873878017
+  metadata: JKCredit
+  ivStr: dbdca8e8316fdee2
+  algorithm: SM4_CBC
+errorRecords:
+  path: /Users/jkxy/Desktop/test/output/
+tempFolder:
+  path: /Users/jkxy/Desktop/test/temp

+ 16 - 0
src/main/resources/application-prod.yml

@@ -0,0 +1,16 @@
+thread:
+  pool:
+    coreSize: 3
+    maximumPoolSize: 4
+    queueSize: 1000
+recordsPush:
+  url: http://192.168.0.114:8086/recordStorage
+recordsEncrypt:
+  keyId: 1316275955873878017
+  metadata: JKCredit
+  ivStr: dbdca8e8316fdee2
+  algorithm: SM4_CBC
+errorRecords:
+  path: /data/pushErrorRecords/
+tempFolder:
+  path: /data/tmp

+ 2 - 17
src/main/resources/application.yml

@@ -1,22 +1,7 @@
 spring:
   application:
     name: traffic-record
-#  redis:
-#    host: 127.0.0.1
-#    port: 6379
-thread:
-  pool:
-    coreSize: 20
-    maximumPoolSize: 40
-    queueSize: 100
+  profiles:
+    active: dev
 server:
   port: 28081
-recordsPush:
-  url: http://127.0.0.1:8086/recordStorage
-recordsEncrypt:
-  keyId: 1316275955873878017
-  metadata: JKCredit
-  ivStr: dbdca8e8316fdee2
-  algorithm: SM4_CBC
-errorRecords:
-  path: /Users/jkxy/Desktop/test/output/