Browse Source

采集平台任务缓存

sitask
zhicheng.zhang 4 weeks ago
parent
commit
baa63fad8c
  1. 2
      cl_stream_datasave/src/main/java/com/bfd/mf/datasave/tools/DataProcess.java
  2. 12
      cl_stream_datasave/src/main/java/com/bfd/mf/entity/mysql/SubjectTask.java

2
cl_stream_datasave/src/main/java/com/bfd/mf/datasave/tools/DataProcess.java

@ -14,7 +14,7 @@ public class DataProcess implements Runnable {
String key=a.split("@#@")[0];
String value=a.split("@#@")[1];
RedisUtil.set(key, value, 9);
RedisUtil.set(key, value, 10);
System.out.println(Constants.getLineQueue().size()+"队列的大小"+key);
log.info(Constants.getLineQueue().size()+"队列的大小"+key);
// WriteMethod.writeMethod("1.txt", a);

12
cl_stream_datasave/src/main/java/com/bfd/mf/entity/mysql/SubjectTask.java

@ -195,12 +195,12 @@ public class SubjectTask implements Runnable {
System.out.println(updatetime+now);
long starttime=new Date().getTime()/1000-20L;
if (disposeCrawldataflag("Tasktimelimiit")){
starttime= Long.parseLong(RedisUtil.get("Tasktimelimiit",9));
starttime= Long.parseLong(RedisUtil.get("Tasktimelimiit",10));
starttime=starttime-5;
RedisUtil.set("Tasktimelimiit", String.valueOf(updatetime), 9);
RedisUtil.set("Tasktimelimiit", String.valueOf(updatetime), 10);
}else {
System.out.println("第一次写入");
RedisUtil.set("Tasktimelimiit", String.valueOf(updatetime), 9);
RedisUtil.set("Tasktimelimiit", String.valueOf(updatetime), 10);
}
System.out.println(System.currentTimeMillis()+"开始mysql执行的时间");
List<Map<String, Object>> subjectTaskendList = DBUtil.getInstance("db_stat_alltask").query("select ct.crawl_content_key,ct.create_user_id,ct.app_id,cs.del,ct.crawl_start_time,ct.crawl_end_time,ct.external_id, ct.subject_id, ct.id, ct.cid, ct.crawl_data_flag,cs.kafka_switch,cs.kafka_addr,cs.go_fast_addr,cs.kafka_topic,cs.go_fast_switch from cl_subject cs Join cl_task ct on(ct.subject_id=cs.id) where (ct.crawl_status=1 ) and ct.del=0 and ct.cid!=\"\" and ct.app_id!='bw01' and unix_timestamp(ct.start_time)>'"+starttime+"';");
@ -342,7 +342,7 @@ public class SubjectTask implements Runnable {
// valueList.add(value);
// }
if(disposeCrawldataflag(newkey)) { //判断redis中的key
String getsubjectList=RedisUtil.get(newkey,9);
String getsubjectList=RedisUtil.get(newkey,10);
try {
List<Map<String, String>> subjectList = (List<Map<String, String>>) JsonUtils.parseArray(getsubjectList);
List<Map<String, String>> redistList=new ArrayList<>();
@ -455,8 +455,8 @@ public class SubjectTask implements Runnable {
private boolean disposeCrawldataflag(String crawldataflag) {
try{
if (RedisUtil.exists(crawldataflag, 9)) { // 先去 redis中查询是否存在不存直接忽略
String value = RedisUtil.get(crawldataflag,9);
if (RedisUtil.exists(crawldataflag, 10)) { // 先去 redis中查询是否存在不存直接忽略
String value = RedisUtil.get(crawldataflag,10);
if(null != value && !("").equals(value)) {
return true;

Loading…
Cancel
Save