From 1da21ed362d26df4846337bcd101d2fa0c745b21 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=9C=E9=9D=99?= Date: Fri, 13 Aug 2021 10:37:30 +0800 Subject: [PATCH] =?UTF-8?q?release-3.1.3(20210813,=E4=BF=AE=E5=A4=8D?= =?UTF-8?q?=E4=BA=86=20=E6=96=B0=E5=A2=9E=E7=AB=99=E7=82=B9=20linkedin=20?= =?UTF-8?q?=E5=92=8C=20ins=20=E6=9F=A5=E8=AF=A2=E8=AF=A6=E6=83=85=E7=9A=84?= =?UTF-8?q?=E6=97=B6=E5=80=99=20=E6=A0=87=E9=A2=98=E4=B8=8D=E5=B1=95?= =?UTF-8?q?=E7=A4=BA=20author=20=E7=9A=84=E9=97=AE=E9=A2=98=E3=80=82?= =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E4=BA=86=20=E7=A6=BB=E7=BA=BF=E7=BB=9F?= =?UTF-8?q?=E8=AE=A1=20=E7=BB=9F=E8=AE=A1=E7=9A=84=E6=97=B6=E5=80=99?= =?UTF-8?q?=E4=BC=9A=E5=B0=86=E7=B2=89=E4=B8=9D=E6=95=B0=E4=B9=9F=E4=B8=80?= =?UTF-8?q?=E8=B5=B7=E7=BB=9F=E8=AE=A1=EF=BC=8C=E4=BB=8E=E8=80=8C=E5=AF=BC?= =?UTF-8?q?=E8=87=B4task=20=E7=9A=84=E7=BB=9F=E8=AE=A1=E7=BB=93=E6=9E=9C?= =?UTF-8?q?=E8=B7=9F=E6=9F=A5=E8=AF=A2=E7=BB=93=E6=9E=9C=E4=B8=8D=E7=9B=B8?= =?UTF-8?q?=E7=AC=A6)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mf/job/domain/repository/TaskRepository.java | 3 +- .../bfd/mf/job/service/es/EsQueryMiniService.java | 21 ++------ .../com/bfd/mf/job/service/query/QueryService.java | 60 +++++++++++----------- .../job/service/statistics/StatisticsService.java | 2 + .../java/com/bfd/mf/job/worker/QueryProducer.java | 10 ++-- .../bfd/mf/controller/SearchDataController.java | 3 ++ .../java/com/bfd/mf/service/SearchDataService.java | 39 +++++++++----- 7 files changed, 73 insertions(+), 65 deletions(-) diff --git a/cl_query_data_job/src/main/java/com/bfd/mf/job/domain/repository/TaskRepository.java b/cl_query_data_job/src/main/java/com/bfd/mf/job/domain/repository/TaskRepository.java index f1dfb86..d15c91f 100644 --- a/cl_query_data_job/src/main/java/com/bfd/mf/job/domain/repository/TaskRepository.java +++ b/cl_query_data_job/src/main/java/com/bfd/mf/job/domain/repository/TaskRepository.java @@ -17,7 +17,8 @@ public interface TaskRepository extends CrudRepository { // 需要统计的任务的查询条件 1、 状态为 1 OR 0;2、状态为3,且任务完成时间再2天前的。 @Query(value = "SELECT ct.id,ct.app_id,ct.subject_id,ct.external_id,cs.site_type, ct.task_type,ct.cid,ct.crawl_status,ct.crawl_start_time,ct.crawl_end_time,ct.crawl_data_flag,ct.data_total,ct.today_data_total,ct.cache_num,ct.update_time,ct.del,ct.crawl_content_key FROM `cl_task` ct JOIN intelligent_crawl.cl_site cs ON ct.cid = cs.cid WHERE ct.del = 0 AND ((ct.crawl_status = 1 OR ct.crawl_status = 0) OR (ct.crawl_status = 3 AND ct.end_time > date_sub(curdate(),interval 2 day))); ",nativeQuery = true) - // @Query(value = "SELECT ct.id,ct.app_id,ct.subject_id,ct.external_id,cs.site_type, ct.task_type,ct.cid,ct.crawl_status,ct.crawl_start_time,ct.crawl_end_time,ct.crawl_data_flag,ct.data_total,ct.today_data_total,ct.cache_num,ct.update_time,ct.del,ct.crawl_content_key FROM `cl_task` ct JOIN intelligent_crawl.cl_site cs ON ct.cid = cs.cid WHERE ct.del = 0 AND ct.subject_id = 12273 ; ",nativeQuery = true) + // @Query(value = "SELECT ct.id,ct.app_id,ct.subject_id,ct.external_id,cs.site_type, ct.task_type,ct.cid,ct.crawl_status,ct.crawl_start_time,ct.crawl_end_time,ct.crawl_data_flag,ct.data_total,ct.today_data_total,ct.cache_num,ct.update_time,ct.del,ct.crawl_content_key FROM `cl_task` ct JOIN intelligent_crawl.cl_site cs ON ct.cid = cs.cid WHERE ct.del = 0 AND ct.subject_id = 12505 ; ",nativeQuery = true) + // @Query(value = "SELECT ct.id,ct.app_id,ct.subject_id,ct.external_id,cs.site_type, ct.task_type,ct.cid,ct.crawl_status,ct.crawl_start_time,ct.crawl_end_time,ct.crawl_data_flag,ct.data_total,ct.today_data_total,ct.cache_num,ct.update_time,ct.del,ct.crawl_content_key FROM `cl_task` ct JOIN intelligent_crawl.cl_site cs ON ct.cid = cs.cid WHERE ct.del = 0 ; ",nativeQuery = true) List findAllBydel0(); @Query(value = "SELECT sum(data_total) FROM cl_task ct JOIN intelligent_crawl.cl_site cs ON ct.cid=cs.cid WHERE ct.del =0 AND ct.subject_id = ?1 AND cs.site_type = ?2",nativeQuery = true) diff --git a/cl_query_data_job/src/main/java/com/bfd/mf/job/service/es/EsQueryMiniService.java b/cl_query_data_job/src/main/java/com/bfd/mf/job/service/es/EsQueryMiniService.java index a78893b..7eed643 100644 --- a/cl_query_data_job/src/main/java/com/bfd/mf/job/service/es/EsQueryMiniService.java +++ b/cl_query_data_job/src/main/java/com/bfd/mf/job/service/es/EsQueryMiniService.java @@ -139,19 +139,6 @@ public class EsQueryMiniService { if (indexName.contains(indexNamePre)) { boolean isExists = EsUtils.indexExists(clusterName, indexName); if (isExists) { -// // 任务ID 筛选 -// TermQueryBuilder cidTermQueryBuilder = QueryBuilders.termQuery(ESConstants.EN_SOURCE, cid); -// TermQueryBuilder taskIdTermQueryBuilder = QueryBuilders.termQuery(ESConstants.CRAWLDATAFLAG, crawlDataFlag); -// qb.must(taskIdTermQueryBuilder).must(cidTermQueryBuilder); -// // 时间范围筛选 -// // BoolQueryBuilder shouldbq = QueryBuilders.boolQuery(); -// RangeQueryBuilder rangeQueryBuilder = QueryBuilders -// .rangeQuery(ESConstants.PUBTIME) -// .gte(crawlStartTime) -// .lt(crawlEndTime); -// // 不用统计FB 的这种粉丝的量 -// TermQueryBuilder pageTypeQueryBuilder = QueryBuilders.termQuery(ESConstants.PAGETYPR,"socialFans"); -// qb.mustNot(pageTypeQueryBuilder).must(rangeQueryBuilder); BoolQueryBuilder qb = getQueryBuilder(cid,crawlDataFlag,crawlStartTime,crawlEndTime); logger.info("QB1 查询总量: indexName: {}. taskId : {}.{\"query\": {}}.", indexName, taskId, qb.toString().replace("\n", "").replace("\r", "").replace(" ", "")); Long count = EsUtils.queryCount(clusterName, indexName, qb); @@ -202,15 +189,17 @@ public class EsQueryMiniService { TermQueryBuilder cidTermQueryBuilder = QueryBuilders.termQuery(ESConstants.EN_SOURCE, cid); TermQueryBuilder taskIdTermQueryBuilder = QueryBuilders.termQuery(ESConstants.CRAWLDATAFLAG, crawlDataFlag); qb.must(taskIdTermQueryBuilder).must(cidTermQueryBuilder); - // 时间范围筛选 - // BoolQueryBuilder shouldbq = QueryBuilders.boolQuery(); + // 时间范围筛选 只有主贴评论需要查时间,用户不需要设置时间范围 + BoolQueryBuilder shouldbq = QueryBuilders.boolQuery(); RangeQueryBuilder rangeQueryBuilder = QueryBuilders .rangeQuery(ESConstants.PUBTIME) .gte(crawlStartTime) .lt(crawlEndTime); + TermQueryBuilder primary1 = QueryBuilders.termQuery(ESConstants.PRIMARY,2); + shouldbq.must(rangeQueryBuilder).mustNot(primary1); // 不用统计FB 的这种粉丝的量 TermQueryBuilder pageTypeQueryBuilder = QueryBuilders.termQuery(ESConstants.PAGETYPR,"socialFans"); - qb.mustNot(pageTypeQueryBuilder).must(rangeQueryBuilder); + qb.mustNot(pageTypeQueryBuilder).should(shouldbq); return qb; } } diff --git a/cl_query_data_job/src/main/java/com/bfd/mf/job/service/query/QueryService.java b/cl_query_data_job/src/main/java/com/bfd/mf/job/service/query/QueryService.java index 78921f0..a3a96ab 100644 --- a/cl_query_data_job/src/main/java/com/bfd/mf/job/service/query/QueryService.java +++ b/cl_query_data_job/src/main/java/com/bfd/mf/job/service/query/QueryService.java @@ -63,7 +63,7 @@ public class QueryService { // 注册数据查询来源 // EsUtils.registerCluster(config.esNormalClusterName(), config.esNormalAddress());// 配置文件中的 es-source EsUtils.registerCluster(config.esMiniClusterName(), config.esMiniAddress()); // 配置文件中的 es-target -// pRateLimiter = RateLimiter.create(1.0D / config.getPeriodS()); + pRateLimiter = RateLimiter.create(1.0D / config.getPeriodS()); kafkaProducer = Kafka010Utils.getProducer(config.getBrokerList()); // cRateLimiter = RateLimiter.create(1.0D / config.getPeriodS()); @@ -486,33 +486,33 @@ public class QueryService { String resolution = ""; String videoTime = ""; try { - if(null != downloadUrl && !downloadUrl.contains("si-te.percent.cn")){ - Map pathSizeMap = DownLoadFile.downloadAndSaveFile(downloadUrl, goFastAddr); - LOGGER.info("[QueryService] getPathSize goFaskAddr {}. resultMap {}.",goFastAddr ,pathSizeMap); - if(pathSizeMap.size() > 0){ - if(downloadType == 1){ // 视频 - resolution = data.getString(ESConstants.RESOLUTION); - videoTime = data.getString(ESConstants.VIDEOTIME); - } - if(downloadType == 2) { // 图片 - resolution = DownLoadFile.imagesize(downloadUrl); + if(null != downloadUrl && !downloadUrl.contains("si-te.percent.cn")) { + if (downloadUrl.contains("http")) { + Map pathSizeMap = DownLoadFile.downloadAndSaveFile(downloadUrl, goFastAddr); + LOGGER.info("[QueryService] getPathSize goFaskAddr {}. resultMap {}.", goFastAddr, pathSizeMap); + if (pathSizeMap.size() > 0) { + if (downloadType == 1) { // 视频 + resolution = data.getString(ESConstants.RESOLUTION); + videoTime = data.getString(ESConstants.VIDEOTIME); + } + if (downloadType == 2) { // 图片 + resolution = DownLoadFile.imagesize(downloadUrl); + } + //String url = pathSizeMap.get("realUrl").replace(config.getGoFastDomain(),""); + String url = pathSizeMap.get("realUrl").replace("http://172.18.1.113:8080", ""); + String size = pathSizeMap.get("size") + "KB"; + pathSizeMap.put(ESConstants.URL, url); + pathSizeMap.put(ESConstants.SIZE, size); + pathSizeMap.put(ESConstants.RESOLUTION, resolution); + pathSizeMap.put(ESConstants.VIDEOTIME, videoTime); + pathSizeMap.remove("realUrl"); + // 这个是三个PathSize imagePathSize ,videoPathSize filePathSize + pathSizeList.add(pathSizeMap); + // 这个是 用来做 gofast 和原链接替换的,key 是原链接,value 是go-fast 链接, + srcMap.put(downloadUrl, url); + // 这个值使用来替换 三个 Path 的 imagePath,videoPath,filePath + path.add(url); } - //String url = pathSizeMap.get("realUrl").replace(config.getGoFastDomain(),""); - String url = pathSizeMap.get("realUrl").replace("http://172.18.1.113:8080",""); - String size = pathSizeMap.get("size") + "KB"; - pathSizeMap.put(ESConstants.URL,url); - pathSizeMap.put(ESConstants.SIZE,size); - pathSizeMap.put(ESConstants.RESOLUTION,resolution); - pathSizeMap.put(ESConstants.VIDEOTIME,videoTime); - pathSizeMap.remove("realUrl"); - // 这个是三个PathSize imagePathSize ,videoPathSize filePathSize - pathSizeList.add(pathSizeMap); - // 这个是 用来做 gofast 和原链接替换的,key 是原链接,value 是go-fast 链接, - srcMap.put(downloadUrl,url); - // 这个值使用来替换 三个 Path 的 imagePath,videoPath,filePath - path.add(url); - - } } } catch (IOException e) { @@ -574,7 +574,7 @@ public class QueryService { try { // TermQueryBuilder queryCidBuilders = QueryBuilders.termQuery("enSource",cid); TermQueryBuilder queryCrawlDataFlagBuilder = QueryBuilders.termQuery(ESConstants.CRAWLDATAFLAG,crawlDataFlag); - if (crawlDataFlag.contains("keyword")) { + if (crawlDataFlag.contains("keyword:")) { String keyword = crawlDataFlag.split("keyword:")[1]; System.out.println("[buildCrawlDataFlagBuilder] keyword --- " + keyword); // 关键词的话需要去 title 和 content 匹配一下 MatchPhraseQueryBuilder titleQuery = QueryBuilders.matchPhraseQuery(ESConstants.TITLE, keyword).slop(0); @@ -588,14 +588,14 @@ public class QueryService { queryBuilder = QueryBuilders.boolQuery().should(titleFilterQuery).should(contentFilterQuery).should(queryCrawlDataFlagBuilder); } } - if (crawlDataFlag.contains("url")) { + if (crawlDataFlag.contains("url:")) { String url = crawlDataFlag.split("url:")[1]; System.out.println("[buildCrawlDataFlagBuilder] url --- " + url); // url 的话直接匹配 url 字段 TermQueryBuilder queryUrlBuilders = QueryBuilders.termQuery(ESConstants.URL,url); // QueryBuilder queryBuilder1 = QueryBuilders.boolQuery().must(queryUrlBuilders); queryBuilder = QueryBuilders.boolQuery().should(queryCrawlDataFlagBuilder).should(queryUrlBuilders); } - if (crawlDataFlag.contains("account")) { + if (crawlDataFlag.contains("account:")) { String account = crawlDataFlag.split("account:")[1]; System.out.println("[buildCrawlDataFlagBuilder] account --- " + account); TermQueryBuilder queryAccountBuilders = QueryBuilders.termQuery(ESConstants.USER_URL,account); diff --git a/cl_query_data_job/src/main/java/com/bfd/mf/job/service/statistics/StatisticsService.java b/cl_query_data_job/src/main/java/com/bfd/mf/job/service/statistics/StatisticsService.java index 0b1e6e4..b76d537 100644 --- a/cl_query_data_job/src/main/java/com/bfd/mf/job/service/statistics/StatisticsService.java +++ b/cl_query_data_job/src/main/java/com/bfd/mf/job/service/statistics/StatisticsService.java @@ -242,6 +242,7 @@ public class StatisticsService { String indexNamePre = config.getIndexNamePre(); Map countMap = new HashMap<>(); if(null != task.getCid() && !task.getCid().equals("test")) { + // 获取任务数量 countMap = esQueryMiniService.getTaskCount(miniName, taskId, task, crawlDataFlag, indexNamePre); // 直接更新 cl_task 表中的 data_total 和 today_data_total long totalCount = 0L; @@ -253,6 +254,7 @@ public class StatisticsService { if(countMap.containsKey(ESConstants.TOTALCOUNT) && countMap.containsKey(ESConstants.TODAYCOUNT)) { totalCount = countMap.get(ESConstants.TOTALCOUNT); todayCount = countMap.get(ESConstants.TODAYCOUNT); + System.out.println("******* " + totalCount); // imageCount = countMap.get(ESConstants.IMAGECOUNT); // videoCount = countMap.get(ESConstants.VIDEOCOUNT); // fileCount = countMap.get(ESConstants.FILECOUNT); diff --git a/cl_query_data_job/src/main/java/com/bfd/mf/job/worker/QueryProducer.java b/cl_query_data_job/src/main/java/com/bfd/mf/job/worker/QueryProducer.java index 14bf7da..cb6f415 100644 --- a/cl_query_data_job/src/main/java/com/bfd/mf/job/worker/QueryProducer.java +++ b/cl_query_data_job/src/main/java/com/bfd/mf/job/worker/QueryProducer.java @@ -34,10 +34,10 @@ public class QueryProducer extends AbstractWorker { // LOGGER.info("[QueryProducer] work start ... "); queryBacktraceService.tryAcquire(); queryBacktraceService.produce(); -// try { -// Thread.sleep(config.getIntervalTime()); -// } catch (InterruptedException e) { -// e.printStackTrace(); -// } + try { + Thread.sleep(6000); + } catch (InterruptedException e) { + e.printStackTrace(); + } } } diff --git a/cl_search_api/src/main/java/com/bfd/mf/controller/SearchDataController.java b/cl_search_api/src/main/java/com/bfd/mf/controller/SearchDataController.java index dc5af7a..0cc9781 100644 --- a/cl_search_api/src/main/java/com/bfd/mf/controller/SearchDataController.java +++ b/cl_search_api/src/main/java/com/bfd/mf/controller/SearchDataController.java @@ -47,6 +47,7 @@ public class SearchDataController { logger.info("[queryDataList] partial / Params: {}", JSONObject.toJSONString(queryRequest)); JSONObject result = new JSONObject(); try { + long start = System.currentTimeMillis(); String scorllId = queryRequest.getScrollId(); String subjectId = queryRequest.getSubjectId(); if(null != scorllId ){// 导出数据 @@ -85,6 +86,8 @@ public class SearchDataController { return ResponseWrapper.buildResponse(RTCodeEnum.C_SUBJECT_GRAMMAR_ERROR, "总数和分页不匹配"); } } + long end = System.currentTimeMillis(); + logger.info("接口查询时长:statr:"+ start +" ; end:"+end + " ; time = " + (end - start) + " ; count = "+result.get(ESConstant.ALLDOCNUMBER)); } catch (Exception e) { logger.error("[queryData] Failed,The error message is :{}", e); return ResponseWrapper.buildResponse(RTCodeEnum.C_SERVICE_NOT_AVAILABLE, "Query failed"); diff --git a/cl_search_api/src/main/java/com/bfd/mf/service/SearchDataService.java b/cl_search_api/src/main/java/com/bfd/mf/service/SearchDataService.java index 0ea50ef..628613a 100644 --- a/cl_search_api/src/main/java/com/bfd/mf/service/SearchDataService.java +++ b/cl_search_api/src/main/java/com/bfd/mf/service/SearchDataService.java @@ -433,6 +433,7 @@ public class SearchDataService extends CrudService>imagePathSize = (List>) jsonObject.get(ESConstant.IMAGEPATHSIZE); @@ -663,6 +662,20 @@ public class SearchDataService extends CrudService> site = siteRepository.findSiteByEnSource(enSource); Map> siteMap = new HashMap<>(); @@ -741,11 +754,11 @@ public class SearchDataService extends CrudService