diff --git a/.idea/libraries/Maven__ch_qos_logback_logback_classic_1_1_2.xml b/.idea/libraries/Maven__ch_qos_logback_logback_classic_1_1_7.xml similarity index 58% rename from .idea/libraries/Maven__ch_qos_logback_logback_classic_1_1_2.xml rename to .idea/libraries/Maven__ch_qos_logback_logback_classic_1_1_7.xml index a8b0d9c..cdd7959 100644 --- a/.idea/libraries/Maven__ch_qos_logback_logback_classic_1_1_2.xml +++ b/.idea/libraries/Maven__ch_qos_logback_logback_classic_1_1_7.xml @@ -1,13 +1,13 @@ - + - + - + - + \ No newline at end of file diff --git a/.idea/libraries/Maven__ch_qos_logback_logback_core_1_1_3.xml b/.idea/libraries/Maven__ch_qos_logback_logback_core_1_1_7.xml similarity index 60% rename from .idea/libraries/Maven__ch_qos_logback_logback_core_1_1_3.xml rename to .idea/libraries/Maven__ch_qos_logback_logback_core_1_1_7.xml index fd35ca4..6c2a760 100644 --- a/.idea/libraries/Maven__ch_qos_logback_logback_core_1_1_3.xml +++ b/.idea/libraries/Maven__ch_qos_logback_logback_core_1_1_7.xml @@ -1,13 +1,13 @@ - + - + - + - + \ No newline at end of file diff --git a/cl_query_data_job/src/main/java/com/bfd/mf/job/config/ESConstants.java b/cl_query_data_job/src/main/java/com/bfd/mf/job/config/ESConstants.java index d7bb47e..d2581fb 100644 --- a/cl_query_data_job/src/main/java/com/bfd/mf/job/config/ESConstants.java +++ b/cl_query_data_job/src/main/java/com/bfd/mf/job/config/ESConstants.java @@ -882,6 +882,7 @@ public class ESConstants { public static String AREA = "area"; + /** * _all字段 */ @@ -1082,6 +1083,8 @@ public class ESConstants { public static final String GOFASTURL = "gofastUrl"; public static final String ORIGINALURL = "originalUrl"; public static final String PATHSIZELIST = "pathSizeList"; + public static final String SRCLIST = "srcList"; + public static final String SRCMAP = "srcMap"; public static final String PATH = "path"; diff --git a/cl_query_data_job/src/main/java/com/bfd/mf/job/domain/entity/Task.java b/cl_query_data_job/src/main/java/com/bfd/mf/job/domain/entity/Task.java index fd120ba..356f624 100644 --- a/cl_query_data_job/src/main/java/com/bfd/mf/job/domain/entity/Task.java +++ b/cl_query_data_job/src/main/java/com/bfd/mf/job/domain/entity/Task.java @@ -12,7 +12,7 @@ public class Task extends AbstractEntity { // private long top; private BigInteger subjectId; - // private String appId; + private String appId; private String externalId; // private long crawlId; private Integer siteType; @@ -54,6 +54,14 @@ public class Task extends AbstractEntity { this.subjectId = subjectId; } + public String getAppId() { + return appId; + } + + public void setAppId(String appId) { + this.appId = appId; + } + public String getExternalId() { return externalId; } diff --git a/cl_query_data_job/src/main/java/com/bfd/mf/job/domain/repository/TaskRepository.java b/cl_query_data_job/src/main/java/com/bfd/mf/job/domain/repository/TaskRepository.java index 7515626..8bd8328 100644 --- a/cl_query_data_job/src/main/java/com/bfd/mf/job/domain/repository/TaskRepository.java +++ b/cl_query_data_job/src/main/java/com/bfd/mf/job/domain/repository/TaskRepository.java @@ -12,8 +12,8 @@ import java.util.Map; public interface TaskRepository extends CrudRepository { - @Query(value = "SELECT ct.id,ct.app_id,ct.subject_id,ct.external_id,cs.site_type,ct.task_type,ct.cid,ct.crawl_status,ct.crawl_start_time,ct.crawl_end_time,ct.crawl_data_flag,ct.data_total,ct.today_data_total,ct.cache_num,ct.update_time,ct.del,ct.crawl_content_key FROM cl_task ct JOIN intelligent_crawl.cl_site cs ON ct.cid = cs.cid WHERE 
ct.task_type <> 3 AND ct.crawl_status = 1 AND ct.cache_num = 3 AND app_id = '61qb' AND subject_id = 12094 AND ct.data_total = 0 AND ct.del = 0 AND ct.subject_id in (SELECT id from cl_subject csu WHERE csu.del =0) ORDER BY ct.id DESC;",nativeQuery = true) - // @Query(value = "SELECT ct.id,ct.app_id,ct.subject_id,ct.external_id,cs.site_type,ct.task_type,ct.cid,ct.crawl_status,ct.crawl_start_time,ct.crawl_end_time,ct.crawl_data_flag,ct.data_total,ct.today_data_total,ct.cache_num,ct.update_time,ct.del,ct.crawl_content_key FROM cl_task ct JOIN intelligent_crawl.cl_site cs ON ct.cid = cs.cid WHERE ct.subject_id = 12094 AND ct.task_type <> 3 AND ct.crawl_status = 1 AND ct.cache_num = 0 AND ct.data_total = 0 AND ct.del = 0 AND ct.subject_id in (SELECT id from cl_subject csu WHERE csu.del =0) ORDER BY ct.id DESC ;",nativeQuery = true) + // @Query(value = "SELECT ct.id,ct.app_id,ct.subject_id,ct.external_id,cs.site_type,ct.task_type,ct.cid,ct.crawl_status,ct.crawl_start_time,ct.crawl_end_time,ct.crawl_data_flag,ct.data_total,ct.today_data_total,ct.cache_num,ct.update_time,ct.del,ct.crawl_content_key FROM all_task.cl_task ct JOIN cl_site cs ON ct.cid = cs.cid WHERE ct.task_type <> 3 AND ct.crawl_status = 1 AND ct.cache_num = 3 AND ct.data_total = 0 AND ct.del = 0 AND ct.subject_id in (SELECT id from cl_subject csu WHERE csu.del =0) ORDER BY ct.id DESC;",nativeQuery = true) + @Query(value = "SELECT ct.id,ct.app_id,ct.subject_id,ct.external_id,cs.site_type,ct.task_type,ct.cid,ct.crawl_status,ct.crawl_start_time,ct.crawl_end_time,ct.crawl_data_flag,ct.data_total,ct.today_data_total,ct.cache_num,ct.update_time,ct.del,ct.crawl_content_key FROM all_task.cl_task ct JOIN intelligent_crawl.cl_site cs ON ct.cid = cs.cid WHERE ct.task_type <> 3 AND ct.crawl_status = 1 AND ct.cache_num = 0 AND ct.data_total = 0 AND ct.del = 0 AND ct.subject_id in (SELECT id from all_task.cl_subject csu WHERE csu.del =0) ORDER BY ct.id DESC ;",nativeQuery = true) List findAllNewTask(); // 需要统计的任务的查询条件 1、 状态为 1 OR 0;2、状态为3,且任务完成时间再2天前的。 @@ -63,12 +63,12 @@ public interface TaskRepository extends CrudRepository { */ @Modifying @Transactional(rollbackFor = Exception.class) - @Query(value = "update cl_task set data_total =?2 , today_data_total =?3 where id =?1 ", nativeQuery = true) + @Query(value = "update cl_task set data_total =?2 , today_data_total =?3, update_time = now() where id =?1 ", nativeQuery = true) void updateTaskCount(Long id, Long totalCount, Long todayCount); @Modifying @Transactional(rollbackFor = Exception.class) - @Query(value = "update cl_task set data_total =?2 , today_data_total =?3 ,has_image_total = ?4,has_video_total = ?5, has_file_total = ?6,has_text_total = ?7 where id =?1 ", nativeQuery = true) + @Query(value = "update cl_task set data_total =?2 , today_data_total =?3 , has_image_total = ?4, has_video_total = ?5, has_file_total = ?6, has_text_total = ?7 , update_time = now() where id =?1 ", nativeQuery = true) void updateTaskCountAll(Long id, Long totalCount, Long todayCount,Long imageCount,Long videoCount,Long fileCount,Long textCount); @Modifying @@ -90,11 +90,11 @@ public interface TaskRepository extends CrudRepository { // @Query(value = "SELECT ct.id,ct.subject_id,ct.external_id,cs.site_type, ct.task_type,ct.cid,ct.crawl_status,ct.crawl_start_time,ct.crawl_end_time,ct.crawl_data_flag,ct.data_total,ct.today_data_total,ct.cache_num,ct.update_time,ct.del,ct.file_name,ct.file_remark,ct.crawl_content_key FROM `cl_task` ct JOIN intelligent_crawl.cl_site cs ON ct.cid = cs.cid WHERE ct.del = 0 AND ct.id = 
?1",nativeQuery = true) - @Query(value = "SELECT ct.id,ct.app_id,ct.subject_id,ct.external_id,cs.site_type, ct.task_type,ct.cid,ct.crawl_status,ct.crawl_start_time,ct.crawl_end_time,ct.crawl_data_flag,ct.data_total,ct.today_data_total,ct.cache_num,ct.update_time,ct.del,ct.crawl_content_key FROM `cl_task` ct JOIN intelligent_crawl.cl_site cs ON ct.cid = cs.cid WHERE ct.del = 0 AND ct.id = ?1 ;",nativeQuery = true) - List findOneTaskByIdAndAppId(long taskId); - - + @Query(value = "SELECT ct.id,ct.app_id,ct.subject_id,ct.external_id,cs.site_type, ct.task_type,ct.cid,ct.crawl_status,ct.crawl_start_time,ct.crawl_end_time,ct.crawl_data_flag,ct.data_total,ct.today_data_total,ct.cache_num,ct.update_time,ct.del,ct.crawl_content_key FROM `cl_task` ct JOIN intelligent_crawl.cl_site cs ON ct.cid = cs.cid WHERE ct.del = 0 AND ct.id = ?1 AND ct.app_id = ?2 ",nativeQuery = true) + List findOneTaskByIdAndAppId(long taskId,String appID); +// @Query(value = "",nativeQuery = true) +// List getByTaskId(long taskId); // @Query(value = "SELECT id,subject_id,external_id,site_type,task_type,cid,crawl_data_flag,cache_num,crawl_start_time,crawl_end_time,data_total,today_data_total,update_time FROM cl_task WHERE NOW() > SUBDATE(update_time,interval -15 minute) AND del = 0 AND subject_id in (SELECT id from cl_subject WHERE `status` = 0 AND del =0)", nativeQuery = true) diff --git a/cl_query_data_job/src/main/java/com/bfd/mf/job/download/DownLoadFile.java b/cl_query_data_job/src/main/java/com/bfd/mf/job/download/DownLoadFile.java index 510ebcc..29fce72 100644 --- a/cl_query_data_job/src/main/java/com/bfd/mf/job/download/DownLoadFile.java +++ b/cl_query_data_job/src/main/java/com/bfd/mf/job/download/DownLoadFile.java @@ -1,21 +1,24 @@ package com.bfd.mf.job.download; import com.alibaba.fastjson.JSONObject; +import com.bfd.mf.job.config.ESConstants; import okhttp3.*; import javax.imageio.ImageIO; import java.awt.image.BufferedImage; +import java.awt.image.DataBufferByte; import java.io.File; import java.io.IOException; import java.io.InputStream; import java.net.URL; +import java.net.URLConnection; import java.util.HashMap; import java.util.Map; public class DownLoadFile { public static Map downloadAndSaveFile(String getUrl,String putUrl){ - String realUrl = "";double size; + // String path = "";double size; Map realresult= new HashMap<>(); try{ String files [] = getUrl.split("/"); @@ -23,23 +26,20 @@ public class DownLoadFile { Map header = new HashMap<>(); header.put("User-Agent","Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.125 Safari/537.36"); header.put("Connection","keep-alive"); - try { - Map downloadresult = OkHttpUtils.doGetBytes(getUrl,header); - size= (double) downloadresult.get("size"); - if (downloadresult.containsKey("content")&&size>0){ - byte[] content = (byte[]) downloadresult.get("content"); - size= (double) downloadresult.get("size"); - size = Double.valueOf(String.format("%.3f", size)); - Thread.sleep(4000); - String result = DownLoadFile.upload(putUrl,fileName,content); - Thread.sleep(4000); - realUrl = JSONObject.parseObject(result).getString("url"); - realresult.put("realUrl",realUrl); - realresult.put("size",size+""); - } - } catch (IOException e) { - e.printStackTrace(); + Map downloadresult = OkHttpUtils.doGetBytes(getUrl,header); + double size= (double) downloadresult.get(ESConstants.SIZE); + + if (downloadresult.containsKey(ESConstants.CONTENT) && size > 0){ + byte[] content = (byte[]) downloadresult.get(ESConstants.CONTENT); + //size= (double) 
downloadresult.get("size"); + size = Double.valueOf(String.format("%.2f", size)); + Thread.sleep(3000); + String result = DownLoadFile.upload(putUrl,fileName,content); + Thread.sleep(3000); + String path = JSONObject.parseObject(result).getString(ESConstants.PATH); + realresult.put(ESConstants.URL , path); + realresult.put(ESConstants.SIZE , size+"KB"); } }catch (Exception e){ e.printStackTrace(); @@ -47,6 +47,14 @@ public class DownLoadFile { return realresult; } + +// public static void main(String[] args) { +// String getUrl = "https://wx4.sinaimg.cn/mw690/001NtKpRly1guw9jh90poj60u01hcaqj02.jpg"; +// String putUrl = "http://172.18.1.113:8080/upload"; +// Map realresult = downloadAndSaveFile(getUrl,putUrl); +// System.out.println(JSONObject.toJSONString(realresult)); +// } + public static String upload(String uploadUrl,String fileName,byte[] content) { String result = ""; try { @@ -54,8 +62,7 @@ public class DownLoadFile { MultipartBody multipartBody = new MultipartBody.Builder(). setType(MultipartBody.FORM) .addFormDataPart("file", fileName, - RequestBody.create(MediaType.parse("multipart/form-data;charset=utf-8"), - content)) + RequestBody.create(MediaType.parse("multipart/form-data;charset=utf-8"), content)) .addFormDataPart("output", "json") .build(); Request request = new Request.Builder() @@ -75,18 +82,36 @@ public class DownLoadFile { return result; } - public static String imagesize(String getUrl) throws IOException{ - String realresult=""; + public static String getImageResolution(String getUrl) throws IOException{ + String resolution = "" ; try{ InputStream murl = new URL(getUrl).openStream(); BufferedImage sourceImg = ImageIO.read(murl); int srcWidth = sourceImg .getWidth(); // 源图宽度 int srcHeight = sourceImg .getHeight(); // 源图高度 - realresult=Integer.toString(srcWidth)+"×"+ Integer.toString(srcHeight); + resolution = Integer.toString(srcWidth)+"×"+ Integer.toString(srcHeight); }catch (Exception e){ + System.out.println("ERROR URL : " + getUrl); + // e.printStackTrace(); + } + return resolution; + } + + public static String getFileSize(String getUrl){ + String realSize = ""; + // 获取大小 + try { + URL url = new URL(getUrl); + URLConnection conn = url.openConnection(); + double size = conn.getContentLength(); + double newSize = Double.valueOf(String.format("%.2f", size/1024)); + conn.getInputStream().close(); + realSize = newSize+"KB"; + } catch (Exception e) { e.printStackTrace(); } - return realresult; + return realSize; + } public static Map upload(String uploadUrl,String fileName,File file) { diff --git a/cl_query_data_job/src/main/java/com/bfd/mf/job/download/OkHttpUtils.java b/cl_query_data_job/src/main/java/com/bfd/mf/job/download/OkHttpUtils.java index 9b6215e..8dbca66 100644 --- a/cl_query_data_job/src/main/java/com/bfd/mf/job/download/OkHttpUtils.java +++ b/cl_query_data_job/src/main/java/com/bfd/mf/job/download/OkHttpUtils.java @@ -175,8 +175,10 @@ public class OkHttpUtils { if (body != null) { byte[] content=response.body().bytes(); result.put("content",content); - double size=Double.valueOf(response.header("Content-Length"))/1024; - result.put("size",size); + if(response.header("Content-Length") != null) { + double size = Double.valueOf(response.header("Content-Length")) / 1024; + result.put("size", size); + } } } return result; diff --git a/cl_query_data_job/src/main/java/com/bfd/mf/job/service/es/EsQueryMiniService.java b/cl_query_data_job/src/main/java/com/bfd/mf/job/service/es/EsQueryMiniService.java index c1ffe91..6846b7f 100644 --- 
a/cl_query_data_job/src/main/java/com/bfd/mf/job/service/es/EsQueryMiniService.java +++ b/cl_query_data_job/src/main/java/com/bfd/mf/job/service/es/EsQueryMiniService.java @@ -193,7 +193,7 @@ public class EsQueryMiniService { private BoolQueryBuilder getQueryBuilder(String cid, String crawlDataFlag, Long crawlStartTime, Long crawlEndTime) { BoolQueryBuilder qb = QueryBuilders.boolQuery(); // 任务ID 筛选 - TermQueryBuilder cidTermQueryBuilder = QueryBuilders.termQuery(ESConstants.EN_SOURCE, cid); + TermQueryBuilder cidTermQueryBuilder = QueryBuilders.termQuery(ESConstants.EN_SOURCE+".keyword", cid); TermQueryBuilder taskIdTermQueryBuilder = QueryBuilders.termQuery(ESConstants.CRAWLDATAFLAG, crawlDataFlag); qb.must(taskIdTermQueryBuilder).must(cidTermQueryBuilder); // 时间范围筛选 只有主贴评论需要查时间,用户不需要设置时间范围 diff --git a/cl_query_data_job/src/main/java/com/bfd/mf/job/service/query/QueryService.java b/cl_query_data_job/src/main/java/com/bfd/mf/job/service/query/QueryService.java index e297816..9127a30 100644 --- a/cl_query_data_job/src/main/java/com/bfd/mf/job/service/query/QueryService.java +++ b/cl_query_data_job/src/main/java/com/bfd/mf/job/service/query/QueryService.java @@ -1,6 +1,7 @@ package com.bfd.mf.job.service.query; import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.serializer.SerializerFeature; import com.bfd.crawler.utils.JsonUtils; @@ -17,6 +18,7 @@ import com.bfd.mf.job.util.Kafka010Utils; import com.bfd.mf.job.util.ReadLine; import com.google.common.collect.Maps; import com.google.common.util.concurrent.RateLimiter; +import kafka.utils.Json; import org.apache.commons.lang3.exception.ExceptionUtils; import org.assertj.core.util.Lists; import org.elasticsearch.index.query.*; @@ -27,8 +29,14 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.IOException; +import java.math.BigDecimal; import java.math.BigInteger; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLConnection; import java.sql.Timestamp; import java.util.*; import java.util.concurrent.BlockingQueue; @@ -40,12 +48,13 @@ import static org.elasticsearch.index.query.QueryBuilders.rangeQuery; public class QueryService { private static final Logger LOGGER = LoggerFactory.getLogger(QueryService.class); private static final Long ONE_MINUTE = 1000L * 60; - private static BlockingQueue>> P_TASK_CACHE_RANGE = new LinkedBlockingQueue<>(); + private static BlockingQueue>> P_TASK_CACHE_RANGE = new LinkedBlockingQueue<>(); private RateLimiter pRateLimiter; + // private RateLimiter dataRateLimiter; // private RateLimiter cRateLimiter; - private static String defultAddr = "http://172.18.1.113:8080/upload"; - private static String defultTopic = "test202012111"; +// private static String defultAddr = "http://172.18.1.113:8080/upload"; +// private static String defultTopic = "test202012111"; @Autowired private AppConfig config; @@ -93,11 +102,12 @@ public class QueryService { Long totalSegment = 1L;//(task.getDateEnd() - task.getDateStart()) / PERIOD_MILLS; // 3600000 Long segment = 1L; Double progressFactor = 1.0 / totalSegment; - Map> cache = Maps.newHashMap(); + Map> cache = Maps.newHashMap(); long taskId = task.getId().longValue(); + String appId = task.getAppId(); int cache_num = 1; 
taskRepository.updateStatus(cache_num, task.getId().longValue()); - cache.put(taskId, Lists.newArrayList(0L, 0L, progressFactor, totalSegment, segment)); + cache.put(taskId+"#@#"+appId, Lists.newArrayList(0L, 0L, progressFactor, totalSegment, segment)); try { P_TASK_CACHE_RANGE.put(cache); } catch (InterruptedException e) { @@ -110,137 +120,141 @@ public class QueryService { * 这个方法主要是把 要打标的任务先从 ES 中读出来,放到 kafka 中,然后会有另一个 线程读kafka 进行打标呢!!! */ public void produce() { - Map> range = P_TASK_CACHE_RANGE.poll();// poll -->若队列为空,返回null + Map> range = P_TASK_CACHE_RANGE.poll();// poll -->若队列为空,返回null if (Objects.isNull(range)) { return; } - long taskId = 0L; + String taskIdAppId = ""; long fromMills =0L; long toMills = 0L; - for (Map.Entry> entry : range.entrySet()) { + for (Map.Entry> entry : range.entrySet()) { entry.getValue(); - taskId = entry.getKey(); + taskIdAppId = entry.getKey(); } - Task task = taskRepository.findById(taskId).get(); - LOGGER.info("开始拉数据的任务是:" + JSONObject.toJSONString(task)); - List docIdsList = new ArrayList<>(); - try { - // 创建过滤条件 & 任务预处理 - fromMills = task.getCrawlStartTime().longValue(); - toMills = task.getCrawlEndTime().longValue(); - Long year = config.getQueryDataYearStarttime(); // 获取配置文件中用直接拉年份的时间节点,现在设置的是2019年,2019年前的全部用年做索引,不拆成天 - String clusterName = config.esNormalClusterName(); // 获取配置文件中ES的名称 - // 根据条件获取到要查询的索引的集合 - if(toMills > new Date().getTime()){ - toMills = new Date().getTime(); - } - String[] sourceIndices = EsUtils.getIndices(AppConfig.CL_INDEX, AppConfig.SEPARATOR, - fromMills, toMills, AppConfig.DATE_FORMAT, config.esNormalUpper(), config.esNormalStandby(),year); - String cid = task.getCid().toLowerCase(); // 站点的cid - String siteType = task.getSiteType().toString(); // 站点的类型 ,主要看是不是电商的,因为电商的主贴和评论在ES中的存储方式跟其他的相反 - String crawlDataFlag = task.getCrawlDataFlag(); // 任务的抓取条件 - String crawlContentKey = task.getCrawlContentKey(); // 要拉取的字段,主要看是否需要拉评论 - // BigInteger subjectId = task.getSubjectId(); - // Subject subject = subjectRepository.getSubjectBySubjectId(subjectId.longValue()); - // String indexName = "cl_major_" + task.getSubjectId(); // 索引名称 - String indexName = "cl_major_61qb_12094"; - Integer cacheNum = task.getCacheNum(); // 拉取数据的次数 - // 当拉数据的次数 大于1 次的时候,再拉数据的开始时间就不用是任务设置的开始时间了,同时可以再加个采集时间范围限制一下,确保拉的数据都是任务添加之后才采集的就行 - QueryBuilder queryBuilder; // 根据条件组装查询用具 - if(cacheNum > 1 ) { // 已经拉过历史数据的任务,将 开始时间改成当天凌晨,查询发表和抓取都是当天的数据。 - long current=System.currentTimeMillis();//当前时间毫秒数 - long zero=current/(1000*3600*24)*(1000*3600*24)-TimeZone.getDefault().getRawOffset();//今天零点零分零秒的毫秒数 - fromMills = new Timestamp(zero).getTime(); - queryBuilder = getQueryBuilder(fromMills, toMills, cid, crawlDataFlag,cacheNum,siteType); - }else{ - fromMills = task.getCrawlStartTime().longValue(); - queryBuilder = getQueryBuilder(fromMills, toMills, cid, crawlDataFlag,cacheNum,siteType); - } - LOGGER.info("Query primary, task:{}, index:{}, from:{}, to:{}, indices:{}, dsl:{}.", - taskId, - indexName, - new LocalDateTime(fromMills).toString(AppConfig.DATE_TIME_FORMAT), - new LocalDateTime(toMills).toString(AppConfig.DATE_TIME_FORMAT), - JSONObject.toJSONString(sourceIndices), - queryBuilder.toString()); - // 传入的参数 集群名称,索引名称,索引类型(type), 查询Builder,scroll查询页面大小,scroll查询scrollId有效时间 - String finalTaskId = taskId+""; - long pubTime = fromMills; - long finalFromMills = fromMills; - long finalToMills = toMills; - EsUtils.scrollQuery(clusterName, sourceIndices, ESConstants.INDEX_TYPE, - queryBuilder, ESConstants.SCROLL_PAGE_SIZE, ESConstants.SCROLL_MINUTES, - dataList -> { - try 
{ - if(dataList.size() == 0){ - System.out.println("没查到相关的 主贴 数据"); - return; - } - for (JSONObject data : dataList) { - data = getCreateTime(data,crawlDataFlag); - // 离线拉的数据加个字段吧!跟正常拉的数据做区分 - if(data.get(ESConstants.DOC_TYPE).equals(ESConstants.ITEM) && data.get(ESConstants.PRIMARY).equals(1)){ - data = getPubTime(data,pubTime); - } - saveService.initData(data,finalTaskId); - // 发送主贴 - // 是否要下载图片到指定的 go-fast上 - // 现在判断视频、图片、文件是否下载的方式只取决于isDownload 字段 - boolean isDownload = data.getBoolean(ESConstants.ISDOWNLOAD); - if(isDownload){ - String goFastAddr = defultAddr; - data = downloadAndChangePath(data,goFastAddr); - } -// if(subject.getGoFastSwitch() == 1){ -// String goFastAddr = subject.getGoFastAddr(); -// if("" == goFastAddr){ -// goFastAddr = defultAddr; -// } -// data = downloadAndChangePath(data,goFastAddr); -// } - // 是否写入到指定的kafka -// if(subject.getKafkaSwitch() == 1) { -// String kafkaTopic = subject.getKafkaTopic(); -// String kafkaAddr = subject.getKafkaAddr(); -// if(kafkaTopic.isEmpty()){ -// kafkaTopic = defultTopic; -// } -// kafkaProducer.send(kafkaTopic, JSONObject.toJSONString(data)); -// } - if(!data.get("_id_").equals("")) { - saveService.saveToEsWithFilter(config.esMiniClusterName(), indexName, data); - kafkaProducer.send(config.getSendTopic(),JSONObject.toJSONString(data)); - LOGGER.debug("Send message, indexName :{} , taskId:{} , ID :{}.", indexName, task.getId(), data.getString("_id_")); - // 将要拉评论的ID 添加到list 中,(电商的数据不用拉评论哦)! - if(!siteType.equals(ESConstants.DOCTYPEITEM)) { - if (crawlContentKey.contains("comment") || crawlContentKey.contains("socialComment")) { - docIdsList.add(data.get(ESConstants.DOC_ID).toString()); + long taskId = Long.valueOf(taskIdAppId.split("#@#")[0]); + String appId = taskIdAppId.split("#@#")[1]; + + List taskList = taskRepository.findOneTaskByIdAndAppId(taskId,appId); + if(taskList.size() > 0) { + for (Task task:taskList) { + LOGGER.info("开始拉数据的任务是:" + JSONObject.toJSONString(task)); + List docIdsList = new ArrayList<>(); + try { + // 创建过滤条件 & 任务预处理 + fromMills = task.getCrawlStartTime().longValue(); + toMills = task.getCrawlEndTime().longValue(); + Long year = config.getQueryDataYearStarttime(); // 获取配置文件中用直接拉年份的时间节点,现在设置的是2019年,2019年前的全部用年做索引,不拆成天 + String clusterName = config.esNormalClusterName(); // 获取配置文件中ES的名称 + // 根据条件获取到要查询的索引的集合 + if (toMills > new Date().getTime()) { + toMills = new Date().getTime(); + } + // 获取时间索引List + String[] sourceIndices = EsUtils.getIndices(AppConfig.CL_INDEX, AppConfig.SEPARATOR, + fromMills, toMills, AppConfig.DATE_FORMAT, config.esNormalUpper(), config.esNormalStandby(), year); + // 组装查询条件需要的参数 + String cid = task.getCid().toLowerCase(); // 站点的cid + String siteType = task.getSiteType().toString(); // 站点的类型 ,主要看是不是电商的,因为电商的主贴和评论在ES中的存储方式跟其他的相反 + String crawlDataFlag = task.getCrawlDataFlag(); // 任务的抓取条件 + String crawlContentKey = task.getCrawlContentKey(); // 要拉取的字段,主要看是否需要拉评论 + BigInteger subjectId = task.getSubjectId(); +// Long id = task.getId(); +// System.out.println("id ==== "+id); +// System.out.println(subjectId); + // String appId = task.getAppId(); + System.out.println("**** " + appId); + String indexName = "cl_major_"; + if (appId.contains("ic")) { + indexName = indexName + subjectId; + } else { + indexName = indexName + appId.toLowerCase() + "_" + subjectId; //cl_major_61qb_12094 + } + System.out.println("indexName = " + indexName); + Integer cacheNum = task.getCacheNum(); // 拉取数据的次数 + // 当拉数据的次数 大于1 次的时候,再拉数据的开始时间就不用是任务设置的开始时间了,同时可以再加个采集时间范围限制一下,确保拉的数据都是任务添加之后才采集的就行 + QueryBuilder 
queryBuilder; // 根据条件组装查询用具 + if (cacheNum > 1) { // 已经拉过历史数据的任务,将 开始时间改成当天凌晨,查询发表和抓取都是当天的数据。 + long current = System.currentTimeMillis();//当前时间毫秒数 + long zero = current / (1000 * 3600 * 24) * (1000 * 3600 * 24) - TimeZone.getDefault().getRawOffset();//今天零点零分零秒的毫秒数 + fromMills = new Timestamp(zero).getTime(); + queryBuilder = getQueryBuilder(fromMills, toMills, cid, crawlDataFlag, cacheNum, siteType); + } else { + fromMills = task.getCrawlStartTime().longValue(); + queryBuilder = getQueryBuilder(fromMills, toMills, cid, crawlDataFlag, cacheNum, siteType); + } + LOGGER.info("Query primary, task:{}, index:{}, from:{}, to:{}, indices:{}, dsl:{}.", + taskId, + indexName, + new LocalDateTime(fromMills).toString(AppConfig.DATE_TIME_FORMAT), + new LocalDateTime(toMills).toString(AppConfig.DATE_TIME_FORMAT), + JSONObject.toJSONString(sourceIndices), + queryBuilder.toString()); + // 传入的参数 集群名称,索引名称,索引类型(type), 查询Builder,scroll查询页面大小,scroll查询scrollId有效时间 + String finalTaskId = taskId + ""; + long pubTime = fromMills; + long finalFromMills = fromMills; + long finalToMills = toMills; + String finalIndexName = indexName; + EsUtils.scrollQuery(clusterName, sourceIndices, ESConstants.INDEX_TYPE, + queryBuilder, ESConstants.SCROLL_PAGE_SIZE, ESConstants.SCROLL_MINUTES, + dataList -> { + try { + if (dataList.size() == 0) { + System.out.println("没查到相关的 主贴 数据"); + return; + } + for (JSONObject data : dataList) { + data = getCreateTime(data, crawlDataFlag); + // 为了保证电商主贴的数据能够正常的再页面进行展示,需要将电商主贴的发表时间改成拉数时间范围内的时间才行 + if (data.get(ESConstants.DOC_TYPE).equals(ESConstants.ITEM) && data.get(ESConstants.PRIMARY).equals(1)) { + data = getPubTime(data, pubTime); + } + // 离线拉的数据加个字段吧!跟正常拉的数据和采集的数据做区分 + saveService.initData(data, finalTaskId); + // 发送主贴 + // 是否要下载图片到指定的 go-fast上 + // 现在判断视频、图片、文件是否下载的方式只取决于isDownload 字段 + boolean isDownload = data.getBoolean(ESConstants.ISDOWNLOAD); + if (isDownload) { + // String goFastAddr = defultAddr; + data = downloadAndChangePath(data); + } + if (!data.get("_id_").equals("")) { +// saveService.saveToEsWithFilter(config.esMiniClusterName(), indexName, data); +// kafkaProducer.send(config.getSendTopic(),JSONObject.toJSONString(data)); + LOGGER.debug("Send message, indexName :{} , taskId:{} , ID :{}.", finalIndexName, task.getId(), data.getString("_id_")); + // 将要拉评论的ID 添加到list 中,(电商的数据不用拉评论哦)! + if (!siteType.equals(ESConstants.DOCTYPEITEM)) { + if (crawlContentKey.contains("comment") || crawlContentKey.contains("socialComment")) { + docIdsList.add(data.get(ESConstants.DOC_ID).toString()); + } + } } } + } catch (Exception e) { + System.out.println("ERROR: " + dataList); + throw new RuntimeException(e); } - } - } catch (Exception e) { - System.out.println("******* " + dataList ); - throw new RuntimeException(e); - } - }); + }); - // 开始拉评论数据 - if(docIdsList.size() > 0) { - String docType = docIdsList.get(0).split("_")[1]; - String docIds [] = docIdsList.toArray(new String[0]); - queryComments(docIds, docType, finalFromMills, finalToMills,finalTaskId,crawlDataFlag,indexName); + // 开始拉评论数据 + if (docIdsList.size() > 0) { + String docType = docIdsList.get(0).split("_")[1]; + String docIds[] = docIdsList.toArray(new String[0]); + // queryComments(docIds, docType, finalFromMills, finalToMills,finalTaskId,crawlDataFlag,indexName); + } + LOGGER.info("This Task is OK ! 
taskId = " + taskId); + Integer cache_num = task.getCacheNum(); + cache_num = cache_num + 1; + taskRepository.updateStatus(cache_num, taskId); + } catch (Exception e) { + JSONObject msg = new JSONObject(); + msg.put("message", "produce error due to [" + ExceptionUtils.getStackTrace(e) + "]"); + msg.put("from", fromMills); + msg.put("to", toMills); + LOGGER.error("Produce error due to [{}].", e.getMessage(), e); + } } - LOGGER.info("This Task is OK ! taskId = " + taskId); - Integer cache_num = task.getCacheNum(); - cache_num = cache_num +1; - taskRepository.updateStatus(cache_num, task.getId().longValue()); - } catch (Exception e) { - JSONObject msg = new JSONObject(); - msg.put("message", "produce error due to [" + ExceptionUtils.getStackTrace(e) + "]"); - msg.put("from", fromMills); - msg.put("to", toMills); - LOGGER.error("Produce error due to [{}].", e.getMessage(), e); } } @@ -264,7 +278,7 @@ public class QueryService { private void queryComments(String[] docId,String docType, long startTime,long endTime, - String crawlDataFlag,String finalTaskId, + String finalTaskId,String crawlDataFlag, String indexName) { LOGGER.info("开始拉取评论数据:"); QueryBuilder queryBuilder = getQueryBuilder(docId, startTime, endTime); @@ -301,9 +315,8 @@ public class QueryService { BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); try{ // 筛选时间 - boolean boo = true; QueryBuilder pubTimeRange = buildRangeQueryBuilder( - ESConstants.PUBTIME, startTime - 2 * ONE_MINUTE, endTime, boo, boo); + ESConstants.PUBTIME, startTime - 2 * ONE_MINUTE, endTime); boolQueryBuilder.must(pubTimeRange); // 筛选ID QueryBuilder termQueryBuilder = QueryBuilders.termsQuery(ESConstants.DOC_ID,docId); @@ -316,215 +329,290 @@ public class QueryService { /** * 下载 文件、视频、图片,并将新的路径替换写入到 pathSize中 - * videoPath == egc + videoPath == egc filePath == ugc imagePath == pgc - */ - private JSONObject downloadAndChangePath(JSONObject data, String goFastAddr) { - String isDownload = data.get(ESConstants.ISDOWNLOAD).toString(); - Map goFastMap = new HashMap<>(); - List> filePathSize = new ArrayList<>(); - List> videoPathSize = new ArrayList<>(); - List> imagePathSize = new ArrayList<>(); - // 文件下载 - List filePath = (List) data.get(ESConstants.FILEPATH); - if(filePath.size() > 0){ - // 调用下载接口,下载并将附件上传到自己的go-fast 上 - Map srcPathMap = getPathSize(filePath,goFastAddr,0,data); - filePath = (List) srcPathMap.get(ESConstants.PATH); - data.put(ESConstants.FILEPATH,filePath); - // 组装 FILEPATHSIZE 字段 - filePathSize = (List>) srcPathMap.get(ESConstants.PATHSIZELIST); - if(filePathSize.size() >0){ - data.put(ESConstants.FILEPATHSIZE,JSONObject.toJSONString(filePathSize)); - data.put(ESConstants.UGC,1); - data.put(ESConstants.ISDOWNLOAD,isDownload); - } - // 组装 SRCFILEPATH 字段 - Map srcAndGofastUrlMap = (Map) srcPathMap.get("srcMap"); - if(data.containsKey("forwardUrl") && null != data.get("forwardUrl") ) { - try { - List> forwardUrl = JsonUtils.parseArray(data.get("forwardUrl").toString()); - List> srcPath = getSrcPath(forwardUrl,srcAndGofastUrlMap); - data.put(ESConstants.SRCFILEPATH, JSON.toJSONString(srcPath, SerializerFeature.DisableCircularReferenceDetect)); - } catch (Exception e) { - e.printStackTrace(); - } + 是否需要走下载这个,除了要看 isDownload 字段之外,还需要看 filePath\imagePath\videoPath 是否为空 + 如果都为空,直接将isDownload 置为 false即可 + 如果有一个不为空,则需要判断链接是否有 http 前缀,有前缀的做下载,并且输出时将前缀替换掉。没有的就跳过。 + 然后再看 path 对应的 pathSize 和 src 是否都有值,且去了前缀,要是有值没去前缀,做处理,要是没值则添加。 + */ + private JSONObject downloadAndChangePath(JSONObject data) { + try { + // 文件下载 ,之所以提取对应的src 
字段是因为如果附件被下载过了,这个字段的值需要一一对应,就直接填写了,不用再做下载回填了。 + List filePath = (List) data.get(ESConstants.FILEPATH); + List> srcFileList = JsonUtils.parseArray( data.get(ESConstants.SRCFILEPATH).toString()); + if(filePath.size() > 0){ + data = getFilePath(data,filePath,srcFileList); } - } - // 视频下载 - List videoPath = (List) data.get(ESConstants.VIDEOPATH); - if(videoPath.size() > 0){ - // List> videoPathSize = getPathSize(videoPath,goFastAddr,1,data); - System.out.println("************ 要下载的视频链接的 List : "+videoPath); - Map srcPathMap = getPathSize(videoPath,goFastAddr,0,data); - videoPath = (List) srcPathMap.get(ESConstants.PATH); - data.put(ESConstants.VIDEOPATH,videoPath); - videoPathSize = (List>) srcPathMap.get(ESConstants.PATHSIZELIST); - if(videoPathSize.size() >0){ - data.put(ESConstants.VIDEOPATHSIZE,JSONObject.toJSONString(videoPathSize)); - data.put(ESConstants.EGC,1); - data.put(ESConstants.ISDOWNLOAD,isDownload); - } - // 组装 SRCVIDEOPATH 字段 - Map srcAndGofastUrlMap = (Map) srcPathMap.get("srcMap"); - if(data.containsKey("videoUrl") && null != data.get("videoUrl") ) { - List> srcPath = new ArrayList<>(); - if(data.get("videoUrl").toString().contains("originalUrl")){ - try { - List> videoUrl = JsonUtils.parseArray( data.get("videoUrl").toString()); - srcPath = getSrcPath(videoUrl,srcAndGofastUrlMap); - } catch (Exception e) { - e.printStackTrace(); - } - }else{ - List videoUrl = new ArrayList<>(); - try { - if(data.get("videoUrl").toString().contains("[")) { - videoUrl = JsonUtils.parseArray(data.get("videoUrl").toString()); - }else{ - videoUrl.add(data.get("videoUrl").toString()); - } - }catch (Exception e){ - e.printStackTrace(); - } - srcPath = new ArrayList<>(); - Map srcurlMap = new HashMap<>(); - if(videoPath.size() > 0) { - srcurlMap.put(ESConstants.GOFASTURL, videoPath.get(0)); - } - System.out.println("===============视频原链接的List: " + videoUrl); - if(videoUrl.size() > 0) { - srcurlMap.put(ESConstants.ORIGINALURL, videoUrl.get(0)); - } - if(srcurlMap.size() > 0) { - srcPath.add(srcurlMap); - } - } - data.put(ESConstants.SRCVIDEOPATH,JSON.toJSONString(srcPath, SerializerFeature.DisableCircularReferenceDetect)); + // 视频下载 + List videoPath = (List) data.get(ESConstants.VIDEOPATH); + if(videoPath.size() > 0){ + List> srcVideoList = JsonUtils.parseArray( data.get(ESConstants.SRCVIDEOPATH).toString()); + data = getVideoPath(data,videoPath,srcVideoList); } - - } - // 图片下载 - List imagePath = (List) data.get(ESConstants.IMAGEPATH); - if(imagePath.size() > 0){ - //List> imagePathSize = getPathSize(imagePath,goFastAddr,2,data); - Map srcPathMap = getPathSize(imagePath,goFastAddr,0,data); - imagePath = (List) srcPathMap.get(ESConstants.PATH); - data.put(ESConstants.IMAGEPATH,imagePath); - imagePathSize = (List>) srcPathMap.get(ESConstants.PATHSIZELIST); - if(imagePathSize.size() >0){ - data.put(ESConstants.IMAGEPATHSIZE,JSONObject.toJSONString(imagePathSize)); - data.put(ESConstants.PGC,1); - data.put(ESConstants.ISDOWNLOAD,isDownload); + // 图片下载 + List imagePath = (List) data.get(ESConstants.IMAGEPATH); + List> srcImageList = JsonUtils.parseArray( data.get(ESConstants.SRCIMAGEPATH).toString()); + if(imagePath.size() > 0){ + data = getImagePath(data,imagePath,srcImageList); } - Map srcAndGofastUrlMap = (Map) srcPathMap.get("srcMap"); - List> srcPath = new ArrayList<>(); - if(data.containsKey("pictureList") && null != data.get("pictureList")){ - Map pictureList = JSONObject.parseObject(data.get("pictureList").toString()); - if(!pictureList.isEmpty()){ - Map srcurlMap=new HashMap<>(); - for (Map.Entry 
entry : pictureList.entrySet()) { - Map imgmap= (Map) entry.getValue(); - if(imgmap.containsKey("uploadImg") && imgmap.get("uploadImg") != null && imgmap.get("uploadImg") != ""){ - srcurlMap.put(ESConstants.GOFASTURL,srcAndGofastUrlMap.get(imgmap.get("uploadImg"))); - srcurlMap.put(ESConstants.ORIGINALURL,imgmap.get("img").toString()); - } - srcPath.add(srcurlMap); - } - } + // isDownload 填写 + if(filePath.size() == 0 && videoPath.size() == 0 && imagePath.size() == 0){ + data.put(ESConstants.ISDOWNLOAD,"false"); } - data.put(ESConstants.SRCIMAGEPATH,JSON.toJSONString(srcPath, SerializerFeature.DisableCircularReferenceDetect)); + } catch (Exception e) { + e.printStackTrace(); } + return data; + // 当 filePath > 0 ,但是没有 filePathSize 的时候,需要下载组装 filePathSize + // 当 filePath > 0 ,但是没有 srcFilePath 的时候,需要下载组装 srcFilePath ,这个时候可以取 forwardUrl + // 当 videoPath > 0 ,但是没有 videoPathSize 的时候,需要下载组装 videoPathSize + // 当 videoPath > 0 ,但是没有 srcVideoPath 的时候,需要下载组装 srcVideoPath ,这个时候可以取 videoUrl + // 当 imagePath > 0 ,但是没有 imagePathSize 的时候,需要下载组装 imagePathSize + // 当 imagePath > 0 ,但是没有 srcImagePath 的时候,需要下载组装 srcImagePath ,这个时候可以取 pictureList // 当三个 pathSize 都为 0 的时候,表示三个下载结果都为空,为了保持页面和实际结果的统一,这块改成 false - if(filePathSize.size() == 0 && videoPathSize.size() == 0 && imagePathSize.size() == 0){ - data.put(ESConstants.ISDOWNLOAD,"false"); + } + + private JSONObject getImagePath(JSONObject data, List imagePath, List> srcImageList) { + Map pathMap = getPathSize(imagePath,1,data,srcImageList); + LOGGER.info("下载图片后的 pathMap : {}.",JsonUtils.toJSONString(pathMap)); + if(pathMap.size() > 0) { + imagePath = (List) pathMap.get(ESConstants.PATH); + data.put(ESConstants.IMAGEPATH, imagePath); + + List> imagePathSize = (List>) pathMap.get(ESConstants.PATHSIZELIST); + if (imagePathSize.size() > 0) { + data.put(ESConstants.IMAGEPATHSIZE, JSONObject.toJSONString(imagePathSize)); + data.put(ESConstants.PGC, 1); + data.put(ESConstants.ISDOWNLOAD, true); + } +// Map srcAndGofastUrlMap = (Map) srcPathMap.get("srcMap"); + List> srcImagePath = (List>) pathMap.get(ESConstants.SRCLIST); +// if(data.containsKey("pictureList") && null != data.get("pictureList")){ +// // 如果是社交类的 pictureList 是个List 微博的是个 String,让晨睿给改一下,这边加个判断。 +// if(data.get(ESConstants.DOC_TYPE).equals(ESConstants.SOCIAL)){ +// if(data.get("pictureList").toString().contains("[")) { +// JSONArray pictureList = JSONObject.parseArray(data.get("pictureList").toString()); +// if (pictureList.size() > 0) { +// for (Object picture : pictureList) { +// // 获取 srcUrlMap 的方法 +// Map srcurlMap = getSrcUrlMap(picture.toString()); +// srcPath.add(srcurlMap); +// } +// } +// }else if (data.get("pictureList").toString().contains("|")){ +// String lists [] = data.getString("pictureList").toString().split("|"); +// for (String picture : lists) { +// Map srcurlMap = getSrcUrlMap(picture); +// srcPath.add(srcurlMap); +// } +// }else{ +// String picture = data.get("pictureList").toString(); +// Map srcurlMap = getSrcUrlMap(picture); +// srcPath.add(srcurlMap); +// } +// }else{ +// Map pictureList = JSONObject.parseObject(data.get("pictureList").toString()); +// if (!pictureList.isEmpty()) { +// Map srcurlMap = new HashMap<>(); +// for (Map.Entry entry : pictureList.entrySet()) { +// Map imgmap = (Map) entry.getValue(); +// if (imgmap.containsKey("uploadImg") && imgmap.get("uploadImg") != null && imgmap.get("uploadImg") != "") { +// srcurlMap.put(ESConstants.GOFASTURL, srcAndGofastUrlMap.get(imgmap.get("uploadImg"))); +// srcurlMap.put(ESConstants.ORIGINALURL, 
imgmap.get("img").toString()); +// } +// srcPath.add(srcurlMap); +// } +// } +// } +// } + data.put(ESConstants.SRCIMAGEPATH, JSON.toJSONString(srcImagePath, SerializerFeature.DisableCircularReferenceDetect)); } return data; } + private JSONObject getVideoPath(JSONObject data, List videoPath, List> srcVideoList ) { + Map pathMap = getPathSize(videoPath,2,data,srcVideoList); + LOGGER.info("下载视频后的 pathMap : {}.",JsonUtils.toJSONString(pathMap)); + // 先做判断,如果 pathMap == 0 的话,对应的 videoPath、videoPathSize、srcVideoPath 都保持不变即可 + if(pathMap.size() > 0) { + // videoPath 字段填充 + videoPath = (List) pathMap.get(ESConstants.PATH); + data.put(ESConstants.VIDEOPATH, videoPath); + // videoPathSize 字段填充 先做判断,如果 + List> videoPathSize = (List>) pathMap.get(ESConstants.PATHSIZELIST); + if (videoPathSize.size() > 0) { + data.put(ESConstants.VIDEOPATHSIZE, JSONObject.toJSONString(videoPathSize)); + data.put(ESConstants.EGC, 1); + data.put(ESConstants.ISDOWNLOAD, true); + } + // 组装 SRCVIDEOPATH 字段 + List> srcVideoPath = (List>) pathMap.get(ESConstants.SRCLIST); + data.put(ESConstants.SRCVIDEOPATH, JSON.toJSONString(srcVideoPath, SerializerFeature.DisableCircularReferenceDetect)); + } +// Map srcAndGofastUrlMap = (Map) pathMap.get(ESConstants.SRCLIST); +// if(data.containsKey("videoUrl") && null != data.get("videoUrl") ) { +// List> srcPath = new ArrayList<>(); +// if(data.get("videoUrl").toString().contains("originalUrl")){ +// try { +// List> videoUrl = JsonUtils.parseArray( data.get("videoUrl").toString()); +// srcPath = getSrcPath(videoUrl,srcAndGofastUrlMap); +// } catch (Exception e) { +// e.printStackTrace(); +// } +// }else{ +// List videoUrl = new ArrayList<>(); +// try { +// if(data.get("videoUrl").toString().contains("[")) { +// videoUrl = JsonUtils.parseArray(data.get("videoUrl").toString()); +// }else{ +// videoUrl.add(data.get("videoUrl").toString()); +// } +// }catch (Exception e){ +// e.printStackTrace(); +// } +// srcPath = new ArrayList<>(); +// Map srcurlMap = new HashMap<>(); +// if(videoPath.size() > 0) { +// srcurlMap.put(ESConstants.GOFASTURL, videoPath.get(0)); +// } +// System.out.println("===============视频原链接的List: " + videoUrl); +// if(videoUrl.size() > 0) { +// srcurlMap.put(ESConstants.ORIGINALURL, videoUrl.get(0)); +// } +// if(srcurlMap.size() > 0) { +// srcPath.add(srcurlMap); +// } +// } + // } + return data; + } - - private List> getSrcPath(List> forwardUrl, Map srcAndGofastUrlMap) { - List> srcPathList = new ArrayList<>(); - for (Map urlMap : forwardUrl) { - if(null != urlMap) { - Map srcurlMap = new HashMap<>(); - if (urlMap.containsKey(ESConstants.GOFASTURL) && null != urlMap.get(ESConstants.GOFASTURL)) { - srcurlMap.put(ESConstants.GOFASTURL, srcAndGofastUrlMap.get(urlMap.get(ESConstants.GOFASTURL))); - } else { - srcurlMap.put(ESConstants.GOFASTURL, ""); - } - srcurlMap.put(ESConstants.ORIGINALURL, urlMap.get(ESConstants.ORIGINALURL)); - srcPathList.add(srcurlMap); + private JSONObject getFilePath(JSONObject data, List filePath, List> srcFileList) { + // 调用下载接口,下载并将附件上传到自己的go-fast 上 + Map pathMap = getPathSize(filePath,0,data,srcFileList); + LOGGER.info("下载文件后的 pathMap : {}.",JsonUtils.toJSONString(pathMap)); + if(pathMap.size() > 0) { + // 下载替换后的 path List + filePath = (List) pathMap.get(ESConstants.PATH); + data.put(ESConstants.FILEPATH, filePath); + // 组装 FILEPATHSIZE 字段 + List> filePathSize = (List>) pathMap.get(ESConstants.PATHSIZELIST); + if (filePathSize.size() > 0) { + data.put(ESConstants.FILEPATHSIZE, JSONObject.toJSONString(filePathSize)); + 
data.put(ESConstants.UGC, 1); + data.put(ESConstants.ISDOWNLOAD, true); } + // 组装 SRCFILEPATH 字段 + // Map srcAndGofastUrlMap = (Map) pathMap.get("srcMap"); + // System.out.println("**** " + JSONObject.toJSONString(pathMap.get(ESConstants.SRCLIST))); + List> srcFilePath = (List>) pathMap.get(ESConstants.SRCLIST); + data.put(ESConstants.SRCFILEPATH, JSON.toJSONString(srcFilePath, SerializerFeature.DisableCircularReferenceDetect)); } - return srcPathList; + +// if(data.containsKey(ESConstants.FORWARD_URL) && !("").equals(data.getString(ESConstants.FORWARD_URL)) && null != data.get(ESConstants.FORWARD_URL) ) { +// try { +// List> forwardUrl = JsonUtils.parseArray(data.get(ESConstants.FORWARD_URL).toString()); +// // List> srcPath = getSrcPath(forwardUrl,srcAndGofastUrlMap); +// data.put(ESConstants.SRCFILEPATH, JSON.toJSONString(srcFilePath, SerializerFeature.DisableCircularReferenceDetect)); +// } catch (Exception e) { +// e.printStackTrace(); +// } +// } + return data; } -// public static void main(String[] args) { -// QueryService queryService = new QueryService(); -// List list = ReadLine.readLine(new File("E:\\work/test1.txt")); -// JSONObject data = JSONObject.parseObject(list.get(0)); -// String goFastAddr = "http://172.18.1.113:8080/upload"; -// JSONObject result = queryService.downloadAndChangePath(data,goFastAddr); -// System.out.println(result); -// -// } /** - * downloadType =0 文件 =1 图片 = 2 视频 + * downloadType =0 文件 ;=1 图片 ;= 2 视频 */ - private Map getPathSize(List pathList, String goFastAddr,Integer downloadType,JSONObject data) { + private Map getPathSize(List pathList, Integer downloadType, JSONObject data,List> srcxxxList) { + String domain = config.getGoFastDomain(); Map pathMap = new HashMap<>(); List> pathSizeList = new ArrayList<>(); List path = new ArrayList<>(); - Map srcMap = new HashMap<>(); + List> srcList = new ArrayList(); for (String downloadUrl:pathList) { + Map srcMap = new HashMap<>(); + srcMap.put(ESConstants.ORIGINALURL,downloadUrl); String resolution = ""; String videoTime = ""; try { if(null != downloadUrl && !downloadUrl.contains("si-te.percent.cn")) { - if (downloadUrl.contains("http")) { - Map pathSizeMap = DownLoadFile.downloadAndSaveFile(downloadUrl, goFastAddr); - LOGGER.info("[QueryService] getPathSize goFaskAddr {}. resultMap {}.", goFastAddr, pathSizeMap); + // 一下三种情况都是需要下载的情况,另外还有 path 中的链接不需要下载的情况,怎么补全pathSize 和 srcPath 字段 + // 很可能 path 字段本身不需要下载,但是其他两个字段都需要下载后补全才行。 + if (downloadUrl.contains("http") || downloadUrl.contains("group1") || downloadUrl.contains("group2")) { + if(downloadUrl.contains("group1") || downloadUrl.contains("group2")){ + downloadUrl = domain +downloadUrl; + } + Map pathSizeMap = DownLoadFile.downloadAndSaveFile(downloadUrl, config.getGoFastPostUrl()); +// LOGGER.info("[QueryService] getPathSize goFaskAddr {}. 
resultMap {}.", config.getGoFastPostUrl(), pathSizeMap); + // Map pathSizeMap = DownLoadFile.downloadAndSaveFile(downloadUrl, "http://172.18.1.113:8080/upload"); if (pathSizeMap.size() > 0) { - if (downloadType == 1) { // 视频 + if (downloadType == 2) { // 视频 resolution = data.getString(ESConstants.RESOLUTION); videoTime = data.getString(ESConstants.VIDEOTIME); } - if (downloadType == 2) { // 图片 - resolution = DownLoadFile.imagesize(downloadUrl); + if (downloadType == 1) { // 图片 + resolution = DownLoadFile.getImageResolution(downloadUrl); } - //String url = pathSizeMap.get("realUrl").replace(config.getGoFastDomain(),""); - String url = pathSizeMap.get("realUrl").replace("http://172.18.1.113:8080", ""); - String size = pathSizeMap.get("size") + "KB"; - pathSizeMap.put(ESConstants.URL, url); - pathSizeMap.put(ESConstants.SIZE, size); pathSizeMap.put(ESConstants.RESOLUTION, resolution); pathSizeMap.put(ESConstants.VIDEOTIME, videoTime); - pathSizeMap.remove("realUrl"); // 这个是三个PathSize imagePathSize ,videoPathSize filePathSize pathSizeList.add(pathSizeMap); // 这个是 用来做 gofast 和原链接替换的,key 是原链接,value 是go-fast 链接, - srcMap.put(downloadUrl, url); + String goFastUrl = pathSizeMap.get(ESConstants.URL); + srcMap.put(ESConstants.GOFASTURL,goFastUrl); // 这个值使用来替换 三个 Path 的 imagePath,videoPath,filePath - path.add(url); + path.add(goFastUrl); + srcList.add(srcMap); + } + + }else{ // 如果 path 中的 url 是 OK的,但是不确定 pathSize 和 srcPath 的时候,需要做下面的处理 + // 因为 srcPath 中需要先添加下面两个字段值,因此需要先获取。 + String allDownloadUrl = domain + downloadUrl; + //String allDownloadUrl = "http://172.18.1.113:8080"+downloadUrl; + + if (downloadType == 2) { // 视频 + resolution = data.getString(ESConstants.RESOLUTION); + videoTime = data.getString(ESConstants.VIDEOTIME); } + if(downloadType == 1) { + resolution = DownLoadFile.getImageResolution(allDownloadUrl); + } + String size =DownLoadFile.getFileSize(allDownloadUrl); + // 组装 【pathSize】 字段 + Map pathSizeMap = new HashMap<>(); + pathSizeMap.put(ESConstants.URL,downloadUrl); + pathSizeMap.put(ESConstants.SIZE,size); + pathSizeMap.put(ESConstants.RESOLUTION, resolution); + pathSizeMap.put(ESConstants.VIDEOTIME, videoTime); + // 组装 【path】字段 + path.add(downloadUrl); + // 组装 【srcPath】字段 + srcMap.put(ESConstants.GOFASTURL,downloadUrl); + srcList.add(srcMap); + } + // 再这块组装出完整的 Path:List,PathSize List>,srcPath List> + pathMap.put(ESConstants.PATHSIZELIST, pathSizeList); + if(srcxxxList.size() > 0){ + pathMap.put(ESConstants.SRCLIST, srcxxxList); + }else { + pathMap.put(ESConstants.SRCLIST, srcList); } + pathMap.put(ESConstants.PATH, path); } } catch (IOException e) { e.printStackTrace(); } } - pathMap.put(ESConstants.PATHSIZELIST,pathSizeList); - pathMap.put("srcMap",srcMap); - pathMap.put(ESConstants.PATH,path); + return pathMap; - //return pathSizeList; + } private QueryBuilder getQueryBuilder(Long startTime, Long endTime, @@ -533,28 +621,23 @@ public class QueryService { BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); try { // 当拉取次数大于1的,还需要限制 采集时间,采集时间不早于今天。 - boolean boo = true; -// if(cacheNum > 1 ){ -// QueryBuilder pubTimeRange = buildRangeQueryBuilder( -// ESConstants.CREATETIME, startTime - 2 * ONE_MINUTE, endTime, boo, boo); -// boolQueryBuilder.must(pubTimeRange); -// } -// // 筛选发表时间 + + // 筛选发表时间 电商的主贴没有发表时间,因此不需要筛选 if(!siteType.equals(ESConstants.DOCTYPEITEM)) { QueryBuilder pubTimeRange = buildRangeQueryBuilder( - ESConstants.PUBTIME, startTime - 2 * ONE_MINUTE, endTime, boo, boo); + ESConstants.PUBTIME, startTime - 2 * ONE_MINUTE, endTime); 
boolQueryBuilder.must(pubTimeRange); } - // 筛选站点 + // 筛选站点 因为天猫、淘宝的数据是交叉的,因此如果需要拉某一个站点的时候需将两个站点的数据都拉出来。 if(cid.equals(ESConstants.TAOBAO) || cid.equals(ESConstants.TMALL)){ boolQueryBuilder.must(QueryBuilders.termsQuery(ESConstants.EN_SOURCE, ESConstants.TAOBAO,ESConstants.TMALL)); }else { boolQueryBuilder.must(QueryBuilders.termQuery(ESConstants.EN_SOURCE, cid)); } - // 筛选标志位 + // 筛选标志位,即采集任务的条件进行筛选 QueryBuilder crawlDataFlagBuilder = buildCrawlDataFlagBuilder(cid,crawlDataFlag); - //System.out.println("== "+crawlDataFlagBuilder); + boolQueryBuilder.must(crawlDataFlagBuilder); }catch (Exception e){ e.printStackTrace(); @@ -562,33 +645,39 @@ public class QueryService { return boolQueryBuilder; } - private QueryBuilder buildRangeQueryBuilder(String field, Object startVal, Object endVal, Boolean isIncludeLower, Boolean isIncludeUpper) { + /** + * 范围查询的语句组装 + */ + private QueryBuilder buildRangeQueryBuilder(String field, Object startVal, Object endVal) { + boolean boo = true; return rangeQuery(field) .from(startVal) .to(endVal) - .includeLower(isIncludeLower) - .includeUpper(isIncludeUpper); + .includeLower(boo) + .includeUpper(boo); } private QueryBuilder buildCrawlDataFlagBuilder(String cid,String crawlDataFlag) { QueryBuilder queryBuilder = null; try { - // TermQueryBuilder queryCidBuilders = QueryBuilders.termQuery("enSource",cid); + // 直接匹配有同样 crawlDataFlag 的数据进行拉取。 TermQueryBuilder queryCrawlDataFlagBuilder = QueryBuilders.termQuery(ESConstants.CRAWLDATAFLAG,crawlDataFlag); + // 如果是关键词任务,就不拉电商的历史数据拉,匹配出来的东西乱起八糟的!!但是别的网站还是需要用关键词进行拉取的。 if (crawlDataFlag.contains("keyword:")) { String keyword = crawlDataFlag.split("keyword:")[1]; System.out.println("[buildCrawlDataFlagBuilder] keyword --- " + keyword); // 关键词的话需要去 title 和 content 匹配一下 MatchPhraseQueryBuilder titleQuery = QueryBuilders.matchPhraseQuery(ESConstants.TITLE, keyword).slop(0); BoolQueryBuilder titleFilterQuery = QueryBuilders.boolQuery().filter(titleQuery); - // 关键词匹配的时候,如果是电商的,就只匹配商品名称。如果是其他的,就匹配 标题和内容 - if(cid.equals("taobao") || cid.equals("tmall") || cid.equals("ejingdong") || cid.equals("suning")){ - queryBuilder = QueryBuilders.boolQuery().should(titleFilterQuery).should(queryCrawlDataFlagBuilder); - }else { - MatchPhraseQueryBuilder contentQuery = QueryBuilders.matchPhraseQuery(ESConstants.CONTENT, keyword).slop(0); - BoolQueryBuilder contentFilterQuery = QueryBuilders.boolQuery().filter(contentQuery); - queryBuilder = QueryBuilders.boolQuery().should(titleFilterQuery).should(contentFilterQuery).should(queryCrawlDataFlagBuilder); - } +// // 关键词匹配的时候,如果是电商的,就只匹配商品名称。如果是其他的,就匹配 标题和内容 +// if(cid.equals("taobao") || cid.equals("tmall") || cid.equals("ejingdong") || cid.equals("suning")){ +// queryBuilder = QueryBuilders.boolQuery().should(titleFilterQuery).should(queryCrawlDataFlagBuilder); +// }else { + MatchPhraseQueryBuilder contentQuery = QueryBuilders.matchPhraseQuery(ESConstants.CONTENT, keyword).slop(0); + BoolQueryBuilder contentFilterQuery = QueryBuilders.boolQuery().filter(contentQuery); + queryBuilder = QueryBuilders.boolQuery().should(titleFilterQuery).should(contentFilterQuery).should(queryCrawlDataFlagBuilder); + // } } + if (crawlDataFlag.contains("url:")) { String url = crawlDataFlag.split("url:")[1]; System.out.println("[buildCrawlDataFlagBuilder] url --- " + url); // url 的话直接匹配 url 字段 @@ -596,12 +685,14 @@ public class QueryService { // QueryBuilder queryBuilder1 = QueryBuilders.boolQuery().must(queryUrlBuilders); queryBuilder = 
QueryBuilders.boolQuery().should(queryCrawlDataFlagBuilder).should(queryUrlBuilders); } + if (crawlDataFlag.contains("account:")) { String account = crawlDataFlag.split("account:")[1]; System.out.println("[buildCrawlDataFlagBuilder] account --- " + account); TermQueryBuilder queryAccountBuilders = QueryBuilders.termQuery(ESConstants.USER_URL,account); queryBuilder = QueryBuilders.boolQuery().should(queryAccountBuilders).should(queryCrawlDataFlagBuilder); } + }catch (Exception e){ e.printStackTrace(); } @@ -1506,6 +1597,31 @@ public class QueryService { //// return params; //// } +// private Map getSrcUrlMap(String picture) { +// Map srcurlMap = new HashMap<>(); +// srcurlMap.put(ESConstants.GOFASTURL,"" ); +// srcurlMap.put(ESConstants.ORIGINALURL, picture.toString()); +// return srcurlMap; +// } +// +// +// private List> getSrcPath(List> forwardUrl, Map srcAndGofastUrlMap) { +// List> srcPathList = new ArrayList<>(); +// for (Map urlMap : forwardUrl) { +// if(null != urlMap) { +// Map srcurlMap = new HashMap<>(); +// if (urlMap.containsKey(ESConstants.GOFASTURL) && null != urlMap.get(ESConstants.GOFASTURL)) { +// srcurlMap.put(ESConstants.GOFASTURL, srcAndGofastUrlMap.get(urlMap.get(ESConstants.GOFASTURL))); +// } else { +// srcurlMap.put(ESConstants.GOFASTURL, ""); +// } +// srcurlMap.put(ESConstants.ORIGINALURL, urlMap.get(ESConstants.ORIGINALURL)); +// srcPathList.add(srcurlMap); +// } +// } +// return srcPathList; +// } + // public static void main(String[] args) { // long current=System.currentTimeMillis();//当前时间毫秒数 @@ -1515,4 +1631,33 @@ public class QueryService { // long startTime = 1605715200000L; // System.out.println(startTime - 2 * ONE_MINUTE); // } + + + + +// public static void main(String[] args) { +// QueryService q = new QueryService(); +// String list = ReadLine.readFile("E:\\work\\workspace\\elastiManager\\elastiManager\\data/test.txt"); +// JSONObject j = q.downloadAndChangePath(JSONObject.parseObject(list)); +// System.out.println(j); +//// String path = "https://si.pdeepmatrix.com/group3/default/20210831/05/24/2/31_file"; +//// double size; +//// URL url = null; +//// try { +//// url = new URL(path); +//// URLConnection conn = url.openConnection(); +//// size = conn.getContentLength(); +//// if (size < 0) +//// System.out.println("无法获取文件大小。"); +//// else +//// System.out.println("文件大小为:" + size/1024 + " bytes"); +//// conn.getInputStream().close(); +//// } catch (Exception e) { +//// e.printStackTrace(); +//// } +// +// } + + + } diff --git a/cl_query_data_job/src/main/java/com/bfd/mf/job/service/query/SaveService.java b/cl_query_data_job/src/main/java/com/bfd/mf/job/service/query/SaveService.java index 0deac05..1114833 100644 --- a/cl_query_data_job/src/main/java/com/bfd/mf/job/service/query/SaveService.java +++ b/cl_query_data_job/src/main/java/com/bfd/mf/job/service/query/SaveService.java @@ -7,6 +7,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; +import java.util.ArrayList; + @Service public class SaveService { private static final Logger LOGGER = LoggerFactory.getLogger(SaveService.class); @@ -15,8 +17,26 @@ public class SaveService { // 初始化自定义字段 data.put(ESConstants.TASKID, taskId); data.put("where","backtrace"); +// data.put("tag",""); +// data.put("mentionAccountUrl",new ArrayList<>()); +// data.put("mentionAccount",new ArrayList<>()); +// data.put("dns",""); +// data.put("asrText",""); +// data.put("ocrText",new ArrayList<>()); +// data.put("hasOCR",0); +// data.put("hasASR",0); +// 
data.put("asrLength",0); +// data.put("ocrLength",0); +// data.put("hasTrans",0); +// data.put("translateTitleLength",""); +// data.put("translateContentLength",""); +// data.put("goodrate",0); +// data.put("generalrate",0); +// data.put("poorrate",0); } + + public void saveToEsWithFilter(String miniCluster, String miniIndex, final JSONObject data) { // 过滤 try { diff --git a/cl_query_data_job/src/main/java/com/bfd/mf/job/service/statistics/StatisticsService.java b/cl_query_data_job/src/main/java/com/bfd/mf/job/service/statistics/StatisticsService.java index 5484eb4..ae448b2 100644 --- a/cl_query_data_job/src/main/java/com/bfd/mf/job/service/statistics/StatisticsService.java +++ b/cl_query_data_job/src/main/java/com/bfd/mf/job/service/statistics/StatisticsService.java @@ -270,7 +270,7 @@ public class StatisticsService { textCount = countMap.get(ESConstants.TEXTCOUNT); } - taskRepository.updateTaskCount(taskId,totalCount,todayCount); + // taskRepository.updateTaskCount(taskId,totalCount,todayCount); taskRepository.updateTaskCountAll(taskId,totalCount,todayCount,imageCount,videoCount,fileCount,textCount); } diff --git a/cl_query_data_job/src/main/resources/application.yml b/cl_query_data_job/src/main/resources/application.yml index e3c8ab2..4f60bd3 100644 --- a/cl_query_data_job/src/main/resources/application.yml +++ b/cl_query_data_job/src/main/resources/application.yml @@ -15,22 +15,12 @@ logging: spring: datasource: driver-class-name: com.mysql.jdbc.Driver - username: root - password: Bfd123!@# - url: jdbc:mysql://172.18.1.134:3306/intelligent_crawl?useOldAliasMetadataBehavior=true&characterEncoding=UTF-8&zeroDateTimeBehavior=round + username: crawl + password: D5HLOvk553DUNV62qJI= + url: jdbc:mysql://172.18.1.134:3306/all_task?useOldAliasMetadataBehavior=true&characterEncoding=UTF-8&zeroDateTimeBehavior=round hikari: maximum-pool-size: 10 minimum-idle: 1 -#spring: -# datasource: -# driver-class-name: com.mysql.jdbc.Driver -# username: root -# password: Bfd123!@# -# url: jdbc:mysql://172.18.1.134:3306/all_task?useOldAliasMetadataBehavior=true&characterEncoding=UTF-8&zeroDateTimeBehavior=round -# hikari: -# maximum-pool-size: 10 -# minimum-idle: 1 - worker: version: 3.0.1 @@ -48,13 +38,13 @@ worker: enable-analysis-producer: false # 查ES写kafka enable-analysis-consumer: false # 读kafka写ES enable-statistics-producer: false # 统计 taskCount 和 subjectCount (采集平台) - enable-query-producer: false # 离线拉数(采集平台) + enable-query-producer: true # 离线拉数(采集平台) enable-backtrace-producer: false # 欧莱雅查数(采集平台,欧莱雅项目独用) enable-rw-oly-producer: false # 欧莱雅数据导出,暂时不用 enable-up-load-producer: false # 上传(采集平台) enable-output-producer: false #未开发,暂留 enable-taskcount-producer: false # 任务数量的统计,任务量和任务平均时长(运营后台) - enable-alarm-producer: true # 报警,查ES统计报警发邮件写数据库(运营后台) + enable-alarm-producer: false # 报警,查ES统计报警发邮件写数据库(运营后台) ## 启动服务的线程数 statistics-producer-thread-count: 1 query-producer-thread-count: 10 diff --git a/cl_search_api/src/main/java/com/bfd/mf/common/service/cache/TopicQueryService.java b/cl_search_api/src/main/java/com/bfd/mf/common/service/cache/TopicQueryService.java index f85e4c6..fb6038f 100644 --- a/cl_search_api/src/main/java/com/bfd/mf/common/service/cache/TopicQueryService.java +++ b/cl_search_api/src/main/java/com/bfd/mf/common/service/cache/TopicQueryService.java @@ -56,15 +56,15 @@ public class TopicQueryService { QueryBuilder categoryLableQuery = QueryBuilders.matchPhraseQuery(ESConstant.CATEGORYLABEL,queryRequest.getCategoryLabel()); boolQuery.must(categoryLableQuery); } - } + // searchArea 地点 if 
(("").equals(queryRequest.getSearchArea())) { logger.info("[TopicQueryService] queryByConditions_v1 查询全部地区"); } else { List areaList = siteRepository.findCidsByArea(queryRequest.getSearchArea()); List lowCaseAreaList = areaList.stream().map(String::toLowerCase).collect(Collectors.toList()); - boolQuery.must(QueryBuilders.termsQuery(ESConstant.EN_SOURCE,lowCaseAreaList)); + boolQuery.must(QueryBuilders.termsQuery(ESConstant.EN_SOURCE+".keyword",lowCaseAreaList)); // String searchArea = getSearchArea(queryRequest.getSearchArea()); // boolQuery.must(QueryBuilders.termQuery(ESConstant.AREA, searchArea)); } @@ -96,10 +96,10 @@ public class TopicQueryService { logger.info("[TopicQueryService] queryByConditions_v1 查询全部站点"); } else { if(cid.equals("Tmall") || cid.equals("Taobao")){ - boolQuery.should(QueryBuilders.termQuery(ESConstant.EN_SOURCE, "tmall")); - boolQuery.should(QueryBuilders.termQuery(ESConstant.EN_SOURCE, "taobao")); + boolQuery.must(QueryBuilders.termsQuery(ESConstant.EN_SOURCE+".keyword", "tmall","taobao")); + // boolQuery.should(QueryBuilders.termQuery(ESConstant.EN_SOURCE+".keyword", "taobao")); }else { - boolQuery.must(QueryBuilders.termQuery(ESConstant.EN_SOURCE, cid.toLowerCase())); + boolQuery.must(QueryBuilders.termQuery(ESConstant.EN_SOURCE+".keyword", cid.toLowerCase())); } } @@ -121,17 +121,21 @@ public class TopicQueryService { long crawlEndTime = queryRequest.getCrawlEndTime(); boolQuery.must(QueryBuilders.rangeQuery(ESConstant.CRAWLTIME).gte(crawlStartTime).lt(crawlEndTime)); } else { - long startTime = new Date().getTime(); + long startTime =0L; if(null != queryRequest.getStartTime() ) { startTime = queryRequest.getStartTime(); } - long endTime = new Date().getTime(); + long endTime = 0L; if(null != queryRequest.getEndTime()) { endTime = queryRequest.getEndTime(); } if(startTime == 0 && endTime == 0){ logger.info("没有时间筛选,查询全部专题下的数据!"); - }else { + }else if(startTime == 0){ // 如果开始时间为空,那就查询小于 endTime 的所有数据 + boolQuery.must(QueryBuilders.rangeQuery(ESConstant.PUBTIME).lt(endTime)); + }else if (endTime == 0){ // 如果结束时间为空,加就查大于 startTime 的所有数据 + boolQuery.must(QueryBuilders.rangeQuery(ESConstant.PUBTIME).gte(startTime)); + }else{ boolQuery.must(QueryBuilders.rangeQuery(ESConstant.PUBTIME).gte(startTime).lt(endTime)); } } diff --git a/cl_search_api/src/main/java/com/bfd/mf/common/util/slice/SliceScrollUtil.java b/cl_search_api/src/main/java/com/bfd/mf/common/util/slice/SliceScrollUtil.java index 57a4312..ddcf5b2 100644 --- a/cl_search_api/src/main/java/com/bfd/mf/common/util/slice/SliceScrollUtil.java +++ b/cl_search_api/src/main/java/com/bfd/mf/common/util/slice/SliceScrollUtil.java @@ -142,8 +142,6 @@ public class SliceScrollUtil { logger.info("==========进入数据分析Es and Cache,计算开始执行============"); String sortFlag = ""; String orderFlag = "desc"; -// Integer size = queryRequest.getLimit(); -// Integer from = queryRequest.getPage()*size; try { List responseList = new ArrayList<>(); List searchResponseEsList =subjectQueryDataService.fetchResponseDataFromCache( diff --git a/cl_search_api/src/main/java/com/bfd/mf/service/SearchAnalysisService.java b/cl_search_api/src/main/java/com/bfd/mf/service/SearchAnalysisService.java index e63b08e..005b44c 100644 --- a/cl_search_api/src/main/java/com/bfd/mf/service/SearchAnalysisService.java +++ b/cl_search_api/src/main/java/com/bfd/mf/service/SearchAnalysisService.java @@ -40,8 +40,7 @@ public class SearchAnalysisService { public JSONObject getAnalysisResponse(QueryRequest queryRequest) { JSONObject jsonObject = new JSONObject(); try{ - 
diff --git a/cl_search_api/src/main/java/com/bfd/mf/service/SearchAnalysisService.java b/cl_search_api/src/main/java/com/bfd/mf/service/SearchAnalysisService.java
index e63b08e..005b44c 100644
--- a/cl_search_api/src/main/java/com/bfd/mf/service/SearchAnalysisService.java
+++ b/cl_search_api/src/main/java/com/bfd/mf/service/SearchAnalysisService.java
@@ -40,8 +40,7 @@ public class SearchAnalysisService {
     public JSONObject getAnalysisResponse(QueryRequest queryRequest) {
         JSONObject jsonObject = new JSONObject();
         try{
-            List esMonitorEntity = sliceScrollUtil.
-                    fetchResultSubjectCache(queryRequest, ESConstant.FIELD_LIST_ANALYSIS);
+            List esMonitorEntity = sliceScrollUtil.fetchResultSubjectCache(queryRequest, ESConstant.FIELD_LIST_ANALYSIS);
             // channel trend
             jsonObject = dataAnalysisTrendByDayQueryTimes(queryRequest,esMonitorEntity);
             // get channel statistics, category-label statistics, value-label statistics
@@ -185,14 +184,28 @@ public class SearchAnalysisService {
         Long startTime = queryRequest.getStartTime();
         Long endTime = queryRequest.getEndTime();
         Long time_difference = 0L;
-        if(startTime == 0 && endTime == 0){ // the query is for a topic, so the whole time range is needed
+
+        if(null == startTime) {
+            startTime = 0L;
+        }
+        if (null == endTime){ // when only a start time is chosen, the end time defaults to the current time
+            endTime = 0L;     //// when only an end time is chosen, the start time should be
+        }
+        if( startTime == 0 || endTime == 0){ // the query is for a topic, so the whole time range is needed
             // one day is 86400 s: a difference < 86400000 is split by hour, between 86400000 and 2678400000 by day, and > 2678400000 by month
             time_difference = (esMonitorEntityList.get(esMonitorEntityList.size() - 1).getPubTime()- esMonitorEntityList.get(0).getPubTime())/1000;
             //System.out.println("time difference:" + (esMonitorEntityList.get(esMonitorEntityList.size() - 1).getPubTime()- esMonitorEntityList.get(0).getPubTime()));
             System.out.println("record count: "+esMonitorEntityList.size() + "| " + "time range: (start: "+esMonitorEntityList.get(0).getPubTime() + "| "+"end: "+esMonitorEntityList.get(esMonitorEntityList.size() - 1).getPubTime() + ")");
-            startTime = esMonitorEntityList.get(0).getPubTime();
-            endTime = esMonitorEntityList.get(esMonitorEntityList.size() - 1).getPubTime();
+            if(startTime == 0) {
+                startTime = esMonitorEntityList.get(0).getPubTime();
+            }
+            if(endTime == 0 ) {
+                endTime = esMonitorEntityList.get(esMonitorEntityList.size() - 1).getPubTime();
+            }
         }
+//        else if (null == startTime){
+//            startTime = System.currentTimeMillis();
+//        }
 //        System.out.println("time difference: " + time_difference);
 //        System.out.println("one year: " + ONEYEAR);
 //        Map dayChannelMaps = new HashMap<>();
diff --git a/cl_search_api/src/main/java/com/bfd/mf/service/SearchDataService.java b/cl_search_api/src/main/java/com/bfd/mf/service/SearchDataService.java
index eb4a63d..4b72dff 100644
--- a/cl_search_api/src/main/java/com/bfd/mf/service/SearchDataService.java
+++ b/cl_search_api/src/main/java/com/bfd/mf/service/SearchDataService.java
@@ -410,7 +410,6 @@ public class SearchDataService extends CrudService
         List<Map<String, String>> imagePathSize = (List<Map<String, String>>) jsonObject.get(ESConstant.IMAGEPATHSIZE);
         for (Map<String, String> imagePath: imagePathSize) {
             String url = imagePath.get(ESConstant.URL);
-            url = url.replace(bfdApiConfig.getGoFastDomain(),"");
+            url = url.replace(bfdApiConfig.getGoFastDomain(),"").replace("http://172.18.1.113:8892","");
             imagePath.put(ESConstant.URL,url);
         }
     }
@@ -638,7 +636,7 @@ public class SearchDataService extends CrudService
         List<Map<String, String>> videoPathSize = (List<Map<String, String>>) jsonObject.get(ESConstant.VIDEOPATHSIZE);
         for (Map<String, String> videoPath: videoPathSize) {
             String url = videoPath.get(ESConstant.URL);
-            url = url.replace(bfdApiConfig.getGoFastDomain(),"");
+            url = url.replace(bfdApiConfig.getGoFastDomain(),"").replace("http://172.18.1.113:8892","");
             videoPath.put(ESConstant.URL,url);
         }
     }
@@ -646,7 +644,7 @@ public class SearchDataService extends CrudService
         List<Map<String, String>> filePathSize = (List<Map<String, String>>) jsonObject.get(ESConstant.FILEPATHSIZE);
         for (Map<String, String> filePath: filePathSize) {
             String url = filePath.get(ESConstant.URL);
-            url = url.replace(bfdApiConfig.getGoFastDomain(),"");
+            url = url.replace(bfdApiConfig.getGoFastDomain(),"").replace("http://172.18.1.113:8892","");
             filePath.put(ESConstant.URL,url);
         }
     }
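The reworked start/end handling in SearchAnalysisService feeds the bucketing rule spelled out in its inline comment: a span under one day buckets by hour, up to 31 days by day, and beyond that by month. Worth double-checking that the code divides by 1000 (yielding seconds) while the thresholds quoted in the comment are in milliseconds, so one of the two presumably needs adjusting. A plain restatement of the intended arithmetic, with invented names and the thresholds copied from the comment:

    // Self-contained restatement of the bucketing rule, not the project's code.
    public class TrendGranularitySketch {
        enum Granularity { HOUR, DAY, MONTH }

        static final long ONE_DAY_MS = 86_400_000L;      // 86400 s
        static final long ONE_MONTH_MS = 2_678_400_000L; // 31 days

        // Thresholds taken from the inline comment in SearchAnalysisService.
        static Granularity choose(long startMillis, long endMillis) {
            long diff = endMillis - startMillis;
            if (diff < ONE_DAY_MS) return Granularity.HOUR;
            if (diff < ONE_MONTH_MS) return Granularity.DAY;
            return Granularity.MONTH;
        }

        public static void main(String[] args) {
            System.out.println(choose(0L, 3_600_000L));     // HOUR (one hour)
            System.out.println(choose(0L, 604_800_000L));   // DAY (one week)
            System.out.println(choose(0L, 5_000_000_000L)); // MONTH (~58 days)
        }
    }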
diff --git a/cl_search_api/src/main/resources/application.yml b/cl_search_api/src/main/resources/application.yml
index 8da7d61..88c0d47 100644
--- a/cl_search_api/src/main/resources/application.yml
+++ b/cl_search_api/src/main/resources/application.yml
@@ -15,10 +15,12 @@ server:
 spring:
   datasource:
     driver-class-name: com.mysql.jdbc.Driver
-    username: root
-    password: Bfd123!@#
+#    username: root
+#    password: Bfd123!@#
+#    url: jdbc:mysql://172.18.1.134:3306/intelligent_crawl?useOldAliasMetadataBehavior=true&characterEncoding=UTF-8&zeroDateTimeBehavior=round
+    username: crawl
+    password: D5HLOvk553DUNV62qJI=
     url: jdbc:mysql://172.18.1.134:3306/intelligent_crawl?useOldAliasMetadataBehavior=true&characterEncoding=UTF-8&zeroDateTimeBehavior=round
-
     hikari:
       maximum-pool-size: 10
       minimum-idle: 1
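Both application.yml files now carry the crawl user's credentials inline, and the old root password survives in comments. One conventional alternative is resolving credentials from the environment at startup; a hypothetical sketch with HikariCP follows (the environment-variable names are invented, and in Spring Boot the same effect comes from placeholders such as "username: ${CL_DB_USER}" directly in application.yml, without code):

    import com.zaxxer.hikari.HikariConfig;
    import com.zaxxer.hikari.HikariDataSource;

    // Hypothetical sketch: same pool settings as the yml, credentials resolved
    // from invented environment variables instead of being committed. Needs the
    // MySQL driver on the classpath and a reachable database to actually run.
    public class DataSourceSketch {

        public static void main(String[] args) {
            HikariConfig cfg = new HikariConfig();
            cfg.setJdbcUrl("jdbc:mysql://172.18.1.134:3306/intelligent_crawl"
                    + "?useOldAliasMetadataBehavior=true&characterEncoding=UTF-8&zeroDateTimeBehavior=round");
            cfg.setUsername(System.getenv("CL_DB_USER"));      // invented variable name
            cfg.setPassword(System.getenv("CL_DB_PASSWORD"));  // invented variable name
            cfg.setMaximumPoolSize(10);                        // mirrors hikari.maximum-pool-size
            cfg.setMinimumIdle(1);                             // mirrors hikari.minimum-idle
            try (HikariDataSource ds = new HikariDataSource(cfg)) {
                System.out.println("pool initialised: " + ds.getPoolName());
            }
        }
    }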