60 changed files with 1545 additions and 0 deletions
-
115cl_stream_datasave/src/main/java/com/bfd/mf/datasave/download/Newscontent.java
-
4cl_stream_datasave/src/main/java/com/bfd/mf/datasave/download/test.java
-
79cl_stream_datasave/src/main/java/com/bfd/mf/datasave/listen/ListenfastTaskManager.java
-
480cl_stream_datasave/src/main/java/com/bfd/mf/entity/mysql/SubjectTaskdele.java
-
471cl_stream_datasave/src/main/java/com/bfd/mf/entity/mysql/SubjectTaskdelone.java
-
36cl_stream_datasave/src/main/java/com/bfd/mf/entity/mysql/si.java
-
172cl_stream_datasave/src/main/java/com/bfd/mf/entity/mysql/taskdel.java
-
31cl_stream_datasave/src/main/java/com/bfd/mf/runstart/DelRunStartDataSave.java
-
36cl_stream_datasave/src/main/resources/META-INF/MANIFEST.MF
-
BINcl_stream_datasave/target/cl_stream_datasave-2.0-SNAPSHOT.jar
-
BINcl_stream_datasave/target/cl_stream_datasave-2.0-SNAPSHOT.jar.original
-
36cl_stream_datasave/target/classes/META-INF/MANIFEST.MF
-
BINcl_stream_datasave/target/classes/com/bfd/mf/datasave/download/DownLoadFile.class
-
BINcl_stream_datasave/target/classes/com/bfd/mf/datasave/download/MyCookieJar.class
-
BINcl_stream_datasave/target/classes/com/bfd/mf/datasave/download/NewsDownload.class
-
BINcl_stream_datasave/target/classes/com/bfd/mf/datasave/download/Newscontent.class
-
BINcl_stream_datasave/target/classes/com/bfd/mf/datasave/download/OkHttpUtils.class
-
BINcl_stream_datasave/target/classes/com/bfd/mf/datasave/download/ProxyIp.class
-
BINcl_stream_datasave/target/classes/com/bfd/mf/datasave/download/test.class
-
BINcl_stream_datasave/target/classes/com/bfd/mf/datasave/kafka/IKafka.class
-
BINcl_stream_datasave/target/classes/com/bfd/mf/datasave/kafka/ReadKafka.class
-
BINcl_stream_datasave/target/classes/com/bfd/mf/datasave/listen/DataSaveManager.class
-
BINcl_stream_datasave/target/classes/com/bfd/mf/datasave/listen/ListenKafkaManager.class
-
BINcl_stream_datasave/target/classes/com/bfd/mf/datasave/listen/ListenTaskManager.class
-
BINcl_stream_datasave/target/classes/com/bfd/mf/datasave/listen/ListenfastTaskManager.class
-
BINcl_stream_datasave/target/classes/com/bfd/mf/datasave/tools/Constants.class
-
BINcl_stream_datasave/target/classes/com/bfd/mf/datasave/tools/DBConnectionManager$DBConnectionPool.class
-
BINcl_stream_datasave/target/classes/com/bfd/mf/datasave/tools/DBConnectionManager.class
-
BINcl_stream_datasave/target/classes/com/bfd/mf/datasave/tools/DBUtil.class
-
BINcl_stream_datasave/target/classes/com/bfd/mf/datasave/tools/DataCheckUtil.class
-
BINcl_stream_datasave/target/classes/com/bfd/mf/datasave/tools/DataProcess.class
-
BINcl_stream_datasave/target/classes/com/bfd/mf/datasave/tools/DateUtil.class
-
BINcl_stream_datasave/target/classes/com/bfd/mf/datasave/tools/PropertiesUtil.class
-
BINcl_stream_datasave/target/classes/com/bfd/mf/datasave/tools/ReadLine.class
-
BINcl_stream_datasave/target/classes/com/bfd/mf/datasave/tools/WriteMethod.class
-
BINcl_stream_datasave/target/classes/com/bfd/mf/entity/AllKeys.class
-
BINcl_stream_datasave/target/classes/com/bfd/mf/entity/DataSaveManager.class
-
BINcl_stream_datasave/target/classes/com/bfd/mf/entity/FieldInfo.class
-
BINcl_stream_datasave/target/classes/com/bfd/mf/entity/FieldNormaliz.class
-
BINcl_stream_datasave/target/classes/com/bfd/mf/entity/ReaultInfo.class
-
BINcl_stream_datasave/target/classes/com/bfd/mf/entity/impl/DataSaveManagerImpl.class
-
BINcl_stream_datasave/target/classes/com/bfd/mf/entity/mysql/FiledTableInfo.class
-
BINcl_stream_datasave/target/classes/com/bfd/mf/entity/mysql/SubjectTask.class
-
BINcl_stream_datasave/target/classes/com/bfd/mf/entity/mysql/SubjectTaskdele.class
-
BINcl_stream_datasave/target/classes/com/bfd/mf/entity/mysql/SubjectTaskdelone.class
-
BINcl_stream_datasave/target/classes/com/bfd/mf/entity/mysql/Tasklimit.class
-
BINcl_stream_datasave/target/classes/com/bfd/mf/entity/mysql/Userlimit.class
-
BINcl_stream_datasave/target/classes/com/bfd/mf/entity/mysql/cl_task.class
-
BINcl_stream_datasave/target/classes/com/bfd/mf/entity/mysql/si.class
-
BINcl_stream_datasave/target/classes/com/bfd/mf/entity/mysql/taskdel.class
-
BINcl_stream_datasave/target/classes/com/bfd/mf/runstart/DelRunStartDataSave.class
-
BINcl_stream_datasave/target/classes/com/bfd/mf/runstart/RunStartDataSave.class
-
BINcl_stream_datasave/target/gxcl_stream_datasave-2.0-SNAPSHOT.jar
-
5cl_stream_datasave/target/maven-archiver/pom.properties
-
36cl_stream_datasave/target/maven-status/maven-compiler-plugin/compile/default-compile/createdFiles.lst
-
39cl_stream_datasave/target/maven-status/maven-compiler-plugin/compile/default-compile/inputFiles.lst
-
2cl_stream_datasave/target/maven-status/maven-compiler-plugin/testCompile/default-testCompile/createdFiles.lst
-
3cl_stream_datasave/target/maven-status/maven-compiler-plugin/testCompile/default-testCompile/inputFiles.lst
-
BINcl_stream_datasave/target/test-classes/CreatIndex.class
-
BINcl_stream_datasave/target/test-classes/Test1.class
@ -0,0 +1,115 @@ |
|||
package com.bfd.mf.datasave.download; |
|||
|
|||
import com.bfd.crawler.utils.JsonUtils; |
|||
|
|||
import java.util.ArrayList; |
|||
import java.util.HashMap; |
|||
import java.util.List; |
|||
import java.util.Map; |
|||
//videoPath == egc |
|||
//filePath == ugc |
|||
//imagePath == pgc |
|||
|
|||
public class Newscontent { |
|||
public static void downloadAndSaveimage(Map<String, Object> resultMap, List<Map<String, String>> imagePathSizevalue){ |
|||
// = (List<Map<String, String>>) resultMap.get("imagePathSize"); |
|||
List<String> imagePathlist=new ArrayList<>(); |
|||
for (Map<String, String> mapurl:imagePathSizevalue){ |
|||
if(mapurl.containsKey("url")){ |
|||
imagePathlist.add(mapurl.get("url").toString()); |
|||
} |
|||
} |
|||
resultMap.put("imagePathSize",JsonUtils.toJSONString(imagePathSizevalue)); |
|||
resultMap.put("imagePath",imagePathlist); |
|||
resultMap.put("pgc",1); |
|||
List<Map<String, Object>> picturepath=new ArrayList<>(); |
|||
if(resultMap.get("pictureList")!=""){ |
|||
try { |
|||
Map<String,Object> map=JsonUtils.parseObject((String) resultMap.get("pictureList")); |
|||
if(!map.isEmpty()){ |
|||
for (Map.Entry<String, Object> entry : map.entrySet()) { |
|||
Map<String,Object> gofastmap=new HashMap<>(); |
|||
Map<String,Object> revmap= (Map<String, Object>) entry.getValue(); |
|||
if(revmap.containsKey("uploadImg")&&revmap.get("uploadImg")!=null&&revmap.get("uploadImg")!=""){ |
|||
gofastmap.put("gofastUrl","/group"+revmap.get("uploadImg").toString().split("/group")[1]); |
|||
gofastmap.put("originalUrl",revmap.get("img")); |
|||
} |
|||
picturepath.add(gofastmap); |
|||
} |
|||
resultMap.put("srcimagePath",JsonUtils.toJSONString(picturepath)); |
|||
} |
|||
} catch (Exception e) { |
|||
e.printStackTrace(); |
|||
} |
|||
} |
|||
} |
|||
public static void downloadAndSavefile(Map<String, Object> resultMap, List<Map<String, Object>> filePathSize){ |
|||
//List<Map<String, String>> imagePathSizevalue= (List<Map<String, String>>) resultMap.get("filePathSize"); |
|||
List<String> imagePathlist=new ArrayList<>(); |
|||
for (Map<String, Object> mapurl:filePathSize){ |
|||
if(mapurl.containsKey("url")){ |
|||
imagePathlist.add(mapurl.get("url").toString()); |
|||
} |
|||
} |
|||
resultMap.put("filePathSize",JsonUtils.toJSONString(filePathSize)); |
|||
resultMap.put("filePath",imagePathlist); |
|||
resultMap.put("ugc",1); |
|||
if(resultMap.get("forwardUrl")!=""){ |
|||
try { |
|||
List<Map<String, Object>> forwardUrl= (List<Map<String, Object>>) JsonUtils.parseArray((String) resultMap.get("forwardUrl")); |
|||
List<Map<String, Object>> anewforwardUrl=new ArrayList<>(); |
|||
for( Map<String, Object> mapList : forwardUrl ) { |
|||
if(mapList.containsKey("gofastUrl")){ |
|||
mapList.put("gofastUrl","/group"+mapList.get("gofastUrl").toString().split("/group")[1]); |
|||
anewforwardUrl.add(mapList); |
|||
}else{ |
|||
anewforwardUrl.add(mapList); |
|||
} |
|||
} |
|||
String reforwardUrl=JsonUtils.toJSONString(anewforwardUrl); |
|||
resultMap.put("srcfilePath",reforwardUrl); |
|||
|
|||
} catch (Exception e) { |
|||
e.printStackTrace(); |
|||
|
|||
} |
|||
} |
|||
} |
|||
public static void downloadAndSavevideo(Map<String, Object> resultMap, List<Map<String, String>> videoPathSize){ |
|||
// List<Map<String, String>> imagePathSizevalue= (List<Map<String, String>>) resultMap.get("videoPathSize"); |
|||
List<String> imagePathlist=new ArrayList<>(); |
|||
for (Map<String, String> mapurl:videoPathSize){ |
|||
if(mapurl.containsKey("url")){ |
|||
imagePathlist.add(mapurl.get("url")); |
|||
} |
|||
} |
|||
resultMap.put("videoPathSize",JsonUtils.toJSONString(videoPathSize)); |
|||
resultMap.put("videoPath",imagePathlist); |
|||
resultMap.put("egc",1); |
|||
if(resultMap.get("videoUrl")!=""){ |
|||
try { |
|||
List<Map<String, Object>> forwardUrl= (List<Map<String, Object>>) JsonUtils.parseArray((String) resultMap.get("videoUrl")); |
|||
List<Map<String, Object>> anewforwardUrl=new ArrayList<>(); |
|||
for( Map<String, Object> mapList : forwardUrl ) { |
|||
if(mapList.containsKey("gofastUrl")){ |
|||
mapList.put("gofastUrl","/group"+mapList.get("gofastUrl").toString().split("/group")[1]); |
|||
anewforwardUrl.add(mapList); |
|||
}else{ |
|||
anewforwardUrl.add(mapList); |
|||
} |
|||
} |
|||
String reforwardUrl=JsonUtils.toJSONString(anewforwardUrl); |
|||
resultMap.put("srcvideoPath",reforwardUrl); |
|||
|
|||
} catch (Exception e) { |
|||
e.printStackTrace(); |
|||
|
|||
} |
|||
} |
|||
} |
|||
|
|||
|
|||
|
|||
|
|||
|
|||
} |
@ -0,0 +1,4 @@ |
|||
package com.bfd.mf.datasave.download; |
|||
|
|||
public class test { |
|||
} |
@ -0,0 +1,79 @@ |
|||
package com.bfd.mf.datasave.listen; |
|||
|
|||
import com.bfd.mf.datasave.kafka.ReadKafka; |
|||
import com.bfd.mf.datasave.tools.DateUtil; |
|||
import com.bfd.mf.entity.FieldNormaliz; |
|||
import com.bfd.mf.entity.mysql.FiledTableInfo; |
|||
import com.bfd.mf.entity.mysql.SubjectTask; |
|||
|
|||
import java.util.List; |
|||
import java.util.Map; |
|||
import java.util.concurrent.LinkedBlockingDeque; |
|||
import java.util.concurrent.SynchronousQueue; |
|||
import java.util.concurrent.ThreadPoolExecutor; |
|||
import java.util.concurrent.TimeUnit; |
|||
|
|||
public class ListenfastTaskManager implements Runnable { |
|||
|
|||
//private LinkedBlockingDeque<String> queue= new LinkedBlockingDeque<String>(5000); |
|||
private LinkedBlockingDeque<String> fastqueue= new LinkedBlockingDeque<String>(5000); |
|||
private boolean isRun = true; |
|||
private FieldNormaliz fieldNormaliz; |
|||
private ThreadPoolExecutor spiderPoolExec ; |
|||
private Map<String, List<Map<String,String>>> subject; |
|||
private Map<Integer,Map<String,String>> tableInfoMap; |
|||
|
|||
public ListenfastTaskManager(FieldNormaliz fieldNormaliz){ |
|||
int croePoolsize = 20 ; |
|||
int maximumPoolsize = 500; |
|||
long keepAliveTime = 0; |
|||
this.spiderPoolExec = new ThreadPoolExecutor(croePoolsize, maximumPoolsize, keepAliveTime, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); |
|||
this.fieldNormaliz = fieldNormaliz ; |
|||
this.subject = SubjectTask.subjectTaskMap; |
|||
this.tableInfoMap = FiledTableInfo.tableInfoMap; |
|||
String kafkaname = fieldNormaliz.getKafkaName()+"fast" ; |
|||
ReadKafka readfaseKafka = new ReadKafka(fastqueue , kafkaname ,12 , fieldNormaliz.getGroupId(), fieldNormaliz.getKafkaSerName(),fieldNormaliz.getEsSerName()); |
|||
readfaseKafka.read(); |
|||
|
|||
} |
|||
|
|||
@Override |
|||
public void run() { |
|||
while(isRun){ |
|||
if(this.fastqueue.size() < 1){ |
|||
DateUtil.sleep(1); |
|||
continue; |
|||
} |
|||
String data = this.fastqueue.poll(); |
|||
System.out.println(data); |
|||
if(data == null) continue ; |
|||
if (data.equals("__Exit__")) break ; |
|||
this.addTask(data); |
|||
} |
|||
} |
|||
|
|||
private void addTask(String data){ |
|||
while ( spiderPoolExec.getPoolSize() >= spiderPoolExec.getMaximumPoolSize() || |
|||
spiderPoolExec.getActiveCount() >= spiderPoolExec.getMaximumPoolSize()) { |
|||
try { |
|||
Thread.sleep(200); |
|||
} catch (InterruptedException e) { |
|||
e.printStackTrace(); |
|||
} |
|||
} |
|||
System.out.println("现有的线程数"+spiderPoolExec.getActiveCount()); |
|||
|
|||
spiderPoolExec.submit(new DataSaveManager(data, fieldNormaliz,subject,tableInfoMap)); |
|||
} |
|||
|
|||
|
|||
public void setSwitch(boolean flag){ |
|||
this.isRun = flag ; |
|||
} |
|||
|
|||
public LinkedBlockingDeque<String> getQueue(){ |
|||
return fastqueue ; |
|||
} |
|||
|
|||
|
|||
} |
@ -0,0 +1,480 @@ |
|||
package com.bfd.mf.entity.mysql; |
|||
|
|||
|
|||
//import com.bfd.crawler.utils.JsonUtils; |
|||
import com.bfd.crawler.utils.JsonUtils; |
|||
import com.bfd.mf.datasave.tools.Constants; |
|||
import com.bfd.mf.datasave.tools.DBUtil; |
|||
import com.bfd.mf.datasave.tools.DateUtil; |
|||
import crawler.open.util.RedisUtil; |
|||
import org.apache.log4j.Logger; |
|||
|
|||
import java.text.SimpleDateFormat; |
|||
import java.util.*; |
|||
|
|||
import static com.bfd.mf.entity.mysql.Tasklimit.subjectTasktimelimiit; |
|||
import static com.bfd.mf.entity.mysql.Userlimit.subjectuserlimiit; |
|||
|
|||
public class SubjectTaskdele implements Runnable { |
|||
private static Logger log = Logger.getLogger(SubjectTask.class); |
|||
public static Map<String, List<Map<String,String>>> subjectTaskMap = new HashMap<>(); |
|||
|
|||
// public static void loadSubjectTask() { |
|||
// subjectTaskMap.clear(); |
|||
// //List<Map<String, Object>> subjectTaskList = DBUtil.getInstance("db_stat").query("select cs.del,ct.external_id, ct.subject_id, ct.id, ct.cid, ct.crawl_data_flag,cs.kafka_switch,cs.kafka_addr,cs.go_fast_addr,cs.kafka_topic,cs.go_fast_switch from cl_subject cs Join cl_task ct on(ct.subject_id=cs.id)where (ct.crawl_status=1 or ct.crawl_status=3) and ct.del=0 ;");ct.app_id=cs.app_id and |
|||
// String time=DateUtil.getDate(); |
|||
// //System.out.println(time); |
|||
// //System.out.println("结束时间"+ DateUtil.getcurr()); |
|||
// List<Map<String, Object>> subjectTaskList = DBUtil.getInstance("db_stat_alltask").query("select ct.crawl_content_key,ct.create_user_id,ct.app_id,cs.del,ct.external_id, ct.subject_id, ct.id, ct.cid, ct.crawl_data_flag,cs.kafka_switch,cs.kafka_addr,cs.go_fast_addr,cs.kafka_topic,cs.go_fast_switch from cl_subject cs Join cl_task ct on(ct.subject_id=cs.id) where (ct.crawl_status=1 ) and ct.del=0 and ct.app_id=cs.app_id and ct.cid!=\"\" and ct.update_time>'"+time+"'order by ct.update_time desc;"); |
|||
// System.out.println(subjectTaskList.size()); |
|||
// if(subjectTaskList.size() > 0){ |
|||
// String key = ""; |
|||
// for(Map<String, Object> subjectTask : subjectTaskList){ //{subject_id=10222, name=我是张三, task_id=188, id=71, crawl_data_flag=aaa} |
|||
// String keytwo = ""; |
|||
// if( subjectTask.get("cid").equals("Tmall")){ |
|||
// key = subjectTask.get("cid") + "#####" + subjectTask.get("crawl_data_flag"); |
|||
// keytwo = "Taobao"+ "#####" + subjectTask.get("crawl_data_flag"); |
|||
// } |
|||
// else if (subjectTask.get("cid").equals("Taobao")){ |
|||
// key = subjectTask.get("cid") + "#####" + subjectTask.get("crawl_data_flag"); |
|||
// keytwo = "Tmall"+ "#####" + subjectTask.get("crawl_data_flag"); |
|||
// } |
|||
// else { |
|||
// key = subjectTask.get("cid") + "#####" + subjectTask.get("crawl_data_flag"); |
|||
// } |
|||
// Map<String,String> value = new HashMap<>(); |
|||
// List<Map<String,String>> valueList = new ArrayList<>(); |
|||
// String v_subject_id = ""; |
|||
// String v_go_fast_addr = ""; |
|||
// String v_kafka_switch = ""; |
|||
// String v_kafka_addr = ""; |
|||
// String v_task_id = ""; |
|||
// String v_external_id =""; |
|||
// String v_go_fast_switch=""; |
|||
// String v_kafka_topic=""; |
|||
// String v_status=""; |
|||
// String v_del=""; |
|||
// String v_create_user_id=""; |
|||
// String v_ocr="0"; |
|||
// String v_trans="0"; |
|||
// String v_crawl_content_key=""; |
|||
// if(null != subjectTask.get("subject_id")) { |
|||
// v_subject_id = subjectTask.get("subject_id").toString(); |
|||
// } |
|||
// if(null != subjectTask.get("crawl_content_key")) { |
|||
// v_crawl_content_key = subjectTask.get("crawl_content_key").toString(); |
|||
// } |
|||
// if(null != subjectTask.get("go_fast_addr")) { |
|||
// v_go_fast_addr = subjectTask.get("go_fast_addr").toString(); |
|||
// } |
|||
// if(null != subjectTask.get("kafka_addr")) { |
|||
// v_kafka_addr = subjectTask.get("kafka_addr").toString(); |
|||
// } |
|||
// if(null != subjectTask.get("kafka_switch")){ |
|||
// v_kafka_switch = subjectTask.get("kafka_switch").toString(); |
|||
// } |
|||
// if(null !=subjectTask.get("id")){ |
|||
// v_task_id = subjectTask.get("id").toString(); |
|||
// } |
|||
// if(null !=subjectTask.get("external_id")){ |
|||
// v_external_id = subjectTask.get("external_id").toString(); |
|||
// } |
|||
// if(null !=subjectTask.get("go_fast_switch")){ |
|||
// v_go_fast_switch = subjectTask.get("go_fast_switch").toString(); |
|||
// } |
|||
// if(null !=subjectTask.get("kafka_topic")){ |
|||
// v_kafka_topic = subjectTask.get("kafka_topic").toString(); |
|||
// } |
|||
//// if(null !=subjectTask.get("status")){ |
|||
//// v_status = subjectTask.get("status").toString(); |
|||
//// } |
|||
// if(null !=subjectTask.get("del")){ |
|||
// v_del = subjectTask.get("del").toString(); |
|||
// } |
|||
// if(null !=subjectTask.get("create_user_id")){ |
|||
// v_create_user_id = subjectTask.get("create_user_id").toString(); |
|||
// } |
|||
// value.put("subject_id",v_subject_id); |
|||
// value.put("go_fast_addr",v_go_fast_addr); |
|||
// value.put("export_to_kafka",v_kafka_switch); |
|||
// value.put("kafka_addr",v_kafka_addr); |
|||
// // value.put("task_id",v_task_id); |
|||
// value.put("external_id",v_external_id); |
|||
// value.put("go_fast_switch",v_go_fast_switch); |
|||
// value.put("kafka_topic",v_kafka_topic); |
|||
// // value.put("status",v_status);//专题的状态 |
|||
// value.put("del",v_del);//专题的状态 |
|||
// value.put("appid",subjectTask.get("app_id").toString()); |
|||
// value.put("crawl_content_key",v_crawl_content_key); |
|||
// //System.out.print(v_external_id+"external_id"); |
|||
// String newkey = key.toLowerCase(); |
|||
// String userkey=newkey+"#####"+subjectTask.get("app_id").toString().toLowerCase(); |
|||
// |
|||
// //组装时间的参数 |
|||
// if (subjectTasktimelimiit.containsKey(userkey)){ |
|||
// List<Map<String,String>>timelist=subjectTasktimelimiit.get(userkey); |
|||
// if(timelist.size()==1){ |
|||
// for(Map<String, String> subjectTasktime : timelist){ |
|||
// value.put("maxtime",subjectTasktime.get("max_time").toString()); |
|||
// value.put("mintime",subjectTasktime.get("min_time").toString()); |
|||
// } |
|||
// } else{ |
|||
// for(Map<String, String> subjectTasktime : timelist){ |
|||
// String subject_id=subjectTasktime.get("subject_id").toString(); |
|||
// if (v_subject_id.equals(subject_id)){ |
|||
// value.put("maxtime",subjectTasktime.get("max_time").toString()); |
|||
// value.put("mintime",subjectTasktime.get("min_time").toString()); |
|||
// } |
|||
// } |
|||
// } |
|||
// |
|||
// } |
|||
//// //用户的权限 |
|||
// if (subjectuserlimiit.containsKey(v_create_user_id)){ |
|||
// Map<String,Object> permission= (Map<String, Object>) subjectuserlimiit.get(v_create_user_id); |
|||
// v_ocr= permission.get("is_ocr").toString(); |
|||
// v_trans= permission.get("is_trans").toString(); |
|||
// } |
|||
// value.put("is_ocr",v_ocr); |
|||
// value.put("is_trans",v_trans); |
|||
// //组装相同任务的任务id |
|||
// if(subjectTaskMap.containsKey(newkey)){ |
|||
// valueList = subjectTaskMap.get(newkey); |
|||
// for (Map<String, String> valuetask : valueList){ |
|||
// String task=valuetask.get("task_id")+","+v_task_id; |
|||
// valuetask.put("task_id",task); |
|||
// value.put("task_id",task); |
|||
// } |
|||
// valueList.add(value); |
|||
// }else{ |
|||
// value.put("task_id",v_task_id); |
|||
// valueList.add(value); |
|||
// } |
|||
// |
|||
// if(keytwo.length()>0){ |
|||
// String tmallnewkey = keytwo.toLowerCase(); |
|||
// subjectTaskMap.put(tmallnewkey,valueList); |
|||
// } |
|||
// String redis=newkey+"$$"+JsonUtils.toJSONString(valueList); |
|||
//// try { |
|||
//// Constants.getLineQueue().put(redis); |
|||
//// } catch (InterruptedException e) { |
|||
//// e.printStackTrace(); |
|||
//// } |
|||
// |
|||
// // RedisUtil.set(newkey, JsonUtils.toJSONString(valueList), 10); |
|||
// // System.out.println("结束时间"+ DateUtil.getcurr()); |
|||
// subjectTaskMap.put(newkey,valueList); |
|||
// //System.out.println(newkey); |
|||
// } |
|||
// // System.out.println("结束时间"+ DateUtil.getcurr()); |
|||
// |
|||
// |
|||
// //System.out.println(subjectTaskMap.size()); |
|||
// log.info("当天任务的数量" + key + " ; data = " + subjectTaskMap.size()); |
|||
// // SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//设置日期格式 |
|||
// //System.out.println(subjectTaskList.size());// new Date()为获取当前系统时间 |
|||
// //WriteMethod.writeMethod("0621test.txt",JsonUtils.toJSONString(subjectTaskMap)); |
|||
// // System.out.println(JsonUtils.toJSONString(subjectTaskMap)+"当前时间"+ DateUtil.getcurr()); |
|||
// }else { |
|||
// System.out.println("kong a "); |
|||
// } |
|||
// } |
|||
// public static long updatetime = new Date().getTime()/1000; |
|||
|
|||
@Override |
|||
public void run() { |
|||
while (true) try { |
|||
{ |
|||
long updatetime = new Date().getTime()/1000-15; |
|||
List<Map<String, Object>> subjectTaskendList = DBUtil.getInstance("db_stat_alltask").query("select ct.crawl_content_key,ct.create_user_id,ct.app_id,cs.del,ct.crawl_start_time,ct.crawl_end_time,ct.external_id, ct.subject_id, ct.id, ct.cid, ct.crawl_data_flag,cs.kafka_switch,cs.kafka_addr,cs.go_fast_addr,cs.kafka_topic,cs.go_fast_switch from cl_subject cs Join cl_task ct on(ct.subject_id=cs.id) where ct.del=1 and ct.cid!=\"\" and ct.subject_id ='302681' and ct.cid='Npresidentgov' ;"); |
|||
|
|||
if(subjectTaskendList.size()>0){ |
|||
// System.out.println(JsonUtils.toJSONString(subjectTaskendList)); |
|||
for(Map<String, Object> subjectendtTask : subjectTaskendList){ |
|||
String v_subject_id = ""; |
|||
String v_task_id = ""; String appid=""; |
|||
if(null != subjectendtTask.get("subject_id")) { |
|||
v_subject_id = subjectendtTask.get("subject_id").toString(); |
|||
} |
|||
if(null !=subjectendtTask.get("id")){ |
|||
v_task_id = subjectendtTask.get("id").toString(); |
|||
} |
|||
if(null !=subjectendtTask.get("app_id")){ |
|||
appid = subjectendtTask.get("app_id").toString(); |
|||
} |
|||
|
|||
|
|||
if(subjectTaskendList.size()>0){ |
|||
// for(Map<String, Object> subjectTask : subjectTaskList){ |
|||
String key = ""; |
|||
key = subjectendtTask.get("cid") + "#####" + subjectendtTask.get("crawl_data_flag"); |
|||
key= key.toLowerCase(); |
|||
System.out.println(key); |
|||
key= key.toLowerCase(); |
|||
if(disposeCrawldataflag(key)) { |
|||
String getsubjectList = RedisUtil.get(key, 10); |
|||
List<Map<String, String>> subjectList = (List<Map<String, String>>) JsonUtils.parseArray(getsubjectList); |
|||
List<Map<String, String>> redistList = new ArrayList<>(); |
|||
for (Map<String, String> redissubjectTask : subjectList) { |
|||
String resubject_id = redissubjectTask.get("subject_id").toString(); |
|||
String reappid = redissubjectTask.get("appid").toString(); |
|||
String taskid = redissubjectTask.get("task_id").toString(); |
|||
if (!resubject_id.equals(v_subject_id) || !reappid.equals(appid) || !v_task_id.equals(taskid)) { |
|||
redistList.add(redissubjectTask); |
|||
} |
|||
} |
|||
String redis = key + "@#@" + JsonUtils.toJSONString(redistList); |
|||
// System.out.println(JsonUtils.toJSONString(redistList)); |
|||
try { |
|||
// System.out.println("写入redis是"+redis); |
|||
Constants.getLineQueue().put(redis); |
|||
} catch (InterruptedException e) { |
|||
e.printStackTrace(); |
|||
} |
|||
|
|||
} |
|||
|
|||
|
|||
} |
|||
|
|||
|
|||
// } |
|||
// if(subjectTaskList.size() > 0){ |
|||
// // System.out.println(subjectTaskendList.size()+"大小啊啊啊啊啊啊啊啊"); |
|||
// String key = ""; |
|||
// for(Map<String, Object> subjectTask : subjectTaskList){ |
|||
// String keytwo = ""; |
|||
// if( subjectTask.get("cid").equals("Tmall")){ |
|||
// key = subjectTask.get("cid") + "#####" + subjectTask.get("crawl_data_flag"); |
|||
// keytwo = "Taobao"+ "#####" + subjectTask.get("crawl_data_flag"); |
|||
// } |
|||
// else if (subjectTask.get("cid").equals("Taobao")){ |
|||
// key = subjectTask.get("cid") + "#####" + subjectTask.get("crawl_data_flag"); |
|||
// keytwo = "Tmall"+ "#####" + subjectTask.get("crawl_data_flag"); |
|||
// } |
|||
// else { |
|||
// key = subjectTask.get("cid") + "#####" + subjectTask.get("crawl_data_flag"); |
|||
// } |
|||
// Map<String,String> value = new HashMap<>(); |
|||
// List<Map<String,String>> valueList = new ArrayList<>(); |
|||
// String v_subject_id = ""; |
|||
// String v_go_fast_addr = ""; |
|||
// String v_kafka_switch = ""; |
|||
// String v_kafka_addr = ""; |
|||
// String v_task_id = ""; |
|||
// String v_external_id =""; |
|||
// String v_go_fast_switch=""; |
|||
// String v_kafka_topic=""; |
|||
// String v_status=""; |
|||
// String v_del=""; |
|||
// String v_create_user_id=""; |
|||
// String v_ocr="0"; |
|||
// String v_trans="0"; |
|||
// String v_crawl_content_key=""; |
|||
// String appid=""; |
|||
// if(null != subjectTask.get("subject_id")) { |
|||
// v_subject_id = subjectTask.get("subject_id").toString(); |
|||
// } |
|||
// if(null != subjectTask.get("crawl_content_key")) { |
|||
// v_crawl_content_key = subjectTask.get("crawl_content_key").toString(); |
|||
// } |
|||
// if(null != subjectTask.get("go_fast_addr")) { |
|||
// v_go_fast_addr = subjectTask.get("go_fast_addr").toString(); |
|||
// } |
|||
// if(null != subjectTask.get("kafka_addr")) { |
|||
// v_kafka_addr = subjectTask.get("kafka_addr").toString(); |
|||
// } |
|||
// if(null != subjectTask.get("kafka_switch")){ |
|||
// v_kafka_switch = subjectTask.get("kafka_switch").toString(); |
|||
// } |
|||
// if(null !=subjectTask.get("id")){ |
|||
// v_task_id = subjectTask.get("id").toString(); |
|||
// } |
|||
// if(null !=subjectTask.get("external_id")){ |
|||
// v_external_id = subjectTask.get("external_id").toString(); |
|||
// } |
|||
// if(null !=subjectTask.get("go_fast_switch")){ |
|||
// v_go_fast_switch = subjectTask.get("go_fast_switch").toString(); |
|||
// } |
|||
// if(null !=subjectTask.get("kafka_topic")){ |
|||
// v_kafka_topic = subjectTask.get("kafka_topic").toString(); |
|||
// } |
|||
// // if(null !=subjectTask.get("status")){ |
|||
// // v_status = subjectTask.get("status").toString(); |
|||
// // } |
|||
// if(null !=subjectTask.get("del")){ |
|||
// v_del = subjectTask.get("del").toString(); |
|||
// } |
|||
// if(null !=subjectTask.get("create_user_id")){ |
|||
// v_create_user_id = subjectTask.get("create_user_id").toString(); |
|||
// } |
|||
// if(null !=subjectTask.get("app_id")){ |
|||
// appid = subjectTask.get("app_id").toString(); |
|||
// } |
|||
// |
|||
// |
|||
// value.put("subject_id",v_subject_id); |
|||
// value.put("go_fast_addr",v_go_fast_addr); |
|||
// value.put("export_to_kafka",v_kafka_switch); |
|||
// value.put("kafka_addr",v_kafka_addr); |
|||
// // value.put("task_id",v_task_id); |
|||
// value.put("external_id",v_external_id); |
|||
// value.put("go_fast_switch",v_go_fast_switch); |
|||
// value.put("kafka_topic",v_kafka_topic); |
|||
// // value.put("status",v_status);//专题的状态 |
|||
// value.put("del",v_del);//专题的状态 |
|||
// value.put("appid",subjectTask.get("app_id").toString()); |
|||
// value.put("crawl_content_key",v_crawl_content_key); |
|||
// //System.out.print(v_external_id+"external_id"); |
|||
// String newkey = key.toLowerCase(); |
|||
// String userkey=newkey+"#####"+subjectTask.get("app_id").toString().toLowerCase(); |
|||
// //组装时间的参数 |
|||
// if (subjectTasktimelimiit.containsKey(userkey)){ |
|||
// List<Map<String,String>>timelist=subjectTasktimelimiit.get(userkey); |
|||
// if(timelist.size()==1){ |
|||
// for(Map<String, String> subjectTasktime : timelist){ |
|||
// value.put("maxtime",subjectTasktime.get("max_time").toString()); |
|||
// value.put("mintime",subjectTasktime.get("min_time").toString()); |
|||
// } |
|||
// } else{ |
|||
// for(Map<String, String> subjectTasktime : timelist){ |
|||
// String subject_id=subjectTasktime.get("subject_id").toString(); |
|||
// if (v_subject_id.equals(subject_id)){ |
|||
// value.put("maxtime",subjectTasktime.get("max_time").toString()); |
|||
// value.put("mintime",subjectTasktime.get("min_time").toString()); |
|||
// } |
|||
// } |
|||
// } |
|||
// |
|||
// } |
|||
// // //用户的权限 |
|||
// if (subjectuserlimiit.containsKey(v_create_user_id)){ |
|||
// Map<String,Object> permission= (Map<String, Object>) subjectuserlimiit.get(v_create_user_id); |
|||
// v_ocr= permission.get("is_ocr").toString(); |
|||
// v_trans= permission.get("is_trans").toString(); |
|||
// } |
|||
// value.put("is_ocr",v_ocr); |
|||
// value.put("is_trans",v_trans); |
|||
// Set keysSet = new HashSet(); |
|||
// |
|||
// |
|||
// //组装相同任务的任务id |
|||
// if(subjectTaskMap.containsKey(newkey)){ |
|||
// valueList = subjectTaskMap.get(newkey); |
|||
// for (Map<String, String> valuetask : valueList){ |
|||
// String task=valuetask.get("task_id")+","+v_task_id; |
|||
// valuetask.put("task_id",task); |
|||
// value.put("task_id",task); |
|||
// } |
|||
// valueList.add(value); |
|||
// }else{ |
|||
// value.put("task_id",v_task_id); |
|||
// valueList.add(value); |
|||
// } |
|||
//// if(disposeCrawldataflag(newkey)) { //判断redis中的key |
|||
//// String getsubjectList=RedisUtil.get(newkey,10); |
|||
//// try { |
|||
//// List<Map<String, String>> subjectList = (List<Map<String, String>>) JsonUtils.parseArray(getsubjectList); |
|||
//// List<Map<String, String>> redistList=new ArrayList<>(); |
|||
//// for(Map<String, String> redissubjectTask : subjectList){ |
|||
//// String resubject_id=redissubjectTask.get("subject_id").toString(); |
|||
//// String reappid=redissubjectTask.get("appid").toString(); |
|||
//// String maxtime=""; |
|||
//// if(redissubjectTask.containsKey("maxtime")){ |
|||
//// maxtime=redissubjectTask.get("maxtime").toString(); |
|||
//// } |
|||
//// |
|||
//// if(!resubject_id.equals(v_subject_id)&&!reappid.equals(appid)){ |
|||
//// redistList.add(redissubjectTask); |
|||
//// } |
|||
//// String keys=resubject_id+reappid+maxtime; |
|||
//// int beforeSize = keysSet.size(); |
|||
//// keysSet.add(keys); |
|||
//// int afterSize = keysSet.size(); |
|||
//// if(afterSize == beforeSize + 1){ |
|||
//// valueList.add(redissubjectTask); |
|||
//// } |
|||
//// } |
|||
//// valueList.addAll(redistList); |
|||
//// } catch (Exception e) { |
|||
//// log.error("获取redis中的任务失败"+getsubjectList); |
|||
//// } |
|||
//// } |
|||
// if(keytwo.length()>0){ |
|||
// String tmallnewkey = keytwo.toLowerCase(); |
|||
// subjectTaskMap.put(tmallnewkey,valueList); |
|||
// } |
|||
// subjectTaskMap.put(newkey,valueList); |
|||
// //System.out.print(valueList); |
|||
// } |
|||
// //System.out.print(JsonUtils.toJSONString(subjectTaskMap)); |
|||
// |
|||
// Set keysSet = new HashSet(); |
|||
// for (String newkey :subjectTaskMap.keySet()){ |
|||
// List<Map<String, String>> newkeyvalue=subjectTaskMap.get(newkey); |
|||
// //System.out.println(JsonUtils.toJSONString(newkeyvalue)); |
|||
// List<Map<String, String>> setkeyvalue=new ArrayList<>(); |
|||
// for(Map<String, String> redissubjectTask : newkeyvalue){ |
|||
// String resubject_id=redissubjectTask.get("subject_id").toString(); |
|||
// String reappid=redissubjectTask.get("appid").toString(); |
|||
// String maxtime=redissubjectTask.get("maxtime").toString(); |
|||
// String keys=resubject_id+reappid+newkey; |
|||
// int beforeSize = keysSet.size(); |
|||
// keysSet.add(keys); |
|||
// int afterSize = keysSet.size(); |
|||
// if(afterSize == beforeSize + 1){ |
|||
// setkeyvalue.add(redissubjectTask); |
|||
// } |
|||
// } |
|||
// String redis=newkey+"@#@"+JsonUtils.toJSONString(setkeyvalue); |
|||
// try { |
|||
// Constants.getLineQueue().put(redis); |
|||
// } catch (InterruptedException e) { |
|||
// e.printStackTrace(); |
|||
// } |
|||
// } |
|||
// |
|||
// log.info("结束时间"+ DateUtil.getcurr()); |
|||
// //System.out.println(subjectTaskMap.size()); |
|||
// log.info("当天任务的数量" + key + " ; data = " + subjectTaskMap.size()); |
|||
// }else { |
|||
// String delkey = subjectendtTask.get("cid") + "#####" + subjectendtTask.get("crawl_data_flag"); |
|||
// log.info(delkey.toLowerCase()+"====del"); |
|||
// RedisUtil.del(delkey.toLowerCase(), 10); |
|||
// } |
|||
|
|||
|
|||
|
|||
} |
|||
} |
|||
Thread.sleep(1000*86400); |
|||
} |
|||
} catch (Exception e) { |
|||
e.printStackTrace(); |
|||
} |
|||
} |
|||
|
|||
|
|||
private boolean disposeCrawldataflag(String crawldataflag) { |
|||
try{ |
|||
if (RedisUtil.exists(crawldataflag, 10)) { // 先去 redis中查询是否存在,不存直接忽略 |
|||
String value = RedisUtil.get(crawldataflag,10); |
|||
if(null != value && !("").equals(value)) { |
|||
|
|||
return true; |
|||
} |
|||
|
|||
|
|||
} else { |
|||
log.error("[datasave] exec >>> 灌数:该 crwaldataflag 在 Redis 中不存在!!! keys = " + crawldataflag + " ; dbindex = " + 10); |
|||
return false; |
|||
} |
|||
return false; |
|||
}catch (Exception e){ |
|||
e.printStackTrace(); |
|||
return false; |
|||
} |
|||
} |
|||
|
|||
} |
@ -0,0 +1,471 @@ |
|||
package com.bfd.mf.entity.mysql; |
|||
|
|||
|
|||
//import com.bfd.crawler.utils.JsonUtils; |
|||
import com.bfd.crawler.utils.JsonUtils; |
|||
import com.bfd.mf.datasave.tools.Constants; |
|||
import com.bfd.mf.datasave.tools.DBUtil; |
|||
import com.bfd.mf.datasave.tools.DateUtil; |
|||
import crawler.open.util.RedisUtil; |
|||
import org.apache.log4j.Logger; |
|||
|
|||
import java.util.*; |
|||
|
|||
import static com.bfd.mf.entity.mysql.Tasklimit.subjectTasktimelimiit; |
|||
import static com.bfd.mf.entity.mysql.Userlimit.subjectuserlimiit; |
|||
|
|||
public class SubjectTaskdelone implements Runnable { |
|||
private static Logger log = Logger.getLogger(SubjectTaskdelone.class); |
|||
public static Map<String, List<Map<String,String>>> subjectTaskMap = new HashMap<>(); |
|||
|
|||
// public static void loadSubjectTask() { |
|||
// subjectTaskMap.clear(); |
|||
// //List<Map<String, Object>> subjectTaskList = DBUtil.getInstance("db_stat").query("select cs.del,ct.external_id, ct.subject_id, ct.id, ct.cid, ct.crawl_data_flag,cs.kafka_switch,cs.kafka_addr,cs.go_fast_addr,cs.kafka_topic,cs.go_fast_switch from cl_subject cs Join cl_task ct on(ct.subject_id=cs.id)where (ct.crawl_status=1 or ct.crawl_status=3) and ct.del=0 ;");ct.app_id=cs.app_id and |
|||
// String time=DateUtil.getDate(); |
|||
// //System.out.println(time); |
|||
// //System.out.println("结束时间"+ DateUtil.getcurr()); |
|||
// List<Map<String, Object>> subjectTaskList = DBUtil.getInstance("db_stat_alltask").query("select ct.crawl_content_key,ct.create_user_id,ct.app_id,cs.del,ct.external_id, ct.subject_id, ct.id, ct.cid, ct.crawl_data_flag,cs.kafka_switch,cs.kafka_addr,cs.go_fast_addr,cs.kafka_topic,cs.go_fast_switch from cl_subject cs Join cl_task ct on(ct.subject_id=cs.id) where (ct.crawl_status=1 ) and ct.del=0 and ct.app_id=cs.app_id and ct.cid!=\"\" and ct.update_time>'"+time+"'order by ct.update_time desc;"); |
|||
// System.out.println(subjectTaskList.size()); |
|||
// if(subjectTaskList.size() > 0){ |
|||
// String key = ""; |
|||
// for(Map<String, Object> subjectTask : subjectTaskList){ //{subject_id=10222, name=我是张三, task_id=188, id=71, crawl_data_flag=aaa} |
|||
// String keytwo = ""; |
|||
// if( subjectTask.get("cid").equals("Tmall")){ |
|||
// key = subjectTask.get("cid") + "#####" + subjectTask.get("crawl_data_flag"); |
|||
// keytwo = "Taobao"+ "#####" + subjectTask.get("crawl_data_flag"); |
|||
// } |
|||
// else if (subjectTask.get("cid").equals("Taobao")){ |
|||
// key = subjectTask.get("cid") + "#####" + subjectTask.get("crawl_data_flag"); |
|||
// keytwo = "Tmall"+ "#####" + subjectTask.get("crawl_data_flag"); |
|||
// } |
|||
// else { |
|||
// key = subjectTask.get("cid") + "#####" + subjectTask.get("crawl_data_flag"); |
|||
// } |
|||
// Map<String,String> value = new HashMap<>(); |
|||
// List<Map<String,String>> valueList = new ArrayList<>(); |
|||
// String v_subject_id = ""; |
|||
// String v_go_fast_addr = ""; |
|||
// String v_kafka_switch = ""; |
|||
// String v_kafka_addr = ""; |
|||
// String v_task_id = ""; |
|||
// String v_external_id =""; |
|||
// String v_go_fast_switch=""; |
|||
// String v_kafka_topic=""; |
|||
// String v_status=""; |
|||
// String v_del=""; |
|||
// String v_create_user_id=""; |
|||
// String v_ocr="0"; |
|||
// String v_trans="0"; |
|||
// String v_crawl_content_key=""; |
|||
// if(null != subjectTask.get("subject_id")) { |
|||
// v_subject_id = subjectTask.get("subject_id").toString(); |
|||
// } |
|||
// if(null != subjectTask.get("crawl_content_key")) { |
|||
// v_crawl_content_key = subjectTask.get("crawl_content_key").toString(); |
|||
// } |
|||
// if(null != subjectTask.get("go_fast_addr")) { |
|||
// v_go_fast_addr = subjectTask.get("go_fast_addr").toString(); |
|||
// } |
|||
// if(null != subjectTask.get("kafka_addr")) { |
|||
// v_kafka_addr = subjectTask.get("kafka_addr").toString(); |
|||
// } |
|||
// if(null != subjectTask.get("kafka_switch")){ |
|||
// v_kafka_switch = subjectTask.get("kafka_switch").toString(); |
|||
// } |
|||
// if(null !=subjectTask.get("id")){ |
|||
// v_task_id = subjectTask.get("id").toString(); |
|||
// } |
|||
// if(null !=subjectTask.get("external_id")){ |
|||
// v_external_id = subjectTask.get("external_id").toString(); |
|||
// } |
|||
// if(null !=subjectTask.get("go_fast_switch")){ |
|||
// v_go_fast_switch = subjectTask.get("go_fast_switch").toString(); |
|||
// } |
|||
// if(null !=subjectTask.get("kafka_topic")){ |
|||
// v_kafka_topic = subjectTask.get("kafka_topic").toString(); |
|||
// } |
|||
//// if(null !=subjectTask.get("status")){ |
|||
//// v_status = subjectTask.get("status").toString(); |
|||
//// } |
|||
// if(null !=subjectTask.get("del")){ |
|||
// v_del = subjectTask.get("del").toString(); |
|||
// } |
|||
// if(null !=subjectTask.get("create_user_id")){ |
|||
// v_create_user_id = subjectTask.get("create_user_id").toString(); |
|||
// } |
|||
// value.put("subject_id",v_subject_id); |
|||
// value.put("go_fast_addr",v_go_fast_addr); |
|||
// value.put("export_to_kafka",v_kafka_switch); |
|||
// value.put("kafka_addr",v_kafka_addr); |
|||
// // value.put("task_id",v_task_id); |
|||
// value.put("external_id",v_external_id); |
|||
// value.put("go_fast_switch",v_go_fast_switch); |
|||
// value.put("kafka_topic",v_kafka_topic); |
|||
// // value.put("status",v_status);//专题的状态 |
|||
// value.put("del",v_del);//专题的状态 |
|||
// value.put("appid",subjectTask.get("app_id").toString()); |
|||
// value.put("crawl_content_key",v_crawl_content_key); |
|||
// //System.out.print(v_external_id+"external_id"); |
|||
// String newkey = key.toLowerCase(); |
|||
// String userkey=newkey+"#####"+subjectTask.get("app_id").toString().toLowerCase(); |
|||
// |
|||
// //组装时间的参数 |
|||
// if (subjectTasktimelimiit.containsKey(userkey)){ |
|||
// List<Map<String,String>>timelist=subjectTasktimelimiit.get(userkey); |
|||
// if(timelist.size()==1){ |
|||
// for(Map<String, String> subjectTasktime : timelist){ |
|||
// value.put("maxtime",subjectTasktime.get("max_time").toString()); |
|||
// value.put("mintime",subjectTasktime.get("min_time").toString()); |
|||
// } |
|||
// } else{ |
|||
// for(Map<String, String> subjectTasktime : timelist){ |
|||
// String subject_id=subjectTasktime.get("subject_id").toString(); |
|||
// if (v_subject_id.equals(subject_id)){ |
|||
// value.put("maxtime",subjectTasktime.get("max_time").toString()); |
|||
// value.put("mintime",subjectTasktime.get("min_time").toString()); |
|||
// } |
|||
// } |
|||
// } |
|||
// |
|||
// } |
|||
//// //用户的权限 |
|||
// if (subjectuserlimiit.containsKey(v_create_user_id)){ |
|||
// Map<String,Object> permission= (Map<String, Object>) subjectuserlimiit.get(v_create_user_id); |
|||
// v_ocr= permission.get("is_ocr").toString(); |
|||
// v_trans= permission.get("is_trans").toString(); |
|||
// } |
|||
// value.put("is_ocr",v_ocr); |
|||
// value.put("is_trans",v_trans); |
|||
// //组装相同任务的任务id |
|||
// if(subjectTaskMap.containsKey(newkey)){ |
|||
// valueList = subjectTaskMap.get(newkey); |
|||
// for (Map<String, String> valuetask : valueList){ |
|||
// String task=valuetask.get("task_id")+","+v_task_id; |
|||
// valuetask.put("task_id",task); |
|||
// value.put("task_id",task); |
|||
// } |
|||
// valueList.add(value); |
|||
// }else{ |
|||
// value.put("task_id",v_task_id); |
|||
// valueList.add(value); |
|||
// } |
|||
// |
|||
// if(keytwo.length()>0){ |
|||
// String tmallnewkey = keytwo.toLowerCase(); |
|||
// subjectTaskMap.put(tmallnewkey,valueList); |
|||
// } |
|||
// String redis=newkey+"$$"+JsonUtils.toJSONString(valueList); |
|||
//// try { |
|||
//// Constants.getLineQueue().put(redis); |
|||
//// } catch (InterruptedException e) { |
|||
//// e.printStackTrace(); |
|||
//// } |
|||
// |
|||
// // RedisUtil.set(newkey, JsonUtils.toJSONString(valueList), 10); |
|||
// // System.out.println("结束时间"+ DateUtil.getcurr()); |
|||
// subjectTaskMap.put(newkey,valueList); |
|||
// //System.out.println(newkey); |
|||
// } |
|||
// // System.out.println("结束时间"+ DateUtil.getcurr()); |
|||
// |
|||
// |
|||
// //System.out.println(subjectTaskMap.size()); |
|||
// log.info("当天任务的数量" + key + " ; data = " + subjectTaskMap.size()); |
|||
// // SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//设置日期格式 |
|||
// //System.out.println(subjectTaskList.size());// new Date()为获取当前系统时间 |
|||
// //WriteMethod.writeMethod("0621test.txt",JsonUtils.toJSONString(subjectTaskMap)); |
|||
// // System.out.println(JsonUtils.toJSONString(subjectTaskMap)+"当前时间"+ DateUtil.getcurr()); |
|||
// }else { |
|||
// System.out.println("kong a "); |
|||
// } |
|||
// } |
|||
// public static long updatetime = new Date().getTime()/1000; |
|||
|
|||
@Override |
|||
public void run() { |
|||
while (true) try { |
|||
{ |
|||
|
|||
Userlimit.loaduser(); |
|||
Tasklimit.loadTask(); |
|||
// long updatetime = new Date().getTime()/1000-15; |
|||
// SimpleDateFormat datetime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); |
|||
// String now=datetime.format(Calendar.getInstance().getTime()); |
|||
// System.out.println(updatetime+now); |
|||
// long starttime=new Date().getTime()/1000-20L; |
|||
// if (disposeCrawldataflag("Tasktimelimiit")){ |
|||
// starttime= Long.parseLong(RedisUtil.get("Tasktimelimiit",10)); |
|||
// starttime=starttime-5; |
|||
// RedisUtil.set("Tasktimelimiit", String.valueOf(updatetime), 10); |
|||
// }else { |
|||
// System.out.println("第一次写入"); |
|||
// RedisUtil.set("Tasktimelimiit", String.valueOf(updatetime), 10); |
|||
// } |
|||
// System.out.println(System.currentTimeMillis()+"开始mysql执行的时间"); |
|||
// String sql ="select ct.crawl_content_key,ct.create_user_id,ct.app_id,cs.del,ct.external_id, ct.subject_id, ct.id, ct.cid, ct.crawl_data_flag,cs.kafka_switch,cs.kafka_addr,cs.go_fast_addr,cs.kafka_topic,cs.go_fast_switch from cl_subject cs Join cl_task ct on(ct.subject_id=cs.id) where (ct.crawl_status=1 ) and ct.del=0 and ct.cid!=\"\" and unix_timestamp(ct.start_time)>'"+starttime+"';"; |
|||
//subject_id=302896 and del=1 and crawl_status =1 |
|||
List<Map<String, Object>> delsubjectTaskendList = DBUtil.getInstance("db_stat_alltask").query("select ct.crawl_content_key,ct.create_user_id,ct.app_id,cs.del,ct.external_id, ct.subject_id, ct.id, ct.cid, ct.crawl_data_flag,cs.kafka_switch,cs.kafka_addr,cs.go_fast_addr,cs.kafka_topic,cs.go_fast_switch from cl_subject cs Join cl_task ct on(ct.subject_id=cs.id) where (ct.crawl_status=0 ) and ct.del=1 and ct.cid!=\"\" and ct.subject_id=302896 and ct.del=1 and ct.crawl_status =0;"); |
|||
for(Map<String, Object> delsubjectTask : delsubjectTaskendList){ |
|||
String cid= (String) delsubjectTask.get("cid"); |
|||
String crawl_data_flag= (String) delsubjectTask.get("crawl_data_flag"); |
|||
// System.out.println("sq; si "+sql); |
|||
// List<Map<String, Object>> subjectTaskendList = DBUtil.getInstance("db_stat_alltask").query("select ct.crawl_content_key,ct.create_user_id,ct.app_id,cs.del,ct.external_id, ct.subject_id, ct.id, ct.cid, ct.crawl_data_flag,cs.kafka_switch,cs.kafka_addr,cs.go_fast_addr,cs.kafka_topic,cs.go_fast_switch from cl_subject cs Join cl_task ct on(ct.subject_id=cs.id) where (ct.crawl_status=1 ) and ct.del=0 and ct.cid!=\"\" and unix_timestamp(ct.start_time)>'"+starttime+"';"); |
|||
List<Map<String, Object>> subjectTaskendList = DBUtil.getInstance("db_stat_alltask").query("select ct.crawl_content_key,ct.create_user_id,ct.app_id,cs.del,ct.external_id, ct.subject_id, ct.id, ct.cid, ct.crawl_data_flag,cs.kafka_switch,cs.kafka_addr,cs.go_fast_addr,cs.kafka_topic,cs.go_fast_switch from cl_subject cs Join cl_task ct on(ct.subject_id=cs.id) where (ct.crawl_status=1) and ct.del=0 and ct.cid='"+cid+"' and ct.subject_id!=302896 and ct.crawl_data_flag='"+crawl_data_flag+"';"); |
|||
System.out.println(System.currentTimeMillis()+"执行mysql任务结束的时间"); |
|||
subjectTaskMap.clear(); |
|||
if(subjectTaskendList.size()>0){ |
|||
String delkey = cid + "#####" + crawl_data_flag; |
|||
log.info(delkey.toLowerCase()+"====del"); |
|||
RedisUtil.del(delkey.toLowerCase(), 10); |
|||
Thread.sleep(1000*5); |
|||
|
|||
System.out.println(subjectTaskendList.size()+"任务的大小"); |
|||
for(Map<String, Object> subjectTask : subjectTaskendList){ |
|||
String key = ""; |
|||
String keytwo = ""; |
|||
if( subjectTask.get("cid").equals("Tmall")){ |
|||
key = subjectTask.get("cid") + "#####" + subjectTask.get("crawl_data_flag"); |
|||
keytwo = "Taobao"+ "#####" + subjectTask.get("crawl_data_flag"); |
|||
} |
|||
else if (subjectTask.get("cid").equals("Taobao")){ |
|||
key = subjectTask.get("cid") + "#####" + subjectTask.get("crawl_data_flag"); |
|||
keytwo = "Tmall"+ "#####" + subjectTask.get("crawl_data_flag"); |
|||
} |
|||
else { |
|||
key = subjectTask.get("cid") + "#####" + subjectTask.get("crawl_data_flag"); |
|||
} |
|||
Map<String,String> value = new HashMap<>(); |
|||
List<Map<String,String>> valueList = new ArrayList<>(); |
|||
String v_subject_id = ""; |
|||
String v_go_fast_addr = ""; |
|||
String v_kafka_switch = ""; |
|||
String v_kafka_addr = ""; |
|||
String v_task_id = ""; |
|||
String v_external_id =""; |
|||
String v_go_fast_switch=""; |
|||
String v_kafka_topic=""; |
|||
String v_status=""; |
|||
String v_del=""; |
|||
String v_create_user_id=""; |
|||
String v_ocr="0"; |
|||
String v_trans="0"; |
|||
String v_crawl_content_key=""; |
|||
String appid=""; |
|||
if(null != subjectTask.get("subject_id")) { |
|||
v_subject_id = subjectTask.get("subject_id").toString(); |
|||
} |
|||
if(null != subjectTask.get("crawl_content_key")) { |
|||
v_crawl_content_key = subjectTask.get("crawl_content_key").toString(); |
|||
} |
|||
if(null != subjectTask.get("go_fast_addr")) { |
|||
v_go_fast_addr = subjectTask.get("go_fast_addr").toString(); |
|||
} |
|||
if(null != subjectTask.get("kafka_addr")) { |
|||
v_kafka_addr = subjectTask.get("kafka_addr").toString(); |
|||
} |
|||
if(null != subjectTask.get("kafka_switch")){ |
|||
v_kafka_switch = subjectTask.get("kafka_switch").toString(); |
|||
} |
|||
if(null !=subjectTask.get("id")){ |
|||
v_task_id = subjectTask.get("id").toString(); |
|||
} |
|||
if(null !=subjectTask.get("external_id")){ |
|||
v_external_id = subjectTask.get("external_id").toString(); |
|||
} |
|||
if(null !=subjectTask.get("go_fast_switch")){ |
|||
v_go_fast_switch = subjectTask.get("go_fast_switch").toString(); |
|||
} |
|||
if(null !=subjectTask.get("kafka_topic")){ |
|||
v_kafka_topic = subjectTask.get("kafka_topic").toString(); |
|||
} |
|||
if(null !=subjectTask.get("del")){ |
|||
v_del = subjectTask.get("del").toString(); |
|||
} |
|||
if(null !=subjectTask.get("create_user_id")){ |
|||
v_create_user_id = subjectTask.get("create_user_id").toString(); |
|||
} |
|||
if(null !=subjectTask.get("app_id")){ |
|||
appid = subjectTask.get("app_id").toString(); |
|||
} |
|||
|
|||
value.put("appid", appid); |
|||
value.put("subject_id",v_subject_id); |
|||
value.put("go_fast_addr",v_go_fast_addr); |
|||
value.put("export_to_kafka",v_kafka_switch); |
|||
value.put("kafka_addr",v_kafka_addr); |
|||
// value.put("task_id",v_task_id); |
|||
value.put("external_id",v_external_id); |
|||
value.put("go_fast_switch",v_go_fast_switch); |
|||
value.put("kafka_topic",v_kafka_topic); |
|||
value.put("del",v_del);//专题的状态 |
|||
value.put("crawl_content_key",v_crawl_content_key); |
|||
String newkey = key.toLowerCase(); |
|||
String userkey=newkey; |
|||
//组装时间的参数 |
|||
try { |
|||
if (!subjectTasktimelimiit.containsKey(userkey)) {//如果无最大和最小时间 |
|||
Tasklimit.loadTask(); |
|||
} |
|||
List<Map<String,String>>timelist=subjectTasktimelimiit.get(userkey); |
|||
if(timelist.size()==1){ |
|||
for(Map<String, String> subjectTasktime : timelist){ |
|||
value.put("maxtime",subjectTasktime.get("max_time").toString()); |
|||
value.put("mintime",subjectTasktime.get("min_time").toString()); |
|||
} |
|||
} else{ |
|||
for(Map<String, String> subjectTasktime : timelist){ |
|||
String subject_id=subjectTasktime.get("subject_id").toString(); |
|||
if (v_subject_id.equals(subject_id)){ |
|||
value.put("maxtime",subjectTasktime.get("max_time").toString()); |
|||
value.put("mintime",subjectTasktime.get("min_time").toString()); |
|||
} |
|||
} |
|||
} |
|||
} catch (Exception e) { |
|||
System.out.print("获取时间失败了"+subjectTask.get("crawl_data_flag")); |
|||
value.put("maxtime", String.valueOf(System.currentTimeMillis())); |
|||
value.put("mintime","0"); |
|||
e.printStackTrace(); |
|||
} |
|||
|
|||
// |
|||
// //用户的权限 |
|||
if (subjectuserlimiit.containsKey(v_create_user_id)){ |
|||
Map<String,Object> permission= (Map<String, Object>) subjectuserlimiit.get(v_create_user_id); |
|||
v_ocr= permission.get("is_ocr").toString(); |
|||
v_trans= permission.get("is_trans").toString(); |
|||
} |
|||
value.put("is_ocr",v_ocr); |
|||
value.put("is_trans",v_trans); |
|||
Set keysSet = new HashSet(); |
|||
//组装相同任务的任务id |
|||
if(subjectTaskMap.containsKey(newkey)){ |
|||
valueList = subjectTaskMap.get(newkey); |
|||
for (Map<String, String> valuetask : valueList){ |
|||
String task=valuetask.get("task_id")+","+v_task_id; |
|||
valuetask.put("task_id",task); |
|||
value.put("task_id",task); |
|||
} |
|||
valueList.add(value); |
|||
}else{ |
|||
value.put("task_id",v_task_id); |
|||
valueList.add(value); |
|||
} |
|||
if(disposeCrawldataflag(newkey)) { //判断redis中的key |
|||
String getsubjectList=RedisUtil.get(newkey,10); |
|||
try { |
|||
List<Map<String, String>> subjectList = (List<Map<String, String>>) JsonUtils.parseArray(getsubjectList); |
|||
List<Map<String, String>> redistList=new ArrayList<>(); |
|||
for(Map<String, String> redissubjectTask : subjectList){ |
|||
String resubject_id=redissubjectTask.get("subject_id").toString(); |
|||
// String reappid=redissubjectTask.get("appid").toString(); |
|||
String maxtime=""; |
|||
if(redissubjectTask.containsKey("maxtime")){ |
|||
maxtime=redissubjectTask.get("maxtime").toString(); |
|||
} |
|||
if(!resubject_id.equals(v_subject_id)){ |
|||
redistList.add(redissubjectTask); |
|||
} |
|||
String keys=resubject_id+maxtime; |
|||
int beforeSize = keysSet.size(); |
|||
keysSet.add(keys); |
|||
int afterSize = keysSet.size(); |
|||
if(afterSize == beforeSize + 1){ |
|||
valueList.add(redissubjectTask); |
|||
} |
|||
} |
|||
valueList.addAll(redistList); |
|||
} catch (Exception e) { |
|||
log.error("获取redis中的任务失败"+getsubjectList); |
|||
} |
|||
} |
|||
if(keytwo.length()>0){ |
|||
String tmallnewkey = keytwo.toLowerCase(); |
|||
subjectTaskMap.put(tmallnewkey,valueList); |
|||
} |
|||
subjectTaskMap.put(newkey,valueList); |
|||
//System.out.print(valueList); |
|||
|
|||
//System.out.print(JsonUtils.toJSONString(subjectTaskMap)); |
|||
|
|||
Set keysSets = new HashSet(); |
|||
for (String newkeya :subjectTaskMap.keySet()){ |
|||
List<Map<String, String>> newkeyvalue=subjectTaskMap.get(newkeya); |
|||
List<Map<String, String>> setkeyvalue=new ArrayList<>(); |
|||
for(Map<String, String> redissubjectTask : newkeyvalue){ |
|||
String resubject_id=redissubjectTask.get("subject_id").toString(); |
|||
String reappid=redissubjectTask.get("appid").toString(); |
|||
String keys=resubject_id+reappid+newkeya; |
|||
int beforeSize = keysSets.size(); |
|||
keysSets.add(keys); |
|||
int afterSize = keysSets.size(); |
|||
if(afterSize == beforeSize + 1){ |
|||
setkeyvalue.add(redissubjectTask); |
|||
} |
|||
} |
|||
String redis=newkeya+"@#@"+JsonUtils.toJSONString(setkeyvalue); |
|||
System.out.println(redis); |
|||
try { |
|||
Constants.getLineQueue().put(redis); |
|||
} catch (InterruptedException e) { |
|||
e.printStackTrace(); |
|||
} |
|||
} |
|||
System.out.println(System.currentTimeMillis()+"任务组装结束的时间"); |
|||
log.info("结束时间"+ DateUtil.getcurr()); |
|||
log.info("当天任务的数量" + key + " ; data = " + subjectTaskMap.size()); |
|||
// }else { |
|||
// String delkey = subjectendtTask.get("cid") + "#####" + subjectendtTask.get("crawl_data_flag"); |
|||
// log.info(delkey.toLowerCase()+"====del"); |
|||
// RedisUtil.del(delkey.toLowerCase(), 10); |
|||
// } |
|||
} |
|||
}else { |
|||
|
|||
String delkey = cid + "#####" + crawl_data_flag; |
|||
log.info(delkey.toLowerCase()+"====del"); |
|||
RedisUtil.del(delkey.toLowerCase(), 10); |
|||
Thread.sleep(1000*5); |
|||
} |
|||
|
|||
} |
|||
|
|||
|
|||
Thread.sleep(1000*60*60); |
|||
} |
|||
|
|||
} catch (Exception e) { |
|||
e.printStackTrace(); |
|||
} |
|||
} |
|||
|
|||
|
|||
private boolean disposeCrawldataflag(String crawldataflag) { |
|||
try{ |
|||
if (RedisUtil.exists(crawldataflag, 10)) { // 先去 redis中查询是否存在,不存直接忽略 |
|||
String value = RedisUtil.get(crawldataflag,10); |
|||
if(null != value && !("").equals(value)) { |
|||
|
|||
return true; |
|||
} |
|||
|
|||
|
|||
} else { |
|||
log.error("[datasave] exec >>> 灌数:该 crwaldataflag 在 Redis 中不存在!!! keys = " + crawldataflag + " ; dbindex = " + 10); |
|||
return false; |
|||
} |
|||
return false; |
|||
}catch (Exception e){ |
|||
e.printStackTrace(); |
|||
return false; |
|||
} |
|||
} |
|||
|
|||
public static void main(String[] args) { |
|||
|
|||
long updatetime = new Date().getTime()/1000-30; |
|||
System.out.println(updatetime); |
|||
System.out.println(new Date().getTime()/1000); |
|||
} |
|||
|
|||
} |
@ -0,0 +1,36 @@ |
|||
package com.bfd.mf.entity.mysql; |
|||
|
|||
import com.bfd.mf.datasave.tools.DBUtil; |
|||
|
|||
import java.util.List; |
|||
import java.util.Map; |
|||
|
|||
public class si implements Runnable{ |
|||
@Override |
|||
public void run() { |
|||
System.out.println("aaa"); |
|||
List<Map<String, Object>> subjectTaskendList = DBUtil.getInstance("db_stat_alltask").query("SELECT create_user,id,external_id from intelligent_crawl.cl_task ct WHERE (create_user like '%SocialInsightPro%' or create_user like '%SocialInsightTe%' or create_user like '%si-osint%') and crawl_status =1 and del =0 and external_id like '%t_%'"); |
|||
|
|||
System.out.println((subjectTaskendList.size())); |
|||
if(subjectTaskendList.size()>0){ |
|||
for(Map<String, Object> subjectendtTask : subjectTaskendList){ |
|||
String external_id= (String) subjectendtTask.get("external_id"); |
|||
String create_user= (String) subjectendtTask.get("create_user"); |
|||
|
|||
long idin= (long) subjectendtTask.get("id"); |
|||
int id= Integer.parseInt(external_id.split("t_")[1]); |
|||
|
|||
|
|||
String sql="update all_task.cl_task set external_id='"+external_id+"',id='"+id+"' where create_user='"+create_user+"'and id='"+idin+"';"; |
|||
System.out.println(sql); |
|||
|
|||
|
|||
} |
|||
|
|||
|
|||
|
|||
|
|||
} |
|||
|
|||
} |
|||
} |
@ -0,0 +1,172 @@ |
|||
package com.bfd.mf.entity.mysql; |
|||
|
|||
import com.bfd.crawler.utils.JsonUtils; |
|||
import com.bfd.mf.datasave.tools.Constants; |
|||
import com.bfd.mf.datasave.tools.DBUtil; |
|||
import com.bfd.mf.datasave.tools.DateUtil; |
|||
import crawler.open.util.RedisUtil; |
|||
import org.apache.log4j.Logger; |
|||
|
|||
import java.util.*; |
|||
|
|||
import static com.bfd.mf.entity.mysql.Userlimit.subjectuserlimiit; |
|||
|
|||
public class taskdel implements Runnable{ |
|||
private static Logger log = Logger.getLogger(taskdel.class); |
|||
|
|||
@Override |
|||
public void run() { |
|||
while (true) { |
|||
System.out.println("=========="); |
|||
long updatetime = new Date().getTime()/1000-15; |
|||
List<Map<String, Object>> subjectTaskendList = DBUtil.getInstance("db_stat").query("select ct.crawl_content_key,ct.create_user_id,ct.app_id,cs.del,ct.crawl_start_time,ct.crawl_end_time,ct.external_id, ct.subject_id, ct.id, ct.cid, ct.crawl_data_flag,cs.kafka_switch,cs.kafka_addr,cs.go_fast_addr,cs.kafka_topic,cs.go_fast_switch from cl_subject cs Join cl_task ct on(ct.subject_id=cs.id) where ct.del=1 and ct.cid!=\"\" and ct.cid!='MUZX2WNM84L3ARXNSYN3' and unix_timestamp(ct.update_time)>'"+updatetime+"' ;"); |
|||
if(subjectTaskendList.size()>0) { |
|||
System.out.println(subjectTaskendList); |
|||
System.out.println(subjectTaskendList.size() + "任务的大小"); |
|||
for (Map<String, Object> subjectTask : subjectTaskendList) { |
|||
String key = ""; |
|||
String keytwo = ""; |
|||
if (subjectTask.get("cid").equals("Tmall")) { |
|||
key = subjectTask.get("cid") + "#####" + subjectTask.get("crawl_data_flag"); |
|||
keytwo = "Taobao" + "#####" + subjectTask.get("crawl_data_flag"); |
|||
} else if (subjectTask.get("cid").equals("Taobao")) { |
|||
key = subjectTask.get("cid") + "#####" + subjectTask.get("crawl_data_flag"); |
|||
keytwo = "Tmall" + "#####" + subjectTask.get("crawl_data_flag"); |
|||
} else { |
|||
key = subjectTask.get("cid") + "#####" + subjectTask.get("crawl_data_flag"); |
|||
} |
|||
Map<String, String> value = new HashMap<>(); |
|||
List<Map<String, String>> valueList = new ArrayList<>(); |
|||
String v_subject_id = ""; |
|||
String v_go_fast_addr = ""; |
|||
String v_kafka_switch = ""; |
|||
String v_kafka_addr = ""; |
|||
String v_task_id = ""; |
|||
String v_external_id = ""; |
|||
String v_go_fast_switch = ""; |
|||
String v_kafka_topic = ""; |
|||
String v_status = ""; |
|||
String v_del = ""; |
|||
String v_create_user_id = ""; |
|||
String v_ocr = "0"; |
|||
String v_trans = "0"; |
|||
String v_crawl_content_key = ""; |
|||
String appid = ""; |
|||
if (null != subjectTask.get("subject_id")) { |
|||
v_subject_id = subjectTask.get("subject_id").toString(); |
|||
} |
|||
if (null != subjectTask.get("crawl_content_key")) { |
|||
v_crawl_content_key = subjectTask.get("crawl_content_key").toString(); |
|||
} |
|||
if (null != subjectTask.get("go_fast_addr")) { |
|||
v_go_fast_addr = subjectTask.get("go_fast_addr").toString(); |
|||
} |
|||
if (null != subjectTask.get("kafka_addr")) { |
|||
v_kafka_addr = subjectTask.get("kafka_addr").toString(); |
|||
} |
|||
if (null != subjectTask.get("kafka_switch")) { |
|||
v_kafka_switch = subjectTask.get("kafka_switch").toString(); |
|||
} |
|||
if (null != subjectTask.get("id")) { |
|||
v_task_id = subjectTask.get("id").toString(); |
|||
} |
|||
if (null != subjectTask.get("external_id")) { |
|||
v_external_id = subjectTask.get("external_id").toString(); |
|||
} |
|||
if (null != subjectTask.get("go_fast_switch")) { |
|||
v_go_fast_switch = subjectTask.get("go_fast_switch").toString(); |
|||
} |
|||
if (null != subjectTask.get("kafka_topic")) { |
|||
v_kafka_topic = subjectTask.get("kafka_topic").toString(); |
|||
} |
|||
if (null != subjectTask.get("del")) { |
|||
v_del = subjectTask.get("del").toString(); |
|||
} |
|||
if (null != subjectTask.get("create_user_id")) { |
|||
v_create_user_id = subjectTask.get("create_user_id").toString(); |
|||
} |
|||
if (null != subjectTask.get("app_id")) { |
|||
appid = subjectTask.get("app_id").toString(); |
|||
} |
|||
|
|||
value.put("appid", appid); |
|||
value.put("subject_id", v_subject_id); |
|||
value.put("go_fast_addr", v_go_fast_addr); |
|||
value.put("export_to_kafka", v_kafka_switch); |
|||
value.put("kafka_addr", v_kafka_addr); |
|||
// value.put("task_id",v_task_id); |
|||
value.put("external_id", v_external_id); |
|||
value.put("go_fast_switch", v_go_fast_switch); |
|||
value.put("kafka_topic", v_kafka_topic); |
|||
value.put("del", v_del);//专题的状态 |
|||
value.put("crawl_content_key", v_crawl_content_key); |
|||
value.put("maxtime", subjectTask.get("crawl_end_time").toString()); |
|||
value.put("mintime", subjectTask.get("crawl_start_time").toString()); |
|||
String newkey = key.toLowerCase(); |
|||
if (disposeCrawldataflag(newkey)) { //判断redis中的key |
|||
String getsubjectList = RedisUtil.get(newkey, 10); |
|||
try { |
|||
List<Map<String, String>> subjectList = (List<Map<String, String>>) JsonUtils.parseArray(getsubjectList); |
|||
List<Map<String, String>> redistList = new ArrayList<>(); |
|||
for (Map<String, String> redissubjectTask : subjectList) { |
|||
String resubject_id = redissubjectTask.get("subject_id").toString(); |
|||
String reappid = redissubjectTask.get("appid").toString(); |
|||
String taskid = redissubjectTask.get("task_id").toString(); |
|||
appid="SI_1"; |
|||
if (!resubject_id.equals(v_subject_id) || !reappid.equals(appid) || !v_task_id.equals(taskid)) { |
|||
redistList.add(redissubjectTask); |
|||
} |
|||
} |
|||
String redis = newkey + "@#@" + JsonUtils.toJSONString(redistList); |
|||
if(redistList.size()>0){ |
|||
try { |
|||
System.out.println(newkey+"写入redis是" + JsonUtils.toJSONString(redistList)); |
|||
RedisUtil.set(newkey, JsonUtils.toJSONString(redistList), 10); |
|||
} catch (Exception e) { |
|||
e.printStackTrace(); |
|||
} |
|||
}else { |
|||
System.out.println("删除redis是" +newkey); |
|||
RedisUtil.del(newkey, 10); |
|||
} |
|||
|
|||
|
|||
} catch (Exception e) { |
|||
log.error("获取redis中的任务失败" + getsubjectList); |
|||
} |
|||
} |
|||
|
|||
System.out.println(System.currentTimeMillis() + "任务组装结束的时间"); |
|||
log.info("结束时间" + DateUtil.getcurr()); |
|||
log.info("当天任务的数量" + key + " ; data = " + subjectTaskendList.size()); |
|||
} |
|||
} |
|||
try { |
|||
Thread.sleep(1000*5); |
|||
} catch (InterruptedException e) { |
|||
e.printStackTrace(); |
|||
} |
|||
} |
|||
|
|||
} |
|||
|
|||
|
|||
private boolean disposeCrawldataflag(String crawldataflag) { |
|||
try{ |
|||
if (RedisUtil.exists(crawldataflag, 10)) { // 先去 redis中查询是否存在,不存直接忽略 |
|||
String value = RedisUtil.get(crawldataflag,10); |
|||
if(null != value && !("").equals(value)) { |
|||
return true; |
|||
} |
|||
} else { |
|||
log.error("[datasave] exec >>> 灌数:该 crwaldataflag 在 Redis 中不存在!!! keys = " + crawldataflag + " ; dbindex = " + 10); |
|||
return false; |
|||
} |
|||
return false; |
|||
}catch (Exception e){ |
|||
e.printStackTrace(); |
|||
return false; |
|||
} |
|||
} |
|||
|
|||
} |
@ -0,0 +1,31 @@ |
|||
package com.bfd.mf.runstart; |
|||
|
|||
import com.bfd.mf.datasave.tools.DBUtil; |
|||
import com.bfd.mf.entity.mysql.SubjectTask; |
|||
import com.bfd.mf.entity.mysql.taskdel; |
|||
import crawler.open.util.RedisUtil; |
|||
import org.apache.log4j.PropertyConfigurator; |
|||
|
|||
|
|||
public class DelRunStartDataSave { |
|||
|
|||
private static String log4jPath = "../etc/log4j.properties"; |
|||
private static String dbPath = "../etc/db.properties"; |
|||
private static String redisPath = "../etc/145redis.properties"; |
|||
static { |
|||
PropertyConfigurator.configureAndWatch(log4jPath); |
|||
DBUtil.init(dbPath); |
|||
RedisUtil.init(redisPath); |
|||
} |
|||
public static void main(String[] args) { |
|||
System.out.println("asdasdad"); |
|||
for (int i = 0; i < 1; i ++) { |
|||
taskdel taskdel = new taskdel(); |
|||
Thread del = new Thread(taskdel, "taskdel" + i); |
|||
del.start(); |
|||
} |
|||
|
|||
|
|||
|
|||
} |
|||
} |
@ -0,0 +1,36 @@ |
|||
Manifest-Version: 1.0 |
|||
Main-Class: com.bfd.mf.runstart.DelRunStartDataSave |
|||
Class-Path: junit-4.13-beta-1.jar zkclient-0.10.jar lucene-core-7.2.1.ja |
|||
r netty-codec-4.1.16.Final.jar protobuf-java-3.19.4.jar lucene-spatial- |
|||
extras-7.2.1.jar utils-3.0.0.jar fastjson-1.1.22.jar mysql-connector-ja |
|||
va-8.0.29.jar netty-resolver-4.1.16.Final.jar lucene-spatial3d-7.2.1.ja |
|||
r commons-codec-1.10.jar elasticsearch-core-6.2.3.jar lz4-1.3.0.jar com |
|||
mons-lang3-3.1.jar lucene-join-7.2.1.jar reindex-client-6.2.3.jar snake |
|||
yaml-1.17.jar netty-codec-http-4.1.16.Final.jar hamcrest-core-1.3.jar l |
|||
ucene-misc-7.2.1.jar guava-14.0.1.jar lucene-suggest-7.2.1.jar hppc-0.7 |
|||
.1.jar lucene-highlighter-7.2.1.jar httpasyncclient-4.1.2.jar netty-com |
|||
mon-4.1.16.Final.jar transport-6.2.3.jar log4j-api-2.9.1.jar HdrHistogr |
|||
am-2.1.9.jar kafka-0.10.jar compiler-0.9.3.jar lucene-memory-7.2.1.jar |
|||
elasticsearch-cli-6.2.3.jar aggs-matrix-stats-client-6.2.3.jar lucene-a |
|||
nalyzers-common-7.2.1.jar lucene-sandbox-7.2.1.jar jackson-dataformat-c |
|||
bor-2.8.10.jar elastiUtils-0.0.1-SNAPSHOT.jar httpcore-nio-4.4.5.jar co |
|||
mmons-logging-1.1.3.jar BfdRedisTools-2.0-1.0.0.jar slf4j-api-1.7.22.ja |
|||
r httpcore-4.4.5.jar snappy-java-1.1.2.6.jar jts-1.13.jar commons-pool2 |
|||
-2.0.jar jodis-0.1.2.jar netty-handler-4.1.16.Final.jar metrics-core-2. |
|||
2.0.jar curator-client-2.7.0.jar transport-netty4-client-6.2.3.jar elas |
|||
ticsearch-6.2.3.jar httpclient-4.5.2.jar lang-mustache-client-6.2.3.jar |
|||
lucene-queryparser-7.2.1.jar jackson-dataformat-smile-2.8.10.jar kafka |
|||
_2.10-0.10.2.0.jar commons-lang-2.4.jar rank-eval-client-6.2.3.jar elas |
|||
ticsearch-rest-client-6.2.3.jar jackson-core-2.8.1.jar scala-library-2. |
|||
10.6.jar t-digest-3.0.jar spatial4j-0.6.jar securesm-1.2.jar curator-re |
|||
cipes-2.7.0.jar jedis-2.6.0.jar lucene-queries-7.2.1.jar curator-framew |
|||
ork-2.7.0.jar lucene-backward-codecs-7.2.1.jar parent-join-client-6.2.3 |
|||
.jar jackson-dataformat-yaml-2.8.10.jar okio-1.11.0.jar netty-transport |
|||
-4.1.16.Final.jar jopt-simple-5.0.3.jar jackson-annotations-2.8.1.jar j |
|||
oda-time-2.9.9.jar zookeeper-3.4.6.jar okhttp-3.6.0.jar netty-buffer-4. |
|||
1.16.Final.jar log4j-core-2.9.1.jar slf4j-log4j12-1.7.21.jar jackson-da |
|||
tabind-2.8.1.jar lucene-spatial-7.2.1.jar percolator-client-6.2.3.jar n |
|||
etty-3.7.0.Final.jar log4j-1.2.17.jar lucene-grouping-7.2.1.jar elastic |
|||
search-rest-high-level-client-6.2.3.jar kafka-clients-0.10.2.0.jar jlin |
|||
e-0.9.94.jar jna-4.5.1.jar |
|||
|
@ -0,0 +1,36 @@ |
|||
Manifest-Version: 1.0 |
|||
Main-Class: com.bfd.mf.runstart.DelRunStartDataSave |
|||
Class-Path: junit-4.13-beta-1.jar zkclient-0.10.jar lucene-core-7.2.1.ja |
|||
r netty-codec-4.1.16.Final.jar protobuf-java-3.19.4.jar lucene-spatial- |
|||
extras-7.2.1.jar utils-3.0.0.jar fastjson-1.1.22.jar mysql-connector-ja |
|||
va-8.0.29.jar netty-resolver-4.1.16.Final.jar lucene-spatial3d-7.2.1.ja |
|||
r commons-codec-1.10.jar elasticsearch-core-6.2.3.jar lz4-1.3.0.jar com |
|||
mons-lang3-3.1.jar lucene-join-7.2.1.jar reindex-client-6.2.3.jar snake |
|||
yaml-1.17.jar netty-codec-http-4.1.16.Final.jar hamcrest-core-1.3.jar l |
|||
ucene-misc-7.2.1.jar guava-14.0.1.jar lucene-suggest-7.2.1.jar hppc-0.7 |
|||
.1.jar lucene-highlighter-7.2.1.jar httpasyncclient-4.1.2.jar netty-com |
|||
mon-4.1.16.Final.jar transport-6.2.3.jar log4j-api-2.9.1.jar HdrHistogr |
|||
am-2.1.9.jar kafka-0.10.jar compiler-0.9.3.jar lucene-memory-7.2.1.jar |
|||
elasticsearch-cli-6.2.3.jar aggs-matrix-stats-client-6.2.3.jar lucene-a |
|||
nalyzers-common-7.2.1.jar lucene-sandbox-7.2.1.jar jackson-dataformat-c |
|||
bor-2.8.10.jar elastiUtils-0.0.1-SNAPSHOT.jar httpcore-nio-4.4.5.jar co |
|||
mmons-logging-1.1.3.jar BfdRedisTools-2.0-1.0.0.jar slf4j-api-1.7.22.ja |
|||
r httpcore-4.4.5.jar snappy-java-1.1.2.6.jar jts-1.13.jar commons-pool2 |
|||
-2.0.jar jodis-0.1.2.jar netty-handler-4.1.16.Final.jar metrics-core-2. |
|||
2.0.jar curator-client-2.7.0.jar transport-netty4-client-6.2.3.jar elas |
|||
ticsearch-6.2.3.jar httpclient-4.5.2.jar lang-mustache-client-6.2.3.jar |
|||
lucene-queryparser-7.2.1.jar jackson-dataformat-smile-2.8.10.jar kafka |
|||
_2.10-0.10.2.0.jar commons-lang-2.4.jar rank-eval-client-6.2.3.jar elas |
|||
ticsearch-rest-client-6.2.3.jar jackson-core-2.8.1.jar scala-library-2. |
|||
10.6.jar t-digest-3.0.jar spatial4j-0.6.jar securesm-1.2.jar curator-re |
|||
cipes-2.7.0.jar jedis-2.6.0.jar lucene-queries-7.2.1.jar curator-framew |
|||
ork-2.7.0.jar lucene-backward-codecs-7.2.1.jar parent-join-client-6.2.3 |
|||
.jar jackson-dataformat-yaml-2.8.10.jar okio-1.11.0.jar netty-transport |
|||
-4.1.16.Final.jar jopt-simple-5.0.3.jar jackson-annotations-2.8.1.jar j |
|||
oda-time-2.9.9.jar zookeeper-3.4.6.jar okhttp-3.6.0.jar netty-buffer-4. |
|||
1.16.Final.jar log4j-core-2.9.1.jar slf4j-log4j12-1.7.21.jar jackson-da |
|||
tabind-2.8.1.jar lucene-spatial-7.2.1.jar percolator-client-6.2.3.jar n |
|||
etty-3.7.0.Final.jar log4j-1.2.17.jar lucene-grouping-7.2.1.jar elastic |
|||
search-rest-high-level-client-6.2.3.jar kafka-clients-0.10.2.0.jar jlin |
|||
e-0.9.94.jar jna-4.5.1.jar |
|||
|
@ -0,0 +1,5 @@ |
|||
#Generated by Maven |
|||
#Mon Oct 31 11:59:19 CST 2022 |
|||
version=2.0-SNAPSHOT |
|||
groupId=com.bfd.mf |
|||
artifactId=cl_stream_datasave |
@ -0,0 +1,36 @@ |
|||
com/bfd/mf/entity/FieldInfo.class |
|||
com/bfd/mf/datasave/tools/DBConnectionManager.class |
|||
com/bfd/mf/datasave/tools/Constants.class |
|||
com/bfd/mf/entity/FieldNormaliz.class |
|||
com/bfd/mf/runstart/RunStartDataSave.class |
|||
com/bfd/mf/datasave/tools/DataProcess.class |
|||
com/bfd/mf/datasave/kafka/IKafka.class |
|||
com/bfd/mf/datasave/listen/ListenKafkaManager.class |
|||
com/bfd/mf/entity/mysql/SubjectTask.class |
|||
com/bfd/mf/datasave/download/OkHttpUtils.class |
|||
com/bfd/mf/datasave/listen/ListenTaskManager.class |
|||
com/bfd/mf/datasave/kafka/ReadKafka.class |
|||
com/bfd/mf/datasave/listen/DataSaveManager.class |
|||
com/bfd/mf/datasave/download/test.class |
|||
com/bfd/mf/datasave/download/MyCookieJar.class |
|||
com/bfd/mf/datasave/tools/WriteMethod.class |
|||
com/bfd/mf/datasave/tools/PropertiesUtil.class |
|||
com/bfd/mf/entity/mysql/Userlimit.class |
|||
com/bfd/mf/datasave/tools/ReadLine.class |
|||
com/bfd/mf/entity/mysql/cl_task.class |
|||
com/bfd/mf/datasave/download/ProxyIp.class |
|||
com/bfd/mf/entity/impl/DataSaveManagerImpl.class |
|||
com/bfd/mf/entity/AllKeys.class |
|||
com/bfd/mf/datasave/tools/DBUtil.class |
|||
com/bfd/mf/entity/mysql/SubjectTaskdelone.class |
|||
com/bfd/mf/datasave/listen/ListenfastTaskManager.class |
|||
com/bfd/mf/entity/DataSaveManager.class |
|||
com/bfd/mf/datasave/download/NewsDownload.class |
|||
com/bfd/mf/entity/mysql/Tasklimit.class |
|||
com/bfd/mf/datasave/download/DownLoadFile.class |
|||
com/bfd/mf/datasave/tools/DateUtil.class |
|||
com/bfd/mf/datasave/tools/DBConnectionManager$DBConnectionPool.class |
|||
com/bfd/mf/entity/mysql/FiledTableInfo.class |
|||
com/bfd/mf/datasave/download/Newscontent.class |
|||
com/bfd/mf/entity/ReaultInfo.class |
|||
com/bfd/mf/datasave/tools/DataCheckUtil.class |
@ -0,0 +1,39 @@ |
|||
/Users/zhichengzhang/Desktop/公司/查询服务/datastreamfast/SocialX_Stream_1.0/cl_stream_datasave/src/main/java/com/bfd/mf/datasave/download/MyCookieJar.java |
|||
/Users/zhichengzhang/Desktop/公司/查询服务/datastreamfast/SocialX_Stream_1.0/cl_stream_datasave/src/main/java/com/bfd/mf/entity/FieldNormaliz.java |
|||
/Users/zhichengzhang/Desktop/公司/查询服务/datastreamfast/SocialX_Stream_1.0/cl_stream_datasave/src/main/java/com/bfd/mf/entity/mysql/SubjectTask.java |
|||
/Users/zhichengzhang/Desktop/公司/查询服务/datastreamfast/SocialX_Stream_1.0/cl_stream_datasave/src/main/java/com/bfd/mf/entity/impl/DataSaveManagerImpl.java |
|||
/Users/zhichengzhang/Desktop/公司/查询服务/datastreamfast/SocialX_Stream_1.0/cl_stream_datasave/src/main/java/com/bfd/mf/datasave/tools/DataCheckUtil.java |
|||
/Users/zhichengzhang/Desktop/公司/查询服务/datastreamfast/SocialX_Stream_1.0/cl_stream_datasave/src/main/java/com/bfd/mf/datasave/listen/testkongtianyuan.java |
|||
/Users/zhichengzhang/Desktop/公司/查询服务/datastreamfast/SocialX_Stream_1.0/cl_stream_datasave/src/main/java/com/bfd/mf/datasave/tools/PropertiesUtil.java |
|||
/Users/zhichengzhang/Desktop/公司/查询服务/datastreamfast/SocialX_Stream_1.0/cl_stream_datasave/src/main/java/com/bfd/mf/entity/DataSaveManager.java |
|||
/Users/zhichengzhang/Desktop/公司/查询服务/datastreamfast/SocialX_Stream_1.0/cl_stream_datasave/src/main/java/com/bfd/mf/datasave/download/test.java |
|||
/Users/zhichengzhang/Desktop/公司/查询服务/datastreamfast/SocialX_Stream_1.0/cl_stream_datasave/src/main/java/com/bfd/mf/entity/ReaultInfo.java |
|||
/Users/zhichengzhang/Desktop/公司/查询服务/datastreamfast/SocialX_Stream_1.0/cl_stream_datasave/src/main/java/com/bfd/mf/datasave/tools/DataProcess.java |
|||
/Users/zhichengzhang/Desktop/公司/查询服务/datastreamfast/SocialX_Stream_1.0/cl_stream_datasave/src/main/java/com/bfd/mf/datasave/download/ProxyIp.java |
|||
/Users/zhichengzhang/Desktop/公司/查询服务/datastreamfast/SocialX_Stream_1.0/cl_stream_datasave/src/main/java/com/bfd/mf/datasave/kafka/IKafka.java |
|||
/Users/zhichengzhang/Desktop/公司/查询服务/datastreamfast/SocialX_Stream_1.0/cl_stream_datasave/src/main/java/com/bfd/mf/entity/mysql/Userlimit.java |
|||
/Users/zhichengzhang/Desktop/公司/查询服务/datastreamfast/SocialX_Stream_1.0/cl_stream_datasave/src/main/java/com/bfd/mf/datasave/tools/Constants.java |
|||
/Users/zhichengzhang/Desktop/公司/查询服务/datastreamfast/SocialX_Stream_1.0/cl_stream_datasave/src/main/java/com/bfd/mf/datasave/listen/DataSaveManager_kongtianyuan.java |
|||
/Users/zhichengzhang/Desktop/公司/查询服务/datastreamfast/SocialX_Stream_1.0/cl_stream_datasave/src/main/java/com/bfd/mf/datasave/tools/DateUtil.java |
|||
/Users/zhichengzhang/Desktop/公司/查询服务/datastreamfast/SocialX_Stream_1.0/cl_stream_datasave/src/main/java/com/bfd/mf/entity/FieldInfo.java |
|||
/Users/zhichengzhang/Desktop/公司/查询服务/datastreamfast/SocialX_Stream_1.0/cl_stream_datasave/src/main/java/com/bfd/mf/entity/mysql/FiledTableInfo.java |
|||
/Users/zhichengzhang/Desktop/公司/查询服务/datastreamfast/SocialX_Stream_1.0/cl_stream_datasave/src/main/java/com/bfd/mf/entity/mysql/cl_task.java |
|||
/Users/zhichengzhang/Desktop/公司/查询服务/datastreamfast/SocialX_Stream_1.0/cl_stream_datasave/src/main/java/com/bfd/mf/datasave/tools/WriteMethod.java |
|||
/Users/zhichengzhang/Desktop/公司/查询服务/datastreamfast/SocialX_Stream_1.0/cl_stream_datasave/src/main/java/com/bfd/mf/datasave/download/OkHttpUtils.java |
|||
/Users/zhichengzhang/Desktop/公司/查询服务/datastreamfast/SocialX_Stream_1.0/cl_stream_datasave/src/main/java/com/bfd/mf/datasave/download/Newscontent.java |
|||
/Users/zhichengzhang/Desktop/公司/查询服务/datastreamfast/SocialX_Stream_1.0/cl_stream_datasave/src/main/java/com/bfd/mf/datasave/download/DownLoadFile.java |
|||
/Users/zhichengzhang/Desktop/公司/查询服务/datastreamfast/SocialX_Stream_1.0/cl_stream_datasave/src/main/java/com/bfd/mf/datasave/tools/ReadFile1125.java |
|||
/Users/zhichengzhang/Desktop/公司/查询服务/datastreamfast/SocialX_Stream_1.0/cl_stream_datasave/src/main/java/com/bfd/mf/datasave/tools/DBConnectionManager.java |
|||
/Users/zhichengzhang/Desktop/公司/查询服务/datastreamfast/SocialX_Stream_1.0/cl_stream_datasave/src/main/java/com/bfd/mf/datasave/tools/ReadLine.java |
|||
/Users/zhichengzhang/Desktop/公司/查询服务/datastreamfast/SocialX_Stream_1.0/cl_stream_datasave/src/main/java/com/bfd/mf/datasave/listen/ListenTaskManager.java |
|||
/Users/zhichengzhang/Desktop/公司/查询服务/datastreamfast/SocialX_Stream_1.0/cl_stream_datasave/src/main/java/com/bfd/mf/datasave/listen/ListenfastTaskManager.java |
|||
/Users/zhichengzhang/Desktop/公司/查询服务/datastreamfast/SocialX_Stream_1.0/cl_stream_datasave/src/main/java/com/bfd/mf/entity/AllKeys.java |
|||
/Users/zhichengzhang/Desktop/公司/查询服务/datastreamfast/SocialX_Stream_1.0/cl_stream_datasave/src/main/java/com/bfd/mf/datasave/kafka/ReadKafka.java |
|||
/Users/zhichengzhang/Desktop/公司/查询服务/datastreamfast/SocialX_Stream_1.0/cl_stream_datasave/src/main/java/com/bfd/mf/entity/mysql/Tasklimit.java |
|||
/Users/zhichengzhang/Desktop/公司/查询服务/datastreamfast/SocialX_Stream_1.0/cl_stream_datasave/src/main/java/com/bfd/mf/runstart/TaskFileUtils.java |
|||
/Users/zhichengzhang/Desktop/公司/查询服务/datastreamfast/SocialX_Stream_1.0/cl_stream_datasave/src/main/java/com/bfd/mf/datasave/download/NewsDownload.java |
|||
/Users/zhichengzhang/Desktop/公司/查询服务/datastreamfast/SocialX_Stream_1.0/cl_stream_datasave/src/main/java/com/bfd/mf/runstart/RunStartDataSave.java |
|||
/Users/zhichengzhang/Desktop/公司/查询服务/datastreamfast/SocialX_Stream_1.0/cl_stream_datasave/src/main/java/com/bfd/mf/datasave/tools/DBUtil.java |
|||
/Users/zhichengzhang/Desktop/公司/查询服务/datastreamfast/SocialX_Stream_1.0/cl_stream_datasave/src/main/java/com/bfd/mf/datasave/listen/ListenKafkaManager.java |
|||
/Users/zhichengzhang/Desktop/公司/查询服务/datastreamfast/SocialX_Stream_1.0/cl_stream_datasave/src/main/java/com/bfd/mf/datasave/listen/DataSaveManager.java |
|||
/Users/zhichengzhang/Desktop/公司/查询服务/datastreamfast/SocialX_Stream_1.0/cl_stream_datasave/src/main/java/com/bfd/mf/runstart/GeterExit.java |
@ -0,0 +1,2 @@ |
|||
CreatIndex.class |
|||
Test1.class |
@ -0,0 +1,3 @@ |
|||
/Users/zhichengzhang/Desktop/公司/查询服务/datastreamfast/SocialX_Stream_1.0/cl_stream_datasave/src/test/java/CreatIndex.java |
|||
/Users/zhichengzhang/Desktop/公司/查询服务/datastreamfast/SocialX_Stream_1.0/cl_stream_datasave/src/test/java/AboutMysql.java |
|||
/Users/zhichengzhang/Desktop/公司/查询服务/datastreamfast/SocialX_Stream_1.0/cl_stream_datasave/src/test/java/Test1.java |
Write
Preview
Loading…
Cancel
Save
Reference in new issue