|
@ -162,14 +162,14 @@ public class QueryService { |
|
|
// System.out.println("id ==== "+id); |
|
|
// System.out.println("id ==== "+id); |
|
|
// System.out.println(subjectId); |
|
|
// System.out.println(subjectId); |
|
|
// String appId = task.getAppId(); |
|
|
// String appId = task.getAppId(); |
|
|
System.out.println("**** " + appId); |
|
|
|
|
|
|
|
|
// System.out.println("**** " + appId); |
|
|
String indexName = "cl_major_"; |
|
|
String indexName = "cl_major_"; |
|
|
if (appId.contains("ic")) { |
|
|
if (appId.contains("ic")) { |
|
|
indexName = indexName + subjectId; |
|
|
indexName = indexName + subjectId; |
|
|
} else { |
|
|
} else { |
|
|
indexName = indexName + appId.toLowerCase() + "_" + subjectId; //cl_major_61qb_12094 |
|
|
indexName = indexName + appId.toLowerCase() + "_" + subjectId; //cl_major_61qb_12094 |
|
|
} |
|
|
} |
|
|
System.out.println("indexName = " + indexName); |
|
|
|
|
|
|
|
|
// System.out.println("indexName = " + indexName); |
|
|
Integer cacheNum = task.getCacheNum(); // 拉取数据的次数 |
|
|
Integer cacheNum = task.getCacheNum(); // 拉取数据的次数 |
|
|
// 当拉数据的次数 大于1 次的时候,再拉数据的开始时间就不用是任务设置的开始时间了,同时可以再加个采集时间范围限制一下,确保拉的数据都是任务添加之后才采集的就行 |
|
|
// 当拉数据的次数 大于1 次的时候,再拉数据的开始时间就不用是任务设置的开始时间了,同时可以再加个采集时间范围限制一下,确保拉的数据都是任务添加之后才采集的就行 |
|
|
QueryBuilder queryBuilder; // 根据条件组装查询用具 |
|
|
QueryBuilder queryBuilder; // 根据条件组装查询用具 |
|
@ -182,19 +182,20 @@ public class QueryService { |
|
|
fromMills = task.getCrawlStartTime().longValue(); |
|
|
fromMills = task.getCrawlStartTime().longValue(); |
|
|
queryBuilder = getQueryBuilder(fromMills, toMills, cid, crawlDataFlag, cacheNum, siteType); |
|
|
queryBuilder = getQueryBuilder(fromMills, toMills, cid, crawlDataFlag, cacheNum, siteType); |
|
|
} |
|
|
} |
|
|
LOGGER.info("Query primary, task:{}, index:{}, from:{}, to:{}, indices:{}, dsl:{}.", |
|
|
|
|
|
taskId, |
|
|
|
|
|
indexName, |
|
|
|
|
|
new LocalDateTime(fromMills).toString(AppConfig.DATE_TIME_FORMAT), |
|
|
|
|
|
new LocalDateTime(toMills).toString(AppConfig.DATE_TIME_FORMAT), |
|
|
|
|
|
JSONObject.toJSONString(sourceIndices), |
|
|
|
|
|
queryBuilder.toString()); |
|
|
|
|
|
|
|
|
// LOGGER.info("Query primary, task:{}, index:{}, from:{}, to:{}, indices:{}, dsl:{}.", |
|
|
|
|
|
// taskId, |
|
|
|
|
|
// indexName, |
|
|
|
|
|
// new LocalDateTime(fromMills).toString(AppConfig.DATE_TIME_FORMAT), |
|
|
|
|
|
// new LocalDateTime(toMills).toString(AppConfig.DATE_TIME_FORMAT), |
|
|
|
|
|
// JSONObject.toJSONString(sourceIndices), |
|
|
|
|
|
// queryBuilder.toString()); |
|
|
// 传入的参数 集群名称,索引名称,索引类型(type), 查询Builder,scroll查询页面大小,scroll查询scrollId有效时间 |
|
|
// 传入的参数 集群名称,索引名称,索引类型(type), 查询Builder,scroll查询页面大小,scroll查询scrollId有效时间 |
|
|
String finalTaskId = taskId + ""; |
|
|
String finalTaskId = taskId + ""; |
|
|
long pubTime = fromMills; |
|
|
long pubTime = fromMills; |
|
|
long finalFromMills = fromMills; |
|
|
long finalFromMills = fromMills; |
|
|
long finalToMills = toMills; |
|
|
long finalToMills = toMills; |
|
|
String finalIndexName = indexName; |
|
|
String finalIndexName = indexName; |
|
|
|
|
|
String finalIndexName1 = indexName; |
|
|
EsUtils.scrollQuery(clusterName, sourceIndices, ESConstants.INDEX_TYPE, |
|
|
EsUtils.scrollQuery(clusterName, sourceIndices, ESConstants.INDEX_TYPE, |
|
|
queryBuilder, ESConstants.SCROLL_PAGE_SIZE, ESConstants.SCROLL_MINUTES, |
|
|
queryBuilder, ESConstants.SCROLL_PAGE_SIZE, ESConstants.SCROLL_MINUTES, |
|
|
dataList -> { |
|
|
dataList -> { |
|
@ -220,8 +221,8 @@ public class QueryService { |
|
|
data = downloadAndChangePath(data); |
|
|
data = downloadAndChangePath(data); |
|
|
} |
|
|
} |
|
|
if (!data.get("_id_").equals("")) { |
|
|
if (!data.get("_id_").equals("")) { |
|
|
// saveService.saveToEsWithFilter(config.esMiniClusterName(), indexName, data); |
|
|
|
|
|
// kafkaProducer.send(config.getSendTopic(),JSONObject.toJSONString(data)); |
|
|
|
|
|
|
|
|
saveService.saveToEsWithFilter(config.esMiniClusterName(), finalIndexName1, data); |
|
|
|
|
|
kafkaProducer.send(config.getSendTopic(),JSONObject.toJSONString(data)); |
|
|
LOGGER.debug("Send message, indexName :{} , taskId:{} , ID :{}.", finalIndexName, task.getId(), data.getString("_id_")); |
|
|
LOGGER.debug("Send message, indexName :{} , taskId:{} , ID :{}.", finalIndexName, task.getId(), data.getString("_id_")); |
|
|
// 将要拉评论的ID 添加到list 中,(电商的数据不用拉评论哦)! |
|
|
// 将要拉评论的ID 添加到list 中,(电商的数据不用拉评论哦)! |
|
|
if (!siteType.equals(ESConstants.DOCTYPEITEM)) { |
|
|
if (!siteType.equals(ESConstants.DOCTYPEITEM)) { |
|
@ -340,26 +341,61 @@ public class QueryService { |
|
|
*/ |
|
|
*/ |
|
|
private JSONObject downloadAndChangePath(JSONObject data) { |
|
|
private JSONObject downloadAndChangePath(JSONObject data) { |
|
|
try { |
|
|
try { |
|
|
|
|
|
String docId = (String) data.get(ESConstants.DOC_ID); |
|
|
// 文件下载 ,之所以提取对应的src 字段是因为如果附件被下载过了,这个字段的值需要一一对应,就直接填写了,不用再做下载回填了。 |
|
|
// 文件下载 ,之所以提取对应的src 字段是因为如果附件被下载过了,这个字段的值需要一一对应,就直接填写了,不用再做下载回填了。 |
|
|
List<String> filePath = (List<String>) data.get(ESConstants.FILEPATH); |
|
|
List<String> filePath = (List<String>) data.get(ESConstants.FILEPATH); |
|
|
List<Map<String,String>> srcFileList = JsonUtils.parseArray( data.get(ESConstants.SRCFILEPATH).toString()); |
|
|
|
|
|
|
|
|
List<Map<String,String>> filePathSize = new ArrayList<>(); |
|
|
|
|
|
List<Map<String, String>> srcFileList = new ArrayList<>(); |
|
|
|
|
|
if(data.containsKey(ESConstants.FILEPATHSIZE)) { |
|
|
|
|
|
filePathSize = JsonUtils.parseArray(data.get(ESConstants.FILEPATHSIZE).toString()); |
|
|
|
|
|
} |
|
|
|
|
|
if(data.containsKey(ESConstants.SRCFILEPATH)) { |
|
|
|
|
|
srcFileList = JsonUtils.parseArray(data.get(ESConstants.SRCFILEPATH).toString()); |
|
|
|
|
|
} |
|
|
if(filePath.size() > 0){ |
|
|
if(filePath.size() > 0){ |
|
|
data = getFilePath(data,filePath,srcFileList); |
|
|
|
|
|
|
|
|
System.out.println(docId + "----- filePath : " + filePath); |
|
|
|
|
|
System.out.println("===== srcFileList :" + srcFileList); |
|
|
|
|
|
System.out.println("-=-=- filePathSize : "+filePathSize); |
|
|
|
|
|
data = getFilePath(data,filePath,srcFileList,filePathSize); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// 视频下载 |
|
|
// 视频下载 |
|
|
List<String> videoPath = (List<String>) data.get(ESConstants.VIDEOPATH); |
|
|
List<String> videoPath = (List<String>) data.get(ESConstants.VIDEOPATH); |
|
|
|
|
|
List<Map<String,String>> srcVideoList = new ArrayList<>(); |
|
|
|
|
|
List<Map<String,String>> videoPathSize = new ArrayList<>(); |
|
|
|
|
|
|
|
|
if(videoPath.size() > 0){ |
|
|
if(videoPath.size() > 0){ |
|
|
List<Map<String,String>> srcVideoList = JsonUtils.parseArray( data.get(ESConstants.SRCVIDEOPATH).toString()); |
|
|
|
|
|
data = getVideoPath(data,videoPath,srcVideoList); |
|
|
|
|
|
|
|
|
if(data.containsKey(ESConstants.VIDEOPATHSIZE)){ |
|
|
|
|
|
videoPathSize = JsonUtils.parseArray(data.get(ESConstants.VIDEOPATHSIZE).toString()); |
|
|
} |
|
|
} |
|
|
|
|
|
if(data.containsKey(ESConstants.SRCVIDEOPATH)){ |
|
|
|
|
|
srcVideoList = JsonUtils.parseArray( data.get(ESConstants.SRCVIDEOPATH).toString()); |
|
|
|
|
|
} |
|
|
|
|
|
System.out.println(docId + "----- videoPath : " + videoPath); |
|
|
|
|
|
System.out.println("===== srcVideoList :" + srcVideoList); |
|
|
|
|
|
System.out.println("-=-=- videoPathSize : "+videoPathSize); |
|
|
|
|
|
data = getVideoPath(data,videoPath,srcVideoList,videoPathSize); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
// 图片下载 |
|
|
// 图片下载 |
|
|
List<String> imagePath = (List<String>) data.get(ESConstants.IMAGEPATH); |
|
|
List<String> imagePath = (List<String>) data.get(ESConstants.IMAGEPATH); |
|
|
List<Map<String,String>> srcImageList = JsonUtils.parseArray( data.get(ESConstants.SRCIMAGEPATH).toString()); |
|
|
|
|
|
|
|
|
List<Map<String,String>> srcImageList = new ArrayList<>(); |
|
|
|
|
|
List<Map<String,String>> imagePathSize = new ArrayList<>(); |
|
|
|
|
|
if(data.containsKey(ESConstants.IMAGEPATHSIZE)){ |
|
|
|
|
|
imagePathSize = JsonUtils.parseArray(data.get(ESConstants.IMAGEPATHSIZE).toString()); |
|
|
|
|
|
} |
|
|
|
|
|
if(data.containsKey(ESConstants.SRCIMAGEPATH)){ |
|
|
|
|
|
srcImageList = JsonUtils.parseArray( data.get(ESConstants.SRCIMAGEPATH).toString()); |
|
|
|
|
|
} |
|
|
if(imagePath.size() > 0){ |
|
|
if(imagePath.size() > 0){ |
|
|
data = getImagePath(data,imagePath,srcImageList); |
|
|
|
|
|
|
|
|
System.out.println(docId + "----- imagePath : " + imagePath); |
|
|
|
|
|
System.out.println("===== srcImageList :" + srcImageList); |
|
|
|
|
|
System.out.println("-=-=- imagePathSize : "+imagePathSize); |
|
|
|
|
|
data = getImagePath(data,imagePath,srcImageList,imagePathSize); |
|
|
} |
|
|
} |
|
|
|
|
|
// System.out.println("***** "+data); |
|
|
// isDownload 填写 |
|
|
// isDownload 填写 |
|
|
if(filePath.size() == 0 && videoPath.size() == 0 && imagePath.size() == 0){ |
|
|
|
|
|
|
|
|
if(filePathSize.size() == 0 && videoPathSize.size() == 0 && imagePathSize.size() == 0){ |
|
|
data.put(ESConstants.ISDOWNLOAD,"false"); |
|
|
data.put(ESConstants.ISDOWNLOAD,"false"); |
|
|
} |
|
|
} |
|
|
} catch (Exception e) { |
|
|
} catch (Exception e) { |
|
@ -375,9 +411,9 @@ public class QueryService { |
|
|
// 当三个 pathSize 都为 0 的时候,表示三个下载结果都为空,为了保持页面和实际结果的统一,这块改成 false |
|
|
// 当三个 pathSize 都为 0 的时候,表示三个下载结果都为空,为了保持页面和实际结果的统一,这块改成 false |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
private JSONObject getImagePath(JSONObject data, List<String> imagePath, List<Map<String,String>> srcImageList) { |
|
|
|
|
|
Map<String,Object> pathMap = getPathSize(imagePath,1,data,srcImageList); |
|
|
|
|
|
LOGGER.info("下载图片后的 pathMap : {}.",JsonUtils.toJSONString(pathMap)); |
|
|
|
|
|
|
|
|
private JSONObject getImagePath(JSONObject data, List<String> imagePath, List<Map<String,String>> srcImageList,List<Map<String,String>> pathSize) { |
|
|
|
|
|
Map<String,Object> pathMap = getPathSize(imagePath,1,data,srcImageList,pathSize); |
|
|
|
|
|
LOGGER.info("Image : 下载图片后的 pathMap : {}.",JsonUtils.toJSONString(pathMap)); |
|
|
if(pathMap.size() > 0) { |
|
|
if(pathMap.size() > 0) { |
|
|
imagePath = (List<String>) pathMap.get(ESConstants.PATH); |
|
|
imagePath = (List<String>) pathMap.get(ESConstants.PATH); |
|
|
data.put(ESConstants.IMAGEPATH, imagePath); |
|
|
data.put(ESConstants.IMAGEPATH, imagePath); |
|
@ -433,9 +469,9 @@ public class QueryService { |
|
|
return data; |
|
|
return data; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
private JSONObject getVideoPath(JSONObject data, List<String> videoPath, List<Map<String,String>> srcVideoList ) { |
|
|
|
|
|
Map<String,Object> pathMap = getPathSize(videoPath,2,data,srcVideoList); |
|
|
|
|
|
LOGGER.info("下载视频后的 pathMap : {}.",JsonUtils.toJSONString(pathMap)); |
|
|
|
|
|
|
|
|
private JSONObject getVideoPath(JSONObject data, List<String> videoPath, List<Map<String,String>> srcVideoList ,List<Map<String,String>> pathSize) { |
|
|
|
|
|
Map<String,Object> pathMap = getPathSize(videoPath,2,data,srcVideoList,pathSize); |
|
|
|
|
|
LOGGER.info("Video : 下载视频后的 pathMap : {}.",JsonUtils.toJSONString(pathMap)); |
|
|
// 先做判断,如果 pathMap == 0 的话,对应的 videoPath、videoPathSize、srcVideoPath 都保持不变即可 |
|
|
// 先做判断,如果 pathMap == 0 的话,对应的 videoPath、videoPathSize、srcVideoPath 都保持不变即可 |
|
|
if(pathMap.size() > 0) { |
|
|
if(pathMap.size() > 0) { |
|
|
// videoPath 字段填充 |
|
|
// videoPath 字段填充 |
|
@ -491,10 +527,10 @@ public class QueryService { |
|
|
return data; |
|
|
return data; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
private JSONObject getFilePath(JSONObject data, List<String> filePath, List<Map<String,String>> srcFileList) { |
|
|
|
|
|
|
|
|
private JSONObject getFilePath(JSONObject data, List<String> filePath, List<Map<String,String>> srcFileList,List<Map<String,String>> pathSize) { |
|
|
// 调用下载接口,下载并将附件上传到自己的go-fast 上 |
|
|
// 调用下载接口,下载并将附件上传到自己的go-fast 上 |
|
|
Map<String,Object> pathMap = getPathSize(filePath,0,data,srcFileList); |
|
|
|
|
|
LOGGER.info("下载文件后的 pathMap : {}.",JsonUtils.toJSONString(pathMap)); |
|
|
|
|
|
|
|
|
Map<String,Object> pathMap = getPathSize(filePath,0,data,srcFileList,pathSize); |
|
|
|
|
|
LOGGER.info("File : 下载文件后的 pathMap : {}.",JsonUtils.toJSONString(pathMap)); |
|
|
if(pathMap.size() > 0) { |
|
|
if(pathMap.size() > 0) { |
|
|
// 下载替换后的 path List |
|
|
// 下载替换后的 path List |
|
|
filePath = (List<String>) pathMap.get(ESConstants.PATH); |
|
|
filePath = (List<String>) pathMap.get(ESConstants.PATH); |
|
@ -530,7 +566,9 @@ public class QueryService { |
|
|
/** |
|
|
/** |
|
|
* downloadType =0 文件 ;=1 图片 ;= 2 视频 |
|
|
* downloadType =0 文件 ;=1 图片 ;= 2 视频 |
|
|
*/ |
|
|
*/ |
|
|
private Map<String,Object> getPathSize(List<String> pathList, Integer downloadType, JSONObject data,List<Map<String,String>> srcxxxList) { |
|
|
|
|
|
|
|
|
private Map<String,Object> getPathSize(List<String> pathList, Integer downloadType, JSONObject data, |
|
|
|
|
|
List<Map<String,String>> srcxxxList , |
|
|
|
|
|
List<Map<String,String>> xxxpathSize) { |
|
|
String domain = config.getGoFastDomain(); |
|
|
String domain = config.getGoFastDomain(); |
|
|
Map<String,Object> pathMap = new HashMap<>(); |
|
|
Map<String,Object> pathMap = new HashMap<>(); |
|
|
List<Map<String,String>> pathSizeList = new ArrayList<>(); |
|
|
List<Map<String,String>> pathSizeList = new ArrayList<>(); |
|
@ -546,10 +584,14 @@ public class QueryService { |
|
|
// 以下三种情况都是需要下载的情况,另外还有 path 中的链接不需要下载的情况,怎么补全pathSize 和 srcPath 字段 |
|
|
// 以下三种情况都是需要下载的情况,另外还有 path 中的链接不需要下载的情况,怎么补全pathSize 和 srcPath 字段 |
|
|
// 很可能 path 字段本身不需要下载,但是其他两个字段都需要下载后补全才行。 |
|
|
// 很可能 path 字段本身不需要下载,但是其他两个字段都需要下载后补全才行。 |
|
|
if (downloadUrl.contains("http") || downloadUrl.contains("group1") || downloadUrl.contains("group2")) { |
|
|
if (downloadUrl.contains("http") || downloadUrl.contains("group1") || downloadUrl.contains("group2")) { |
|
|
if(downloadUrl.contains("group1") || downloadUrl.contains("group2")){ |
|
|
|
|
|
|
|
|
if(!downloadUrl.contains("http")){ |
|
|
downloadUrl = domain +downloadUrl; |
|
|
downloadUrl = domain +downloadUrl; |
|
|
} |
|
|
} |
|
|
Map<String, String> pathSizeMap = DownLoadFile.downloadAndSaveFile(downloadUrl, config.getGoFastPostUrl()); |
|
|
|
|
|
|
|
|
System.out.println("downloadUrl = "+downloadUrl); |
|
|
|
|
|
Map<String, String> pathSizeMap = new HashMap<>(); |
|
|
|
|
|
if(!downloadUrl.contains("null.py")){ |
|
|
|
|
|
pathSizeMap = DownLoadFile.downloadAndSaveFile(downloadUrl, config.getGoFastPostUrl()); |
|
|
|
|
|
} |
|
|
// LOGGER.info("[QueryService] getPathSize goFaskAddr {}. resultMap {}.", config.getGoFastPostUrl(), pathSizeMap); |
|
|
// LOGGER.info("[QueryService] getPathSize goFaskAddr {}. resultMap {}.", config.getGoFastPostUrl(), pathSizeMap); |
|
|
// Map<String, String> pathSizeMap = DownLoadFile.downloadAndSaveFile(downloadUrl, "http://172.18.1.113:8080/upload"); |
|
|
// Map<String, String> pathSizeMap = DownLoadFile.downloadAndSaveFile(downloadUrl, "http://172.18.1.113:8080/upload"); |
|
|
if (pathSizeMap.size() > 0) { |
|
|
if (pathSizeMap.size() > 0) { |
|
@ -604,13 +646,18 @@ public class QueryService { |
|
|
}else { |
|
|
}else { |
|
|
pathMap.put(ESConstants.SRCLIST, srcList); |
|
|
pathMap.put(ESConstants.SRCLIST, srcList); |
|
|
} |
|
|
} |
|
|
|
|
|
if(xxxpathSize.size() > 0){ |
|
|
|
|
|
pathMap.put(ESConstants.PATHSIZELIST,xxxpathSize); |
|
|
|
|
|
}else{ |
|
|
|
|
|
pathMap.put(ESConstants.PATHSIZELIST,pathSizeList); |
|
|
|
|
|
} |
|
|
pathMap.put(ESConstants.PATH, path); |
|
|
pathMap.put(ESConstants.PATH, path); |
|
|
} |
|
|
} |
|
|
} catch (IOException e) { |
|
|
} catch (IOException e) { |
|
|
e.printStackTrace(); |
|
|
e.printStackTrace(); |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
System.out.println("pathMap === "+JsonUtils.toJSONString(pathMap)); |
|
|
return pathMap; |
|
|
return pathMap; |
|
|
|
|
|
|
|
|
} |
|
|
} |
|
@ -628,7 +675,6 @@ public class QueryService { |
|
|
ESConstants.PUBTIME, startTime - 2 * ONE_MINUTE, endTime); |
|
|
ESConstants.PUBTIME, startTime - 2 * ONE_MINUTE, endTime); |
|
|
boolQueryBuilder.must(pubTimeRange); |
|
|
boolQueryBuilder.must(pubTimeRange); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// 筛选站点 因为天猫、淘宝的数据是交叉的,因此如果需要拉某一个站点的时候需将两个站点的数据都拉出来。 |
|
|
// 筛选站点 因为天猫、淘宝的数据是交叉的,因此如果需要拉某一个站点的时候需将两个站点的数据都拉出来。 |
|
|
if(cid.equals(ESConstants.TAOBAO) || cid.equals(ESConstants.TMALL)){ |
|
|
if(cid.equals(ESConstants.TAOBAO) || cid.equals(ESConstants.TMALL)){ |
|
|
boolQueryBuilder.must(QueryBuilders.termsQuery(ESConstants.EN_SOURCE, ESConstants.TAOBAO,ESConstants.TMALL)); |
|
|
boolQueryBuilder.must(QueryBuilders.termsQuery(ESConstants.EN_SOURCE, ESConstants.TAOBAO,ESConstants.TMALL)); |
|
|