|
@ -12,10 +12,7 @@ import com.bfd.mf.job.domain.entity.Task; |
|
|
import com.bfd.mf.job.domain.repository.SubjectRepository; |
|
|
import com.bfd.mf.job.domain.repository.SubjectRepository; |
|
|
import com.bfd.mf.job.domain.repository.TaskRepository; |
|
|
import com.bfd.mf.job.domain.repository.TaskRepository; |
|
|
import com.bfd.mf.job.download.DownLoadFile; |
|
|
import com.bfd.mf.job.download.DownLoadFile; |
|
|
import com.bfd.mf.job.util.DataCheckUtil; |
|
|
|
|
|
import com.bfd.mf.job.util.EsUtils; |
|
|
|
|
|
import com.bfd.mf.job.util.Kafka010Utils; |
|
|
|
|
|
import com.bfd.mf.job.util.ReadLine; |
|
|
|
|
|
|
|
|
import com.bfd.mf.job.util.*; |
|
|
import com.google.common.collect.Maps; |
|
|
import com.google.common.collect.Maps; |
|
|
import com.google.common.util.concurrent.RateLimiter; |
|
|
import com.google.common.util.concurrent.RateLimiter; |
|
|
import kafka.utils.Json; |
|
|
import kafka.utils.Json; |
|
@ -106,8 +103,14 @@ public class QueryService { |
|
|
long taskId = task.getId().longValue(); |
|
|
long taskId = task.getId().longValue(); |
|
|
String appId = task.getAppId(); |
|
|
String appId = task.getAppId(); |
|
|
int cache_num = 1; |
|
|
int cache_num = 1; |
|
|
taskRepository.updateStatus(cache_num, task.getId().longValue()); |
|
|
|
|
|
cache.put(taskId+"#@#"+appId, Lists.newArrayList(0L, 0L, progressFactor, totalSegment, segment)); |
|
|
|
|
|
|
|
|
Integer siteType = task.getSiteType(); |
|
|
|
|
|
if(siteType == 5){ |
|
|
|
|
|
cache_num = 2; |
|
|
|
|
|
taskRepository.updateStatus(cache_num, task.getId().longValue()); |
|
|
|
|
|
} else { |
|
|
|
|
|
taskRepository.updateStatus(cache_num, task.getId().longValue()); |
|
|
|
|
|
cache.put(taskId + "#@#" + appId, Lists.newArrayList(0L, 0L, progressFactor, totalSegment, segment)); |
|
|
|
|
|
} |
|
|
try { |
|
|
try { |
|
|
P_TASK_CACHE_RANGE.put(cache); |
|
|
P_TASK_CACHE_RANGE.put(cache); |
|
|
} catch (InterruptedException e) { |
|
|
} catch (InterruptedException e) { |
|
@ -125,8 +128,8 @@ public class QueryService { |
|
|
return; |
|
|
return; |
|
|
} |
|
|
} |
|
|
String taskIdAppId = ""; |
|
|
String taskIdAppId = ""; |
|
|
long fromMills =0L; |
|
|
|
|
|
long toMills = 0L; |
|
|
|
|
|
|
|
|
long fromMills = 0L; //1604419200000 |
|
|
|
|
|
long toMills = 0L; // 1604505600000 |
|
|
for (Map.Entry<String, List<? extends Number>> entry : range.entrySet()) { |
|
|
for (Map.Entry<String, List<? extends Number>> entry : range.entrySet()) { |
|
|
entry.getValue(); |
|
|
entry.getValue(); |
|
|
taskIdAppId = entry.getKey(); |
|
|
taskIdAppId = entry.getKey(); |
|
@ -182,13 +185,13 @@ public class QueryService { |
|
|
fromMills = task.getCrawlStartTime().longValue(); |
|
|
fromMills = task.getCrawlStartTime().longValue(); |
|
|
queryBuilder = getQueryBuilder(fromMills, toMills, cid, crawlDataFlag, cacheNum, siteType); |
|
|
queryBuilder = getQueryBuilder(fromMills, toMills, cid, crawlDataFlag, cacheNum, siteType); |
|
|
} |
|
|
} |
|
|
// LOGGER.info("Query primary, task:{}, index:{}, from:{}, to:{}, indices:{}, dsl:{}.", |
|
|
|
|
|
// taskId, |
|
|
|
|
|
// indexName, |
|
|
|
|
|
// new LocalDateTime(fromMills).toString(AppConfig.DATE_TIME_FORMAT), |
|
|
|
|
|
// new LocalDateTime(toMills).toString(AppConfig.DATE_TIME_FORMAT), |
|
|
|
|
|
// JSONObject.toJSONString(sourceIndices), |
|
|
|
|
|
// queryBuilder.toString()); |
|
|
|
|
|
|
|
|
LOGGER.info("Query primary, task:{}, index:{}, from:{}, to:{}, indices:{}, dsl:{}.", |
|
|
|
|
|
taskId, |
|
|
|
|
|
indexName, |
|
|
|
|
|
new LocalDateTime(fromMills).toString(AppConfig.DATE_TIME_FORMAT), |
|
|
|
|
|
new LocalDateTime(toMills).toString(AppConfig.DATE_TIME_FORMAT), |
|
|
|
|
|
JSONObject.toJSONString(sourceIndices), |
|
|
|
|
|
queryBuilder.toString()); |
|
|
// 传入的参数 集群名称,索引名称,索引类型(type), 查询Builder,scroll查询页面大小,scroll查询scrollId有效时间 |
|
|
// 传入的参数 集群名称,索引名称,索引类型(type), 查询Builder,scroll查询页面大小,scroll查询scrollId有效时间 |
|
|
String finalTaskId = taskId + ""; |
|
|
String finalTaskId = taskId + ""; |
|
|
long pubTime = fromMills; |
|
|
long pubTime = fromMills; |
|
@ -223,6 +226,10 @@ public class QueryService { |
|
|
if (!data.get("_id_").equals("")) { |
|
|
if (!data.get("_id_").equals("")) { |
|
|
saveService.saveToEsWithFilter(config.esMiniClusterName(), finalIndexName1, data); |
|
|
saveService.saveToEsWithFilter(config.esMiniClusterName(), finalIndexName1, data); |
|
|
kafkaProducer.send(config.getSendTopic(),JSONObject.toJSONString(data)); |
|
|
kafkaProducer.send(config.getSendTopic(),JSONObject.toJSONString(data)); |
|
|
|
|
|
// long crawlTime = data.getLong("crawlTime"); |
|
|
|
|
|
// if(crawlTime < 1633795200000L){ |
|
|
|
|
|
// WriteMethod.writeMethod("../../../error.txt",JSONObject.toJSONString(data)); |
|
|
|
|
|
// } |
|
|
LOGGER.debug("Send message, indexName :{} , taskId:{} , ID :{}.", finalIndexName, task.getId(), data.getString("_id_")); |
|
|
LOGGER.debug("Send message, indexName :{} , taskId:{} , ID :{}.", finalIndexName, task.getId(), data.getString("_id_")); |
|
|
// 将要拉评论的ID 添加到list 中,(电商的数据不用拉评论哦)! |
|
|
// 将要拉评论的ID 添加到list 中,(电商的数据不用拉评论哦)! |
|
|
if (!siteType.equals(ESConstants.DOCTYPEITEM)) { |
|
|
if (!siteType.equals(ESConstants.DOCTYPEITEM)) { |
|
|