|
@@ -8,7 +8,6 @@ import com.ciphergateway.ciphersuite.CipherSuiteUtils;
|
|
|
import com.jkcredit.traffic.record.constant.CommonConstant;
|
|
|
import com.jkcredit.traffic.record.model.CommonResponseObject;
|
|
|
import com.jkcredit.traffic.record.model.MonthResult;
|
|
|
-import com.jkcredit.traffic.record.test.AsyncPushTask;
|
|
|
import com.jkcredit.traffic.record.util.CsvUtil;
|
|
|
import com.jkcredit.traffic.record.util.DivideDataUtil;
|
|
|
import com.jkcredit.traffic.record.util.FileUtil;
|
|
@@ -20,7 +19,6 @@ import org.apache.spark.sql.Encoders;
|
|
|
import org.apache.spark.sql.SparkSession;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
|
-import org.springframework.scheduling.annotation.AsyncResult;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
|
import javax.xml.bind.DatatypeConverter;
|
|
@@ -29,7 +27,6 @@ import java.io.UnsupportedEncodingException;
|
|
|
import java.net.SocketTimeoutException;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.List;
|
|
|
-import java.util.concurrent.Future;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
/**
|
|
@@ -41,8 +38,6 @@ import java.util.concurrent.TimeUnit;
|
|
|
public class TrafficRecordsAnalyseServiceImpl implements TrafficRecordsAnalyseService {
|
|
|
@Autowired
|
|
|
SparkSession sparkSession;
|
|
|
- @Autowired
|
|
|
- AsyncPushTask asyncPushTask;
|
|
|
@Value("${recordsPush.url}")
|
|
|
private String recordsPushUrl;
|
|
|
@Value("${recordsEncrypt.keyId}")
|
|
@@ -62,11 +57,12 @@ public class TrafficRecordsAnalyseServiceImpl implements TrafficRecordsAnalyseSe
|
|
|
|
|
|
@Override
|
|
|
public CommonResponseObject pushTrafficRecords(String filePath) {
|
|
|
- log.info("开始处理");
|
|
|
+ log.info("pushTrafficRecordBegin");
|
|
|
+ long startTime = System.currentTimeMillis();
|
|
|
List<File> files = new ArrayList<>();
|
|
|
File folder = new File(filePath);
|
|
|
File[] tempList = folder.listFiles();
|
|
|
- if (tempList.length == 0) {
|
|
|
+ if (tempList == null || tempList.length == 0) {
|
|
|
return new CommonResponseObject(CommonConstant.ERROR_CODE_FILE_NOT_EXIST,
|
|
|
CommonConstant.ERROR_MESSAGE_FILE_NOT_EXIST);
|
|
|
}
|
|
@@ -93,19 +89,19 @@ public class TrafficRecordsAnalyseServiceImpl implements TrafficRecordsAnalyseSe
|
|
|
// 数据脱敏
|
|
|
List<MonthResult> maskResults = desensitize(j + "", tempFolder + "/temp_" + j + ".csv");
|
|
|
|
|
|
- // 数据分割,异步推送
|
|
|
+ // 数据分割,推送数据
|
|
|
push(maskResults);
|
|
|
boolean deleteResult = FileUtil.INSTANCE.deleteFile(tempFolder + "/temp_" + j + ".csv");
|
|
|
- log.info("删除临时文件:{}, 删除结果:{}", tempFolder + "/temp_" + j + ".csv", deleteResult);
|
|
|
+ log.info("deleteTempFile:{}, deleteResult:{}", tempFolder + "/temp_" + j + ".csv", deleteResult);
|
|
|
maskResults = null;
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
e.printStackTrace();
|
|
|
}
|
|
|
boolean deleteResult = FileUtil.INSTANCE.deleteDirectory(tempFolder);
|
|
|
- log.info("删除临时目录:{}, 删除结果:{}", tempFolder, deleteResult);
|
|
|
+ log.info("deleteTempDir:{}, deleteResult:{}", tempFolder, deleteResult);
|
|
|
}
|
|
|
- log.info("结束处理");
|
|
|
+ log.info("pushTrafficRecordEnd, costTime:{}", System.currentTimeMillis() - startTime);
|
|
|
return new CommonResponseObject().success();
|
|
|
}
|
|
|
|
|
@@ -114,19 +110,19 @@ public class TrafficRecordsAnalyseServiceImpl implements TrafficRecordsAnalyseSe
|
|
|
List<File> files = new ArrayList<>();
|
|
|
File folder = new File(filePath);
|
|
|
File[] tempList = folder.listFiles();
|
|
|
- if (tempList.length == 0) {
|
|
|
+ if (tempList == null || tempList.length == 0) {
|
|
|
return new CommonResponseObject(CommonConstant.ERROR_CODE_FILE_NOT_EXIST,
|
|
|
CommonConstant.ERROR_MESSAGE_FILE_NOT_EXIST);
|
|
|
}
|
|
|
|
|
|
- for (int i = 0; i < tempList.length; i++) {
|
|
|
- if (tempList[i].isFile() && tempList[i].getName().split("\\.")[1].equals("csv")) {
|
|
|
- files.add(tempList[i]);
|
|
|
+ for (File file : tempList) {
|
|
|
+ if (file.isFile() && file.getName().split("\\.")[1].equals("csv")) {
|
|
|
+ files.add(file);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
for (int i = 0; i < files.size(); i++) {
|
|
|
- List<MonthResult> errorRecords = getMonthResult("ErrorRecord" +i, files.get(i).getAbsolutePath());
|
|
|
+ List<MonthResult> errorRecords = getMonthResult("ErrorRecord" + i, files.get(i).getAbsolutePath());
|
|
|
push(errorRecords);
|
|
|
}
|
|
|
return new CommonResponseObject().success();
|
|
@@ -138,14 +134,14 @@ public class TrafficRecordsAnalyseServiceImpl implements TrafficRecordsAnalyseSe
|
|
|
.option("header", false)
|
|
|
.csv(filePath)
|
|
|
.toDF("month", "vehicleid", "max_exTime", "sum_travel_time", "max_travel_time", "sum_feemileage",
|
|
|
- "sum_fee", "sum_weight_mileage","exTimes_count", "axlecount", "vehicletype",
|
|
|
+ "sum_fee", "sum_weight_mileage", "exTimes_count", "axlecount", "vehicletype",
|
|
|
"travel_provinces_count", "transtime_count")
|
|
|
.as(Encoders.bean(MonthResult.class))
|
|
|
.coalesce(36);
|
|
|
String tempViewName = "monthDataset" + i;
|
|
|
monthResultDataset.createOrReplaceTempView(tempViewName);
|
|
|
- Dataset<MonthResult> rows = sparkSession.sql("select * from " + tempViewName + " where sum_travel_time >= 0 "+
|
|
|
- "and max_travel_time >=0" )
|
|
|
+ Dataset<MonthResult> rows = sparkSession.sql("select * from " + tempViewName + " where sum_travel_time >= 0 " +
|
|
|
+ "and max_travel_time >=0")
|
|
|
.as(Encoders.bean(MonthResult.class));
|
|
|
// rows.write().csv("/Users/jkxy/Desktop/outPut/test" + System.currentTimeMillis());
|
|
|
List<MonthResult> results = new ArrayList<>(3000000);
|
|
@@ -156,26 +152,27 @@ public class TrafficRecordsAnalyseServiceImpl implements TrafficRecordsAnalyseSe
|
|
|
|
|
|
private List<MonthResult> desensitize(String value, String filePath) {
|
|
|
List<MonthResult> results = getMonthResult(value, filePath);
|
|
|
- log.info("待推送数据数量::{}", results.size());
|
|
|
+ log.info("desensitizeCount:{}", results.size());
|
|
|
// 数据加密分区间
|
|
|
List<MonthResult> maskResults = new ArrayList<>();
|
|
|
for (int i = 0; i < results.size(); i++) {
|
|
|
// 车牌号加密;
|
|
|
String encryptPlateNumber = vehicleIdEncrypt(results.get(i).getVehicleid());
|
|
|
if (StringUtils.isBlank(encryptPlateNumber)) {
|
|
|
- log.error("加密失败:{}", results.get(i).toString());
|
|
|
+ log.error("encryptPlateNumberError:{}", results.get(i).toString());
|
|
|
continue;
|
|
|
}
|
|
|
results.get(i).setVehicleid(encryptPlateNumber);
|
|
|
maskResults.add(divideData(results.get(i)));
|
|
|
}
|
|
|
- log.info("数据脱敏完成数量:{}", results.size());
|
|
|
+ log.info("desensitizeSuccessCount:{}", results.size());
|
|
|
return maskResults;
|
|
|
}
|
|
|
|
|
|
public void push(List<MonthResult> maskResults) {
|
|
|
+ // 将数据每10000条分为一个数据包,通过http请求推送
|
|
|
int times;
|
|
|
- if (maskResults.size() % 10000 != 0 ) {
|
|
|
+ if (maskResults.size() % 10000 != 0) {
|
|
|
times = (maskResults.size() / 10000) + 1;
|
|
|
} else {
|
|
|
times = maskResults.size() / 10000;
|
|
@@ -183,24 +180,23 @@ public class TrafficRecordsAnalyseServiceImpl implements TrafficRecordsAnalyseSe
|
|
|
List<List<MonthResult>> partResultsList = new ArrayList<>();
|
|
|
for (int i = 0; i < times; i++) {
|
|
|
List<MonthResult> partResults;
|
|
|
- if (i == times-1) {
|
|
|
- partResults = maskResults.subList(i*10000, maskResults.size());
|
|
|
+ if (i == times - 1) {
|
|
|
+ partResults = maskResults.subList(i * 10000, maskResults.size());
|
|
|
} else {
|
|
|
- partResults = maskResults.subList(i*10000, (i+1)*10000);
|
|
|
+ partResults = maskResults.subList(i * 10000, (i + 1) * 10000);
|
|
|
}
|
|
|
partResultsList.add(partResults);
|
|
|
}
|
|
|
+
|
|
|
+ // 推送数据
|
|
|
for (List<MonthResult> partResults : partResultsList) {
|
|
|
- try {
|
|
|
- pushRecords(partResults);
|
|
|
- } catch (Exception e) {
|
|
|
- e.printStackTrace();
|
|
|
- }
|
|
|
+ pushRecords(partResults);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 对vehicle_id字段进行加密
|
|
|
+ *
|
|
|
* @param vehicleId 待加密vehicleId
|
|
|
* @return 加密后vehicleId
|
|
|
*/
|
|
@@ -208,7 +204,6 @@ public class TrafficRecordsAnalyseServiceImpl implements TrafficRecordsAnalyseSe
|
|
|
try {
|
|
|
byte[] plainData = vehicleId.getBytes("UTF-8");
|
|
|
byte[] iv = ivStr.getBytes("UTF-8");
|
|
|
- Long startTime = System.currentTimeMillis();
|
|
|
// 加密
|
|
|
byte[] cipherData = CipherSuiteUtils.encrypt(plainData, algorithm, keyId, metadata, iv);
|
|
|
return DatatypeConverter.printHexBinary(cipherData);
|
|
@@ -237,6 +232,7 @@ public class TrafficRecordsAnalyseServiceImpl implements TrafficRecordsAnalyseSe
|
|
|
|
|
|
/**
|
|
|
* 对数据脱敏,分区间
|
|
|
+ *
|
|
|
* @param monthResult 待脱敏月结果数据
|
|
|
* @return 脱敏后月结果数据
|
|
|
*/
|
|
@@ -250,7 +246,7 @@ public class TrafficRecordsAnalyseServiceImpl implements TrafficRecordsAnalyseSe
|
|
|
return monthResult;
|
|
|
}
|
|
|
|
|
|
- public Future<String> pushRecords(List<MonthResult> resultList) throws InterruptedException {
|
|
|
+ public void pushRecords(List<MonthResult> resultList) {
|
|
|
MediaType mediaType = MediaType.parse(MEDIA_TYPE);
|
|
|
RequestBody requestBody = RequestBody.create(mediaType, JSON.toJSONString(resultList));
|
|
|
|
|
@@ -259,29 +255,29 @@ public class TrafficRecordsAnalyseServiceImpl implements TrafficRecordsAnalyseSe
|
|
|
.url(recordsPushUrl)
|
|
|
.build();
|
|
|
OkHttpClient client = okHttpClient.newBuilder()
|
|
|
- .connectTimeout(5000, TimeUnit.MILLISECONDS)
|
|
|
- .readTimeout(30000, TimeUnit.MILLISECONDS)
|
|
|
- .writeTimeout(30000, TimeUnit.MILLISECONDS)
|
|
|
+ .connectTimeout(1000, TimeUnit.MILLISECONDS)
|
|
|
+ .readTimeout(10000, TimeUnit.MILLISECONDS)
|
|
|
+ .writeTimeout(10000, TimeUnit.MILLISECONDS)
|
|
|
.build();
|
|
|
String responseContext = "";
|
|
|
List<MonthResult> errorList = new ArrayList<>();
|
|
|
try {
|
|
|
Long startTime = System.currentTimeMillis();
|
|
|
Response response = client.newCall(okRequest).execute();
|
|
|
- log.info("pushRecords -- costTime:{}", System.currentTimeMillis()-startTime);
|
|
|
if (response.body() != null) {
|
|
|
responseContext = response.body().string();
|
|
|
}
|
|
|
- log.info("pushRecords -- responseContext:{}", responseContext);
|
|
|
+ log.info("pushRecords -- responseContext:{}, costTime:{}", responseContext, System.currentTimeMillis() - startTime);
|
|
|
response.close();
|
|
|
} catch (SocketTimeoutException ste) {
|
|
|
- ste.printStackTrace();
|
|
|
+ log.error("pushRecordsException:", ste);
|
|
|
errorList = resultList;
|
|
|
} catch (Exception e) {
|
|
|
- e.printStackTrace();
|
|
|
+ log.error("pushRecordsException:", e);
|
|
|
+ errorList = resultList;
|
|
|
}
|
|
|
CommonResponseObject responseObject = JSON.toJavaObject(JSON.parseObject(responseContext), CommonResponseObject.class);
|
|
|
- if (!responseObject.getCode().equals(CommonConstant.SUCCESS_CODE)) {
|
|
|
+ if (responseObject != null && !responseObject.getCode().equals(CommonConstant.SUCCESS_CODE)) {
|
|
|
JSONArray responseErrorArray = JSON.parseArray(responseObject.getMessage());
|
|
|
errorList = responseErrorArray.toJavaList(MonthResult.class);
|
|
|
}
|
|
@@ -292,8 +288,7 @@ public class TrafficRecordsAnalyseServiceImpl implements TrafficRecordsAnalyseSe
|
|
|
if (errorData.size() != 0) {
|
|
|
// 异常数据写入本地文件
|
|
|
FileUtil.INSTANCE.addAll(errorData);
|
|
|
- FileUtil.INSTANCE.write(errorRecordsPath+System.currentTimeMillis()+".csv");
|
|
|
+ FileUtil.INSTANCE.write(errorRecordsPath + System.currentTimeMillis() + ".csv");
|
|
|
}
|
|
|
- return new AsyncResult<>("推送完成");
|
|
|
}
|
|
|
}
|