From 3b3c6c864f1cf404dfc13b5d6e059b5ea60d7d78 Mon Sep 17 00:00:00 2001 From: "zhicheng.zhang" Date: Thu, 29 May 2025 14:19:21 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B5=B7=E5=A4=96=E4=BB=BB=E5=8A=A1=E7=BC=93?= =?UTF-8?q?=E5=AD=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cl_stream_datasave/cl_stream_datasave.iml | 5 +- cl_stream_datasave/pom.xml | 6 +- .../com/bfd/mf/datasave/download/DownLoadFile.java | 13 +- .../com/bfd/mf/datasave/download/NewsDownload.java | 22 +- .../java/com/bfd/mf/datasave/kafka/ReadKafka.java | 4 +- .../bfd/mf/datasave/listen/DataSaveManager.java | 183 +++++++-- .../bfd/mf/datasave/listen/ListenKafkaManager.java | 9 +- .../bfd/mf/datasave/listen/ListenTaskManager.java | 13 +- .../com/bfd/mf/datasave/tools/DataProcess.java | 13 +- .../src/main/java/com/bfd/mf/entity/AllKeys.java | 3 +- .../java/com/bfd/mf/entity/mysql/SubjectTask.java | 457 +++++++++++++-------- .../java/com/bfd/mf/entity/mysql/Tasklimit.java | 71 +++- .../java/com/bfd/mf/entity/mysql/Userlimit.java | 10 +- .../main/java/com/bfd/mf/entity/mysql/cl_task.java | 5 +- .../java/com/bfd/mf/runstart/RunStartDataSave.java | 54 +-- 15 files changed, 595 insertions(+), 273 deletions(-) diff --git a/cl_stream_datasave/cl_stream_datasave.iml b/cl_stream_datasave/cl_stream_datasave.iml index c7198d5..70c025e 100644 --- a/cl_stream_datasave/cl_stream_datasave.iml +++ b/cl_stream_datasave/cl_stream_datasave.iml @@ -25,7 +25,7 @@ - + @@ -37,7 +37,8 @@ - + + diff --git a/cl_stream_datasave/pom.xml b/cl_stream_datasave/pom.xml index 118a6a7..8e27f1d 100644 --- a/cl_stream_datasave/pom.xml +++ b/cl_stream_datasave/pom.xml @@ -74,7 +74,7 @@ com.bfd elastiUtils - 0.0.1-SNAPSHOT + 0.0.2-SNAPSHOT kafka-utils @@ -104,7 +104,7 @@ mysql mysql-connector-java - 5.1.29 + 8.0.29 @@ -199,6 +199,7 @@ + org.apache.maven.plugins maven-compiler-plugin 1.8 @@ -208,6 +209,7 @@ org.springframework.boot spring-boot-maven-plugin + 2.0.0.RELEASE com.bfd.mf.runstart.RunStartDataSave diff --git a/cl_stream_datasave/src/main/java/com/bfd/mf/datasave/download/DownLoadFile.java b/cl_stream_datasave/src/main/java/com/bfd/mf/datasave/download/DownLoadFile.java index 5c30703..3dd8350 100644 --- a/cl_stream_datasave/src/main/java/com/bfd/mf/datasave/download/DownLoadFile.java +++ b/cl_stream_datasave/src/main/java/com/bfd/mf/datasave/download/DownLoadFile.java @@ -32,7 +32,7 @@ public class DownLoadFile { Thread.sleep(4000); realUrl = JSONObject.parseObject(result).getString("src"); realresult.put("realUrl",realUrl); - realresult.put("size",size); + realresult.put("size",String.format("%.2f", size)); } } catch (IOException e) { @@ -44,7 +44,7 @@ public class DownLoadFile { return realresult; } - public static String upload(String uploadUrl,String fileName,byte[] content) { + public static String upload(String uploadUrl,String fileName,byte[] content) throws IOException{ String result = ""; try { OkHttpClient httpClient = new OkHttpClient(); @@ -66,6 +66,7 @@ public class DownLoadFile { result = body.string(); } } + } catch (Exception e) { e.printStackTrace(); } @@ -76,13 +77,13 @@ public class DownLoadFile { String realUrl = "";Integer size; String realresult=""; try{ - InputStream murl = new URL(getUrl).openStream(); - BufferedImage sourceImg = ImageIO.read(murl); + // InputStream murl = new URL(getUrl).openStream(); + // BufferedImage sourceImg = ImageIO.read(murl); int srcWidth = 0; // 源图宽度 int srcHeight = 0; // 源图高度 try { - srcWidth = sourceImg .getWidth(); - srcHeight = sourceImg .getHeight(); + // srcWidth = sourceImg.getWidth(); + //srcHeight = sourceImg.getHeight(); } catch (Exception e) { e.printStackTrace(); } diff --git a/cl_stream_datasave/src/main/java/com/bfd/mf/datasave/download/NewsDownload.java b/cl_stream_datasave/src/main/java/com/bfd/mf/datasave/download/NewsDownload.java index d31374a..0972c2c 100644 --- a/cl_stream_datasave/src/main/java/com/bfd/mf/datasave/download/NewsDownload.java +++ b/cl_stream_datasave/src/main/java/com/bfd/mf/datasave/download/NewsDownload.java @@ -5,12 +5,11 @@ import com.bfd.crawler.utils.JsonUtils; import java.io.IOException; import java.util.*; +import static com.bfd.mf.datasave.download.DownLoadFile.imagesize; + public class NewsDownload { private static String myGoFastAddr = "http://172.18.1.113:8080/upload"; - public static void downloadAndSaveimage(Map resultMap,List> imagePathSizevalue){ - List filePath= (List) resultMap.get("filePath"); - List imagePath= (List) resultMap.get("imagePath"); - List videoPath= (List) resultMap.get("videoPath"); + public static void downloadAndSaveimage(Map resultMap, List> imagePathSizevalue, List imagePath){ String putUrl = myGoFastAddr; List imagePathlist=new ArrayList<>(); Iterator it = imagePath.iterator(); @@ -58,13 +57,13 @@ public class NewsDownload { resultMap.put("srcimagePath",picturl); } } + + } - public static void downloadAndSaveFile(Map resultMap,List> filePathSizevalueList){ - List filePath= (List) resultMap.get("filePath"); - List imagePath= (List) resultMap.get("imagePath"); - List videoPath= (List) resultMap.get("videoPath"); + + public static void downloadAndSaveFile(Map resultMap, List> filePathSizevalueList,List filePath){ String putUrl = myGoFastAddr; //List> filePathSizevalueList = new ArrayList<>(); List filePathlist=new ArrayList<>(); @@ -111,12 +110,10 @@ public class NewsDownload { else { resultMap.put("ugc",0); } - - } - public static void downloadAndSavevideo(Map resultMap,List> videoPathSizevalueList){ - List videoPath= (List) resultMap.get("videoPath"); + + public static void downloadAndSavevideo(Map resultMap, List> videoPathSizevalueList, List videoPath){ String putUrl = myGoFastAddr; // List> videoPathSizevalueList = new ArrayList<>(); String videoTime=resultMap.get("videoTime").toString(); @@ -172,6 +169,7 @@ public class NewsDownload { + private static Map gofastswitch(Map rerversemap , Map responseMap) {//原始的gofast 以及下载后的gofast地址 Integer pgc= (Integer) responseMap.get("pgc");//图片 Integer egc= (Integer) responseMap.get("egc");//视频 diff --git a/cl_stream_datasave/src/main/java/com/bfd/mf/datasave/kafka/ReadKafka.java b/cl_stream_datasave/src/main/java/com/bfd/mf/datasave/kafka/ReadKafka.java index aed90cb..a37a35f 100644 --- a/cl_stream_datasave/src/main/java/com/bfd/mf/datasave/kafka/ReadKafka.java +++ b/cl_stream_datasave/src/main/java/com/bfd/mf/datasave/kafka/ReadKafka.java @@ -33,9 +33,7 @@ public class ReadKafka implements IKafka{ // } // public void read(){ - System.out.println(this.queue+"队列"+this.defaultReadTopicName+"topic"+this.threadNums+"group"+this.groupId+"sercer"+this.kafkaServerName); - //KfkConsumer.startReadThread(this.queue, this.defaultReadTopicName,this.threadNums,this.groupId,this.kafkaServerName); - KfkConsumer.startReadThread(this.queue, this.defaultReadTopicName,this.threadNums, this.groupId,this.kafkaServerName); + KfkConsumer.startReadThread(this.queue, this.defaultReadTopicName,this.threadNums, this.groupId,this.kafkaServerName); } @Override public void read(String readTopicName) { diff --git a/cl_stream_datasave/src/main/java/com/bfd/mf/datasave/listen/DataSaveManager.java b/cl_stream_datasave/src/main/java/com/bfd/mf/datasave/listen/DataSaveManager.java index ff9e033..1384d5e 100644 --- a/cl_stream_datasave/src/main/java/com/bfd/mf/datasave/listen/DataSaveManager.java +++ b/cl_stream_datasave/src/main/java/com/bfd/mf/datasave/listen/DataSaveManager.java @@ -4,8 +4,10 @@ import com.alibaba.fastjson.JSONArray; import com.bfd.crawler.elasti.ElastiProducer; import com.bfd.crawler.kafka7.KfkProducer; import com.bfd.crawler.utils.JsonUtils; + import com.bfd.mf.datasave.download.DownLoadFile; import com.bfd.mf.datasave.download.NewsDownload; +import com.bfd.mf.datasave.download.Newscontent; import com.bfd.mf.datasave.tools.DataCheckUtil; import com.bfd.mf.datasave.tools.DateUtil; import com.bfd.mf.datasave.tools.ReadLine; @@ -33,6 +35,7 @@ import static com.bfd.crawler.utils.DataUtil.calcMD5; public class DataSaveManager implements Runnable{ private static Logger log = Logger.getLogger(DataSaveManager.class); + private String data ; private FieldNormaliz fieldNormaliz ; private static Map>> subject; @@ -75,11 +78,17 @@ public class DataSaveManager implements Runnable{ } timetMap.put("dsbeginreadtime",System.currentTimeMillis()); Map tableInfo = tableInfoMap.get(bussinessType) ; - String res = convertData(jsonData, tableInfo); + String res = null; + try { + res = convertData(jsonData, tableInfo); + res =JsonUtils.toJSONString(jsonData); + } catch (Exception e) { + e.printStackTrace(); + } Map resultMap = getResponse(res); // resultMap 就是将要写入到 ES 和 kafka 的一条数据 resultMap.remove("processtime"); //Map resultindexMap = new HashMap(resultMap); - // System.out.println("The Message : "+JsonUtils.toJSONString(resultMap)); + // System.out.println("The Message : "+JsonUtils.toJSONString(resultMap)); // 1、先判断是主贴还是评论 主贴写日期索引,回帖评论写 渠道索引 String dateIndexName = getIndexName(resultMap); @@ -126,15 +135,25 @@ public class DataSaveManager implements Runnable{ timetMap.put("dbeginsentes",System.currentTimeMillis()); //处理新闻的主贴 - //对于非新闻的以及非上传的数据 + //对于非新闻的以及非上传的数据!resultMap.toString().contains("keyword:肖战")&&resultMap.toString().contains("keyword:罗翔")&& + +// if (!resultMap.toString().contains("keyword:罗翔")){ + + if(resultMap.containsKey("crawlDataFlag")&&!resultMap.containsKey("subjectId")) { //resultMap.containsKey("isDownload") String key = getAllMapKey(resultMap); - List filePath= (List) resultMap.get("filePath"); - List imagePath= (List) resultMap.get("imagePath"); - List videoPath= (List) resultMap.get("videoPath"); + List filePath= (List)JsonUtils.parseArray(resultMap.get("filePath").toString()); + List imagePath= (List)JsonUtils.parseArray(resultMap.get("imagePath").toString()); + List videoPath= (List) JsonUtils.parseArray(resultMap.get("videoPath").toString()); + List> imagePathSize = (List>) JsonUtils.parseArray(resultMap.get("imagePathSize").toString()); + List> filePathSize = (List>)JsonUtils.parseArray(resultMap.get("filePathSize").toString()); + List> videoPathSize = (List>)JsonUtils.parseArray(resultMap.get("videoPathSize").toString()); + + String avatarPath=resultMap.get("avatarPath").toString(); // 从 subject 中可以获取到这个key 对应的 专题信息 + System.out.println(key); if(disposeCrawldataflag(key)) { String getsubjectList=RedisUtil.get(key,10); List> subjectList = (List>) JsonUtils.parseArray(getsubjectList); @@ -143,7 +162,6 @@ public class DataSaveManager implements Runnable{ List> videoPathSizevalueList = new ArrayList<>(); List> filePathSizevalueList = new ArrayList<>(); List ocrText= (List) resultMap.get("ocrText"); - System.out.println(key+"====="); long maxtime= Long.parseLong(subjectMap.get("maxtime")); long mintme= Long.parseLong(subjectMap.get("mintime")); long pubTimecomape= Long.parseLong(pubTime); @@ -157,7 +175,7 @@ public class DataSaveManager implements Runnable{ String asrText= (String) resultMap.get("asrText"); String hasTrans= resultMap.get("hasTrans").toString(); //String ocrText= (String) resultMap.get("ocrText"); - if((pubTimecomape-maxtime<=0&&pubTimecomape-mintme>=0)||"eccontent".equals(pageType)||"2".equals(primary)||"socailFollow".equals(pageType)){ + if((pubTimecomape-maxtime<=0&&pubTimecomape-mintme>=0)||"eccontent".equals(pageType)||"2".equals(primary)||"socialFollow".equals(pageType)){ if("eccontent".equals(pageType)){ long pubtime=maxtime-1000*60*30; //System.out.println(pubtime+"======="); @@ -347,21 +365,38 @@ public class DataSaveManager implements Runnable{ } timetMap.put("enddowloadtime",System.currentTimeMillis()); - }else{ - //新闻主贴的处理逻辑newscontent, - // downloadPic,downloadFile,downloadVideo ,若有一个则需要进行isdown为true -// videoPath == egc -// filePath == ugc -// imagePath == pgc - if(crawl_content_key.contains("downloadPic")&&imagePath.size()>0){ - NewsDownload.downloadAndSaveimage(resultMap,imagePathSizevalue); + }else { +// //新闻主贴的处理逻辑newscontent, +// // downloadPic,downloadFile,downloadVideo ,若有一个则需要进行isdown为true +//// videoPath == egc +//// filePath == ugc +//// imagePath == pgc +// if(crawl_content_key.contains("downloadPic")&&imagePath.size()>0){ +// NewsDownload.downloadAndSaveimage(resultMap,imagePathSizevalue,); +// } +// if(crawl_content_key.contains("downloadFile")&&filePath.size()>0){ +// NewsDownload.downloadAndSaveFile(resultMap,filePathSizevalueList); +// } +// if(crawl_content_key.contains("downloadVideo")&&videoPath.size()>0){ +// NewsDownload.downloadAndSavevideo(resultMap,videoPathSizevalueList); +// } + //System.out.print("=========="); + if(crawl_content_key.contains("downloadPic")&&imagePathSize.size()==0){ + NewsDownload.downloadAndSaveimage(resultMap,imagePathSizevalue,imagePath); + }else if (crawl_content_key.contains("downloadPic")&&imagePathSize.size()>0) { + Newscontent.downloadAndSaveimage(resultMap,imagePathSize); } - if(crawl_content_key.contains("downloadFile")&&filePath.size()>0){ - NewsDownload.downloadAndSaveFile(resultMap,filePathSizevalueList); + if(crawl_content_key.contains("downloadFile")&&filePathSize.size()==0){ + NewsDownload.downloadAndSaveFile(resultMap,filePathSizevalueList,filePath); + }else if(crawl_content_key.contains("downloadFile")&&filePathSize.size()>0){ + Newscontent.downloadAndSavefile(resultMap,filePathSize); } - if(crawl_content_key.contains("downloadVideo")&&videoPath.size()>0){ - NewsDownload.downloadAndSavevideo(resultMap,videoPathSizevalueList); + if(crawl_content_key.contains("downloadVideo")&&videoPathSize.size()==0){ + NewsDownload.downloadAndSavevideo(resultMap,videoPathSizevalueList,videoPath); + }else if(crawl_content_key.contains("downloadVideo")&&videoPathSize.size()>0){ + Newscontent.downloadAndSavevideo(resultMap,videoPathSize); } + } if (filePathSizevalueList.size()==0&&imagePathSizevalue.size()==0&&videoPathSizevalueList.size()==0){ resultMap.put("isDownload","false"); @@ -410,7 +445,7 @@ public class DataSaveManager implements Runnable{ if (url.contains("http")){ revideoPathlist.add(url); }else { - url="http://172.18.1.113:8892"+url; + url="http://172.18.1.113:8080"+url; revideoPathlist.add(url); } } @@ -419,7 +454,87 @@ public class DataSaveManager implements Runnable{ } } - } catch (Exception e) { + //创新院独有的 + if("12331".equals(subject_id)){ + List revideoPath= (List) resultMap.get("videoPath"); + List revideoPathlist=new ArrayList<>(); + if (revideoPath.size()>0){ + Iterator it = revideoPath.iterator(); + while(it.hasNext()) { + String url= it.next(); + if (url.contains("http")){ + revideoPathlist.add(url); + }else { + url="http://crawl-files.pontoaplus.com"+url; + revideoPathlist.add(url); + } + } + resultMap.put("videoPath",revideoPathlist); + //writerToKafka(5, "xhs1223", resultMap); + } + + List reimagePath= (List) resultMap.get("imagePath"); + List reimagePathlist=new ArrayList<>(); + if (reimagePath.size()>0){ + Iterator it = reimagePath.iterator(); + while(it.hasNext()) { + String url= it.next(); + if (url.contains("http")){ + reimagePathlist.add(url); + }else { + url="http://crawl-files.pontoaplus.com"+url; + reimagePathlist.add(url); + } + } + resultMap.put("imagePath",reimagePathlist); + //writerToKafka(5, "xhs1223", resultMap); + } + + List refilePath= (List) resultMap.get("filePath"); + List refilePathlist=new ArrayList<>(); + if (refilePath.size()>0){ + Iterator it = refilePath.iterator(); + while(it.hasNext()) { + String url= it.next(); + if (url.contains("http")){ + refilePathlist.add(url); + }else { + url="http://crawl-files.pontoaplus.com"+url; + refilePathlist.add(url); + } + } + resultMap.put("filePath",refilePathlist); + //writerToKafka(5, "xhs1223", resultMap); + } + + if (!avatarPath.equals("")){ + String resulturl= null; + try { + + if (avatarPath.contains("http")){ + resulturl=avatarPath; + }else { + resulturl="http://crawl-files.pontoaplus.com"+avatarPath; + } + + //Map resultmap = DownLoadFile.downloadAndSaveFile(avatarPath, putUrl); + //resulturl = avatarPath.replace("172.18.1.113:8080","crawl-files.pontoaplus.com");; + } catch (Exception e) { + e.printStackTrace(); + } + + if(resulturl!= null && resulturl.length()!= 0){ + resultMap.put("avatarPath", resulturl); + } + else{ + resultMap.put("avatarPath", avatarPath); + } + } + writerToKafka(2, "dataFromES_10000", resultMap); + + } + + } catch (Throwable e) { e.printStackTrace(); } try { @@ -456,6 +571,7 @@ public class DataSaveManager implements Runnable{ else { System.out.println(" 这条数据都没有标识位,就不往专题的索引存储了呗!!!!" + resultMap.get("dataId")); } + // } timetMap.put("dendsentes",System.currentTimeMillis()); resultMap.put("processtime",timetMap); try { @@ -487,7 +603,7 @@ public class DataSaveManager implements Runnable{ responseMap.put("createTimeStr", DataCheckUtil.getCurrentTime(dateTime)); System.out.println("==========================写入到【专题】ES :==========" + indexName + " - "+responseMap.get("docId") ); if (null != docId && !("").equals(docId)) { - WriteMethod.writeMethod("20210621.txt",JsonUtils.toJSONString(responseMap)); + // WriteMethod.writeMethod("20210621aaa.txt",JsonUtils.toJSONString(responseMap)); ElastiProducer elastiProducer = ElastiProducer.getInstance(bussinessType, subjectEsNum, indexName, indexType); elastiProducer.sendMessageToEs(JsonUtils.toJSONString(responseMap)); } @@ -500,7 +616,8 @@ public class DataSaveManager implements Runnable{ String docId=responseMap.get("docId").toString(); System.out.println("==========================写入到【日期】ES : ==========" + indexName + " - "+responseMap.get("docId")); if (null != docId && !("").equals(docId)) { - //WriteMethod.writeMethod("2021525like.txt",JsonUtils.toJSONString(responseMap)); + log.info(JsonUtils.toJSONString(responseMap)); + // WriteMethod.writeMethod("2021525likeaaa.txt",JsonUtils.toJSONString(responseMap)); ElastiProducer elastiProducer = ElastiProducer.getInstance(bussinessType, indexEsNum, indexName, indexType); elastiProducer.sendMessageToEs(JsonUtils.toJSONString(responseMap)); } @@ -590,8 +707,6 @@ public class DataSaveManager implements Runnable{ List imagePath= (List) responseMap.get("imagePath"); List videoPath= (List) responseMap.get("videoPath"); String storyDetailPage= (String) responseMap.get("pageType"); -// pageType -// storyDetailPage Map resultmap=new HashMap<>(); if (pgc.equals(1)){ try { @@ -623,7 +738,6 @@ public class DataSaveManager implements Runnable{ resultmap.put("srcimagePath",pictureList); } catch (Exception e) { e.printStackTrace(); - //log.error(); } } if(ugc.equals(1)){ if(responseMap.get("forwardUrl")!=""&&!"storyDetailPage".equals(storyDetailPage)&&!"socialComment".equals(storyDetailPage)){ @@ -872,17 +986,6 @@ public class DataSaveManager implements Runnable{ String nomorprice = dataValue.toString().replaceAll("¥", "").replace("$","") ; jsonData.put(key, nomorprice); } -// if(key.equals("imagePath")&&dataValue != null){ -// List list=new ArrayList<>(); -// list.add(dataValue.toString()); -// jsonData.put(key,list); -// } -// if(key.equals("filePath") && dataValue != null){ -// List list=new ArrayList<>(); -// list.add(dataValue.toString()); -// jsonData.put(key,list); -// } - if(tableInfo.containsKey(key)){ // System.out.print("tableInfo"+tableInfo); String value = tableInfo.get(key); @@ -929,11 +1032,11 @@ public class DataSaveManager implements Runnable{ } } catch (Exception e) { //e.printStackTrace(); - String str=dataValue.toString().replace(", ",","); + String str=dataValue.toString().replace(", ",",").replace(",",","); str = str.substring(1,str.length()-1).trim(); String []strs =str.split(","); // System.out.println(strs.length+"数组的长度啊"); - System.out.println(str+"数组的长度啊"); + //System.out.println(str+"数组的长度啊"); List list = Arrays.asList(strs); jsonData.put(key, list) ; } diff --git a/cl_stream_datasave/src/main/java/com/bfd/mf/datasave/listen/ListenKafkaManager.java b/cl_stream_datasave/src/main/java/com/bfd/mf/datasave/listen/ListenKafkaManager.java index 17c77e6..356d52f 100644 --- a/cl_stream_datasave/src/main/java/com/bfd/mf/datasave/listen/ListenKafkaManager.java +++ b/cl_stream_datasave/src/main/java/com/bfd/mf/datasave/listen/ListenKafkaManager.java @@ -1,5 +1,4 @@ package com.bfd.mf.datasave.listen; -import com.bfd.crawler.kafka7.KfkConsumer; import com.bfd.mf.datasave.kafka.ReadKafka; import com.bfd.mf.datasave.tools.DateUtil; import com.bfd.mf.entity.FieldNormaliz; @@ -16,6 +15,7 @@ import java.util.concurrent.TimeUnit; public class ListenKafkaManager implements Runnable{ private LinkedBlockingDeque queue= new LinkedBlockingDeque(5000); + private LinkedBlockingDeque fastqueue= new LinkedBlockingDeque(5000); private boolean isRun = true; private FieldNormaliz fieldNormaliz; private ThreadPoolExecutor spiderPoolExec ; @@ -24,15 +24,14 @@ public class ListenKafkaManager implements Runnable{ public ListenKafkaManager(FieldNormaliz fieldNormaliz){ int croePoolsize = 20 ; - int maximumPoolsize = 80; + int maximumPoolsize = 30; long keepAliveTime = 0; this.spiderPoolExec = new ThreadPoolExecutor(croePoolsize, maximumPoolsize, keepAliveTime, TimeUnit.SECONDS, new SynchronousQueue()); this.fieldNormaliz = fieldNormaliz ; this.subject = SubjectTask.subjectTaskMap; this.tableInfoMap = FiledTableInfo.tableInfoMap; String kafkaname = fieldNormaliz.getKafkaName() ; -// KfkConsumer.startReadThread(queue,"Ejingdongdedup_filter1",10,"333",2); - ReadKafka readKafka = new ReadKafka(queue , kafkaname ,12 , fieldNormaliz.getGroupId(), fieldNormaliz.getKafkaSerName(),fieldNormaliz.getEsSerName()); + ReadKafka readKafka = new ReadKafka(queue, kafkaname ,12 , fieldNormaliz.getGroupId(), fieldNormaliz.getKafkaSerName(),fieldNormaliz.getEsSerName()); readKafka.read(); } @@ -53,8 +52,6 @@ public class ListenKafkaManager implements Runnable{ private void addTask(String data){ while ( spiderPoolExec.getPoolSize() >= spiderPoolExec.getMaximumPoolSize() || spiderPoolExec.getActiveCount() >= spiderPoolExec.getMaximumPoolSize()) { - //System.out.println("线程满了啊"+spiderPoolExec.getPoolSize()+"最大线程数"+spiderPoolExec.getMaximumPoolSize()+"现有的线程数"+spiderPoolExec.getActiveCount()); - // System.out.println("线程满了啊"); try { Thread.sleep(200); } catch (InterruptedException e) { diff --git a/cl_stream_datasave/src/main/java/com/bfd/mf/datasave/listen/ListenTaskManager.java b/cl_stream_datasave/src/main/java/com/bfd/mf/datasave/listen/ListenTaskManager.java index 9cbbf34..c95a0be 100644 --- a/cl_stream_datasave/src/main/java/com/bfd/mf/datasave/listen/ListenTaskManager.java +++ b/cl_stream_datasave/src/main/java/com/bfd/mf/datasave/listen/ListenTaskManager.java @@ -22,7 +22,18 @@ public class ListenTaskManager { ListenKafkaManager listenKafkaManager = new ListenKafkaManager(fieldNormaliz); new Thread(listenKafkaManager).start(); listenKafkaManagers.put(kafkaServerName+"#"+kafkaTopicName, listenKafkaManager); - } + //快速读取数据的topic + ListenfastTaskManager ListenfastTaskManager = new ListenfastTaskManager(fieldNormaliz); + new Thread(ListenfastTaskManager).start(); + + + + + } + + + + } diff --git a/cl_stream_datasave/src/main/java/com/bfd/mf/datasave/tools/DataProcess.java b/cl_stream_datasave/src/main/java/com/bfd/mf/datasave/tools/DataProcess.java index 8b82403..1e2774c 100644 --- a/cl_stream_datasave/src/main/java/com/bfd/mf/datasave/tools/DataProcess.java +++ b/cl_stream_datasave/src/main/java/com/bfd/mf/datasave/tools/DataProcess.java @@ -1,8 +1,10 @@ package com.bfd.mf.datasave.tools; import crawler.open.util.RedisUtil; +import org.apache.log4j.Logger; public class DataProcess implements Runnable { + private static Logger log = Logger.getLogger(DataProcess.class); @Override public void run() { while (true) { @@ -11,11 +13,12 @@ public class DataProcess implements Runnable { String a = Constants.getLineQueue().take(); String key=a.split("@#@")[0]; String value=a.split("@#@")[1]; - RedisUtil.set(key, value, 10); -// if(Constants.getLineQueue().size() == 1000){ -// Constants.getLineQueue().clear(); -// } - System.out.println(Constants.getLineQueue().size()+"队列的大小"); + + RedisUtil.set(key, value, 9); + System.out.println(Constants.getLineQueue().size()+"队列的大小"+key); + log.info(Constants.getLineQueue().size()+"队列的大小"+key); + // WriteMethod.writeMethod("1.txt", a); + log.info("插入redis的key" + key + " ; data = " +Constants.getLineQueue().size()+"队列的大小"); } catch (InterruptedException e) { e.printStackTrace(); } diff --git a/cl_stream_datasave/src/main/java/com/bfd/mf/entity/AllKeys.java b/cl_stream_datasave/src/main/java/com/bfd/mf/entity/AllKeys.java index 81914ed..d78f1ca 100644 --- a/cl_stream_datasave/src/main/java/com/bfd/mf/entity/AllKeys.java +++ b/cl_stream_datasave/src/main/java/com/bfd/mf/entity/AllKeys.java @@ -170,7 +170,8 @@ public class AllKeys { map.put("poorrate",0); map.put("processtime",new HashMap<>()); map.put("tag",""); - + map.put("mentionAccount",new ArrayList<>()); + map.put("mentionAccountUrl",new ArrayList<>()); } diff --git a/cl_stream_datasave/src/main/java/com/bfd/mf/entity/mysql/SubjectTask.java b/cl_stream_datasave/src/main/java/com/bfd/mf/entity/mysql/SubjectTask.java index d534c65..f20ab8b 100644 --- a/cl_stream_datasave/src/main/java/com/bfd/mf/entity/mysql/SubjectTask.java +++ b/cl_stream_datasave/src/main/java/com/bfd/mf/entity/mysql/SubjectTask.java @@ -3,16 +3,14 @@ package com.bfd.mf.entity.mysql; //import com.bfd.crawler.utils.JsonUtils; import com.bfd.crawler.utils.JsonUtils; -import com.bfd.mf.datasave.listen.DataSaveManager; import com.bfd.mf.datasave.tools.Constants; import com.bfd.mf.datasave.tools.DBUtil; import com.bfd.mf.datasave.tools.DateUtil; -import com.bfd.mf.datasave.tools.WriteMethod; import crawler.open.util.RedisUtil; import org.apache.log4j.Logger; +import java.text.SimpleDateFormat; import java.util.*; -import java.util.concurrent.ArrayBlockingQueue; import static com.bfd.mf.entity.mysql.Tasklimit.subjectTasktimelimiit; import static com.bfd.mf.entity.mysql.Userlimit.subjectuserlimiit; @@ -186,171 +184,308 @@ public class SubjectTask implements Runnable { @Override public void run() { - while (true){ - subjectTaskMap.clear(); - Userlimit.loaduser(); - Tasklimit.loadTask(); - long updatetime = new Date().getTime()/1000-30000; -// List> subjectTaskList = DBUtil.getInstance("db_stat").query("select cs.del,ct.external_id, ct.subject_id, ct.id, ct.cid, ct.crawl_data_flag,cs.kafka_switch,cs.kafka_addr,cs.go_fast_addr,cs.kafka_topic,cs.go_fast_switch from cl_subject cs Join cl_task ct on(ct.subject_id=cs.id)where (ct.crawl_status=1 or ct.crawl_status=3) and ct.del=0 ;");ct.app_id=cs.app_id and - //String time=DateUtil.getDate(); - //System.out.println(time); - System.out.println("结束时间"+ updatetime); - List> subjectTaskList = DBUtil.getInstance("db_stat_alltask").query("select ct.crawl_content_key,ct.create_user_id,ct.app_id,cs.del,ct.external_id, ct.subject_id, ct.id, ct.cid, ct.crawl_data_flag,cs.kafka_switch,cs.kafka_addr,cs.go_fast_addr,cs.kafka_topic,cs.go_fast_switch from cl_subject cs Join cl_task ct on(ct.subject_id=cs.id) where (ct.crawl_status=1 ) and ct.del=0 and ct.app_id=cs.app_id and ct.cid!=\"\" and unix_timestamp(ct.update_time)>'"+updatetime+"' and ct.crawl_data_flag like '%气象侦察机%'order by ct.update_time desc;"); - System.out.println(subjectTaskList.size()); - if(subjectTaskList.size() > 0){ - String key = ""; - for(Map subjectTask : subjectTaskList){ //{subject_id=10222, name=我是张三, task_id=188, id=71, crawl_data_flag=aaa} - String keytwo = ""; - if( subjectTask.get("cid").equals("Tmall")){ - key = subjectTask.get("cid") + "#####" + subjectTask.get("crawl_data_flag"); - keytwo = "Taobao"+ "#####" + subjectTask.get("crawl_data_flag"); - } - else if (subjectTask.get("cid").equals("Taobao")){ - key = subjectTask.get("cid") + "#####" + subjectTask.get("crawl_data_flag"); - keytwo = "Tmall"+ "#####" + subjectTask.get("crawl_data_flag"); - } - else { - key = subjectTask.get("cid") + "#####" + subjectTask.get("crawl_data_flag"); - } - Map value = new HashMap<>(); - List> valueList = new ArrayList<>(); - String v_subject_id = ""; - String v_go_fast_addr = ""; - String v_kafka_switch = ""; - String v_kafka_addr = ""; - String v_task_id = ""; - String v_external_id =""; - String v_go_fast_switch=""; - String v_kafka_topic=""; - String v_status=""; - String v_del=""; - String v_create_user_id=""; - String v_ocr="0"; - String v_trans="0"; - String v_crawl_content_key=""; - if(null != subjectTask.get("subject_id")) { - v_subject_id = subjectTask.get("subject_id").toString(); - } - if(null != subjectTask.get("crawl_content_key")) { - v_crawl_content_key = subjectTask.get("crawl_content_key").toString(); - } - if(null != subjectTask.get("go_fast_addr")) { - v_go_fast_addr = subjectTask.get("go_fast_addr").toString(); - } - if(null != subjectTask.get("kafka_addr")) { - v_kafka_addr = subjectTask.get("kafka_addr").toString(); - } - if(null != subjectTask.get("kafka_switch")){ - v_kafka_switch = subjectTask.get("kafka_switch").toString(); - } - if(null !=subjectTask.get("id")){ - v_task_id = subjectTask.get("id").toString(); - } - if(null !=subjectTask.get("external_id")){ - v_external_id = subjectTask.get("external_id").toString(); - } - if(null !=subjectTask.get("go_fast_switch")){ - v_go_fast_switch = subjectTask.get("go_fast_switch").toString(); - } - if(null !=subjectTask.get("kafka_topic")){ - v_kafka_topic = subjectTask.get("kafka_topic").toString(); - } -// if(null !=subjectTask.get("status")){ -// v_status = subjectTask.get("status").toString(); -// } - if(null !=subjectTask.get("del")){ - v_del = subjectTask.get("del").toString(); - } - if(null !=subjectTask.get("create_user_id")){ - v_create_user_id = subjectTask.get("create_user_id").toString(); - } - value.put("subject_id",v_subject_id); - value.put("go_fast_addr",v_go_fast_addr); - value.put("export_to_kafka",v_kafka_switch); - value.put("kafka_addr",v_kafka_addr); - // value.put("task_id",v_task_id); - value.put("external_id",v_external_id); - value.put("go_fast_switch",v_go_fast_switch); - value.put("kafka_topic",v_kafka_topic); - // value.put("status",v_status);//专题的状态 - value.put("del",v_del);//专题的状态 - value.put("appid",subjectTask.get("app_id").toString()); - value.put("crawl_content_key",v_crawl_content_key); - //System.out.print(v_external_id+"external_id"); - String newkey = key.toLowerCase(); - String userkey=newkey+"#####"+subjectTask.get("app_id").toString().toLowerCase(); + while (true) try { + { + // subjectTaskMap.clear(); + Userlimit.loaduser(); + Tasklimit.loadTask(); + long updatetime = new Date().getTime()/1000-15; + SimpleDateFormat datetime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + String now=datetime.format(Calendar.getInstance().getTime()); + System.out.println(updatetime+now); + long starttime=new Date().getTime()/1000-20L; + if (disposeCrawldataflag("Tasktimelimiit")){ + starttime= Long.parseLong(RedisUtil.get("Tasktimelimiit",9)); + starttime=starttime-5; + RedisUtil.set("Tasktimelimiit", String.valueOf(updatetime), 9); + }else { + System.out.println("第一次写入"); + RedisUtil.set("Tasktimelimiit", String.valueOf(updatetime), 9); + } + System.out.println(System.currentTimeMillis()+"开始mysql执行的时间"); + List> subjectTaskendList = DBUtil.getInstance("db_stat_alltask").query("select ct.crawl_content_key,ct.create_user_id,ct.app_id,cs.del,ct.crawl_start_time,ct.crawl_end_time,ct.external_id, ct.subject_id, ct.id, ct.cid, ct.crawl_data_flag,cs.kafka_switch,cs.kafka_addr,cs.go_fast_addr,cs.kafka_topic,cs.go_fast_switch from cl_subject cs Join cl_task ct on(ct.subject_id=cs.id) where (ct.crawl_status=1 ) and ct.del=0 and ct.cid!=\"\" and ct.app_id!='bw01' and unix_timestamp(ct.start_time)>'"+starttime+"';"); - //组装时间的参数 - if (subjectTasktimelimiit.containsKey(userkey)){ - List>timelist=subjectTasktimelimiit.get(userkey); - if(timelist.size()==1){ - for(Map subjectTasktime : timelist){ - value.put("maxtime",subjectTasktime.get("max_time").toString()); - value.put("mintime",subjectTasktime.get("min_time").toString()); - } - } else{ - for(Map subjectTasktime : timelist){ - String subject_id=subjectTasktime.get("subject_id").toString(); - if (v_subject_id.equals(subject_id)){ - value.put("maxtime",subjectTasktime.get("max_time").toString()); - value.put("mintime",subjectTasktime.get("min_time").toString()); - } - } - } +// List> subjectTaskendList = DBUtil.getInstance("db_stat_alltask").query("select ct.crawl_content_key,ct.create_user_id,ct.app_id,cs.del,ct.external_id, ct.subject_id, ct.id, ct.cid, ct.crawl_data_flag,cs.kafka_switch,cs.kafka_addr,cs.go_fast_addr,cs.kafka_topic,cs.go_fast_switch from cl_subject cs Join cl_task ct on(ct.subject_id=cs.id) where (ct.crawl_status=1 ) and ct.del=0 and ct.cid!=\"\" and crawl_data_flag like '%http://xhslink.com/RLmh8p%';"); + System.out.println(System.currentTimeMillis()+"执行mysql任务结束的时间"); + if(subjectTaskendList.size()>0){ + System.out.println(subjectTaskendList.size()+"任务的大小"); + for(Map subjectTask : subjectTaskendList){ + String key = ""; + String keytwo = ""; + if( subjectTask.get("cid").equals("Tmall")){ + key = subjectTask.get("cid") + "#####" + subjectTask.get("crawl_data_flag"); + keytwo = "Taobao"+ "#####" + subjectTask.get("crawl_data_flag"); + } + else if (subjectTask.get("cid").equals("Taobao")){ + key = subjectTask.get("cid") + "#####" + subjectTask.get("crawl_data_flag"); + keytwo = "Tmall"+ "#####" + subjectTask.get("crawl_data_flag"); + } + else { + key = subjectTask.get("cid") + "#####" + subjectTask.get("crawl_data_flag"); + } + Map value = new HashMap<>(); + List> valueList = new ArrayList<>(); + String v_subject_id = ""; + String v_go_fast_addr = ""; + String v_kafka_switch = ""; + String v_kafka_addr = ""; + String v_task_id = ""; + String v_external_id =""; + String v_go_fast_switch=""; + String v_kafka_topic=""; + String v_status=""; + String v_del=""; + String v_create_user_id=""; + String v_ocr="0"; + String v_trans="0"; + String v_crawl_content_key=""; + String appid=""; + if(null != subjectTask.get("subject_id")) { + v_subject_id = subjectTask.get("subject_id").toString(); + } + if(null != subjectTask.get("crawl_content_key")) { + v_crawl_content_key = subjectTask.get("crawl_content_key").toString(); + } + if(null != subjectTask.get("go_fast_addr")) { + v_go_fast_addr = subjectTask.get("go_fast_addr").toString(); + } + if(null != subjectTask.get("kafka_addr")) { + v_kafka_addr = subjectTask.get("kafka_addr").toString(); + } + if(null != subjectTask.get("kafka_switch")){ + v_kafka_switch = subjectTask.get("kafka_switch").toString(); + } + if(null !=subjectTask.get("id")){ + v_task_id = subjectTask.get("id").toString(); + } + if(null !=subjectTask.get("external_id")){ + v_external_id = subjectTask.get("external_id").toString(); + } + if(null !=subjectTask.get("go_fast_switch")){ + v_go_fast_switch = subjectTask.get("go_fast_switch").toString(); + } + if(null !=subjectTask.get("kafka_topic")){ + v_kafka_topic = subjectTask.get("kafka_topic").toString(); + } + if(null !=subjectTask.get("del")){ + v_del = subjectTask.get("del").toString(); + } + if(null !=subjectTask.get("create_user_id")){ + v_create_user_id = subjectTask.get("create_user_id").toString(); + } + if(null !=subjectTask.get("app_id")){ + appid = subjectTask.get("app_id").toString(); + } - } -// //用户的权限 - if (subjectuserlimiit.containsKey(v_create_user_id)){ - Map permission= (Map) subjectuserlimiit.get(v_create_user_id); - v_ocr= permission.get("is_ocr").toString(); - v_trans= permission.get("is_trans").toString(); - } - value.put("is_ocr",v_ocr); - value.put("is_trans",v_trans); - //组装相同任务的任务id - if(subjectTaskMap.containsKey(newkey)){ - valueList = subjectTaskMap.get(newkey); - for (Map valuetask : valueList){ - String task=valuetask.get("task_id")+","+v_task_id; - valuetask.put("task_id",task); - value.put("task_id",task); - } - valueList.add(value); - }else{ - value.put("task_id",v_task_id); - valueList.add(value); - } + value.put("appid", appid); + value.put("subject_id",v_subject_id); + value.put("go_fast_addr",v_go_fast_addr); + value.put("export_to_kafka",v_kafka_switch); + value.put("kafka_addr",v_kafka_addr); + // value.put("task_id",v_task_id); + value.put("external_id",v_external_id); + value.put("go_fast_switch",v_go_fast_switch); + value.put("kafka_topic",v_kafka_topic); + value.put("del",v_del);//专题的状态 + value.put("crawl_content_key",v_crawl_content_key); + value.put("maxtime",subjectTask.get("crawl_end_time").toString()); + value.put("mintime",subjectTask.get("crawl_start_time").toString()); + String newkey = key.toLowerCase(); + String userkey=newkey; + //组装时间的参数 + try { +// if (!subjectTasktimelimiit.containsKey(userkey)) { //如果无最大和最小时间 +// Tasklimit.loadTask(); +// } +// List>timelist=subjectTasktimelimiit.get(userkey); +// if(timelist.size()==1){ +// for(Map subjectTasktime : timelist){ +// value.put("maxtime",subjectTasktime.get("max_time").toString()); +// value.put("mintime",subjectTasktime.get("min_time").toString()); +// } +// } else{ +// for(Map subjectTasktime : timelist){ +// String subject_id=subjectTasktime.get("subject_id").toString(); +// if (v_subject_id.equals(subject_id)){ +// value.put("maxtime",subjectTasktime.get("max_time").toString()); +// value.put("mintime",subjectTasktime.get("min_time").toString()); +// } +// } +// } + } catch (Exception e) { + System.out.print("获取时间失败了"+subjectTask.get("crawl_data_flag")); + value.put("maxtime", "1735660800000"); + value.put("mintime","0"); + e.printStackTrace(); + } + // //用户的权限 + if (subjectuserlimiit.containsKey(v_create_user_id)){ + Map permission= (Map) subjectuserlimiit.get(v_create_user_id); + v_ocr= permission.get("is_ocr").toString(); + v_trans= permission.get("is_trans").toString(); + } + value.put("is_ocr",v_ocr); + value.put("is_trans",v_trans); + value.put("task_id",v_task_id); + valueList.add(value); + Set keysSet = new HashSet(); +// if(subjectTaskMap.containsKey(newkey)){ +// valueList = subjectTaskMap.get(newkey); +// for (Map valuetask : valueList){ +// String task=valuetask.get("task_id")+","+v_task_id; +// valuetask.put("task_id",task); +// value.put("task_id",task); +// } +// valueList.add(value); +// }else{ +// value.put("task_id",v_task_id); +// valueList.add(value); +// } + if(disposeCrawldataflag(newkey)) { //判断redis中的key + String getsubjectList=RedisUtil.get(newkey,9); + try { + List> subjectList = (List>) JsonUtils.parseArray(getsubjectList); + List> redistList=new ArrayList<>(); + for(Map redissubjectTask : subjectList){ + String resubject_id=redissubjectTask.get("subject_id").toString(); + String reappid=redissubjectTask.get("appid").toString(); + String taskid=redissubjectTask.get("task_id").toString(); + String maxtime=""; + if(redissubjectTask.containsKey("maxtime")){ + maxtime=redissubjectTask.get("maxtime").toString(); + } + if(!resubject_id.equals(v_subject_id)||!reappid.equals(appid)||!v_task_id.equals(taskid)){ + redistList.add(redissubjectTask);; - if(keytwo.length()>0){ - String tmallnewkey = keytwo.toLowerCase(); - subjectTaskMap.put(tmallnewkey,valueList); - } - String redis=newkey+"@#@"+JsonUtils.toJSONString(valueList); - try { - Constants.getLineQueue().put(redis); - } catch (InterruptedException e) { - e.printStackTrace(); + } + + } + valueList.addAll(redistList); + String redis=newkey+"@#@"+JsonUtils.toJSONString(valueList); + try { + System.out.println("写入redis是"+redis); + Constants.getLineQueue().put(redis); + } catch (InterruptedException e) { + e.printStackTrace(); + } + //天猫 + if(keytwo.length()>0){ + String tmallnewkey = keytwo.toLowerCase(); + String redis2=tmallnewkey+"@#@"+JsonUtils.toJSONString(valueList); + try { + Constants.getLineQueue().put(redis2); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + } catch (Exception e) { + log.error("获取redis中的任务失败"+getsubjectList); + } + }else { + String redis=newkey+"@#@"+JsonUtils.toJSONString(valueList); + System.out.println(redis+"不存在"); + try { + Constants.getLineQueue().put(redis); + } catch (InterruptedException e) { + e.printStackTrace(); + } + if(keytwo.length()>0){ + String tmallnewkey = keytwo.toLowerCase(); + String redis2=tmallnewkey+"@#@"+JsonUtils.toJSONString(valueList); + try { + Constants.getLineQueue().put(redis2); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + +// subjectTaskMap.put(newkey,valueList); +// //System.out.print(valueList); +// +// //System.out.print(JsonUtils.toJSONString(subjectTaskMap)); +// +// Set keysSets = new HashSet(); +// for (String newkeya :subjectTaskMap.keySet()){ +// List> newkeyvalue=subjectTaskMap.get(newkeya); +// List> setkeyvalue=new ArrayList<>(); +// for(Map redissubjectTask : newkeyvalue){ +// String resubject_id=redissubjectTask.get("subject_id").toString(); +// String reappid=redissubjectTask.get("appid").toString(); +// String taskid=redissubjectTask.get("task_id").toString(); +// String keys=resubject_id+reappid+newkeya+taskid; +// RedisUtil.set(keys, "1", 10); +//// int beforeSize = keysSets.size(); +//// keysSets.add(keys); +//// int afterSize = keysSets.size(); +//// if(afterSize == beforeSize + 1){ +//// setkeyvalue.add(redissubjectTask); +//// } +// setkeyvalue.add(redissubjectTask); +// } +// String redis=newkeya+"@#@"+JsonUtils.toJSONString(setkeyvalue); +// System.out.println(redis); +// try { +// Constants.getLineQueue().put(redis); +// } catch (InterruptedException e) { +// e.printStackTrace(); +// } +// } + System.out.println(System.currentTimeMillis()+"任务组装结束的时间"); + log.info("结束时间"+ DateUtil.getcurr()); + log.info("当天任务的数量" + key + " ; data = " + subjectTaskendList.size()); +// }else { +// String delkey = subjectendtTask.get("cid") + "#####" + subjectendtTask.get("crawl_data_flag"); +// log.info(delkey.toLowerCase()+"====del"); +// RedisUtil.del(delkey.toLowerCase(), 10); +// } + } + }else { + Thread.sleep(1000*5); + } + + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + + private boolean disposeCrawldataflag(String crawldataflag) { + try{ + if (RedisUtil.exists(crawldataflag, 9)) { // 先去 redis中查询是否存在,不存直接忽略 + String value = RedisUtil.get(crawldataflag,9); + if(null != value && !("").equals(value)) { + + return true; } - // RedisUtil.set(newkey, JsonUtils.toJSONString(valueList), 10); - // System.out.println("结束时间"+ DateUtil.getcurr()); - subjectTaskMap.put(newkey,valueList); - //System.out.println(newkey); + + } else { + log.error("[datasave] exec >>> 灌数:该 crwaldataflag 在 Redis 中不存在!!! keys = " + crawldataflag + " ; dbindex = " + 10); + return false; } - System.out.println("结束时间"+ DateUtil.getcurr()); - //System.out.println(subjectTaskMap.size()); - log.info("当天任务的数量" + key + " ; data = " + subjectTaskMap.size()); - //SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//设置日期格式 - //System.out.println(subjectTaskList.size());// new Date()为获取当前系统时间 - //WriteMethod.writeMethod("0621test.txt",JsonUtils.toJSONString(subjectTaskMap)); - //System.out.println(JsonUtils.toJSONString(subjectTaskMap)+"当前时间"+ DateUtil.getcurr()); + return false; + }catch (Exception e){ + e.printStackTrace(); + return false; } - try { - Thread.sleep(3000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } + + public static void main(String[] args) { + + String resubject_id="1"; + String v_subject_id="1"; + String reappid="12"; + + String appid="12"; + String v_task_id="12"; + + String taskid="13"; + if(!resubject_id.equals(v_subject_id)||!reappid.equals(appid)||!v_task_id.equals(taskid)){ + System.out.println("11"); + } } + } diff --git a/cl_stream_datasave/src/main/java/com/bfd/mf/entity/mysql/Tasklimit.java b/cl_stream_datasave/src/main/java/com/bfd/mf/entity/mysql/Tasklimit.java index 56f15f4..9b7ff3c 100644 --- a/cl_stream_datasave/src/main/java/com/bfd/mf/entity/mysql/Tasklimit.java +++ b/cl_stream_datasave/src/main/java/com/bfd/mf/entity/mysql/Tasklimit.java @@ -2,6 +2,7 @@ package com.bfd.mf.entity.mysql; import com.bfd.crawler.utils.JsonUtils; import com.bfd.mf.datasave.tools.DBUtil; +import com.bfd.mf.datasave.tools.WriteMethod; import javax.xml.bind.util.JAXBSource; import java.util.ArrayList; @@ -15,8 +16,67 @@ public class Tasklimit { public static Map>>subjectTasktimelimiit = new HashMap<>(); public static void loadTask(){ subjectTasktimelimiit.clear(); - List> Tasktimelimiit = DBUtil.getInstance("db_stat_alltask").query("SELECT MIN(crawl_start_time) crawl_start_time ,MAX(crawl_end_time) crawl_end_time ,crawl_data_flag ,subject_id ,cid ,app_id from cl_task where del=0 and (crawl_status=1) and cid!=\"\" GROUP BY crawl_data_flag,cid,subject_id,app_id;"); - System.out.println(Tasktimelimiit.size()+"Tasktimelimiit"); + List> Tasktimelimiit = DBUtil.getInstance("db_stat").query("SELECT MIN(crawl_start_time) crawl_start_time ,MAX(crawl_end_time) crawl_end_time ,crawl_data_flag ,subject_id ,cid ,app_id from cl_task where cid!='MUZX2WNM84L3ARXNSYN3'and del=0 and (crawl_status=1) and cid!=\"\" GROUP BY crawl_data_flag,cid,subject_id,app_id;"); + + // List> Tasktimelimiit = DBUtil.getInstance("db_stat_alltask").query("SELECT MIN(crawl_start_time) crawl_start_time ,MAX(crawl_end_time) crawl_end_time ,crawl_data_flag ,subject_id ,cid ,app_id from cl_task where del=0 and (crawl_status=1) and cid!=\"\" GROUP BY crawl_data_flag,cid,subject_id,app_id;"); + System.out.println(Tasktimelimiit.size()+"时间排序Tasktimelimiit"); + if (Tasktimelimiit.size()>0){ + String newkey = ""; + for(Map subjectTask : Tasktimelimiit) { //{subject_id=10222, name=我是张三, task_id=188, id=71, crawl_data_flag=aaa} + String keytwo = ""; + Map value = new HashMap<>(); + List> valueList = new ArrayList<>(); + + if (subjectTask.get("cid").equals("Tmall")) { + newkey = subjectTask.get("cid") + "#####" + subjectTask.get("crawl_data_flag"); + keytwo = "Taobao" + "#####" + subjectTask.get("crawl_data_flag"); + } else if (subjectTask.get("cid").equals("Taobao")) { + newkey = subjectTask.get("cid") + "#####" + subjectTask.get("crawl_data_flag"); + keytwo = "Tmall" + "#####" + subjectTask.get("crawl_data_flag"); + } else { + newkey = subjectTask.get("cid") + "#####" + subjectTask.get("crawl_data_flag"); + } + String max_time = ""; + String min_time = ""; + String subject_id=""; + //newkey=newkey+"#####" +subjectTask.get("app_id"); + newkey= newkey.toLowerCase(); + subject_id=subjectTask.get("subject_id").toString(); + + try { + max_time=subjectTask.get("crawl_end_time").toString(); + } catch (Exception e) { + System.out.println(subjectTask); + e.printStackTrace(); + } + value.put("max_time",max_time); + min_time=subjectTask.get("crawl_start_time").toString(); + value.put("min_time",min_time); + value.put("subject_id",subject_id); + + if(subjectTasktimelimiit.containsKey(newkey)){ + valueList = subjectTasktimelimiit.get(newkey); + valueList.add(value); + }else{ + valueList.add(value); + } + if(keytwo.length()>0){ + String tmallnewkey = keytwo.toLowerCase(); + subjectTasktimelimiit.put(tmallnewkey,valueList); + } + subjectTasktimelimiit.put(newkey,valueList); + } + + //System.out.println(JsonUtils.toJSONString(subjectTasktimelimiit)); + + } + } + public static void loaderrTask(String cid,String crawl_data_flag){ + subjectTasktimelimiit.clear(); + List> Tasktimelimiit = DBUtil.getInstance("db_stat").query("SELECT MIN(crawl_start_time) crawl_start_time ,MAX(crawl_end_time) crawl_end_time ,crawl_data_flag ,subject_id ,cid ,app_id from cl_task where del=0 and (crawl_status=1) and cid!='"+cid+"' and crawl_data_flag='"+crawl_data_flag+"';"); + + // List> Tasktimelimiit = DBUtil.getInstance("db_stat_alltask").query("SELECT MIN(crawl_start_time) crawl_start_time ,MAX(crawl_end_time) crawl_end_time ,crawl_data_flag ,subject_id ,cid ,app_id from cl_task where del=0 and (crawl_status=1) and cid!=\"\" GROUP BY crawl_data_flag,cid,subject_id,app_id;"); + System.out.println(Tasktimelimiit.size()+"时间排序Tasktimelimiit"); if (Tasktimelimiit.size()>0){ String newkey = ""; for(Map subjectTask : Tasktimelimiit) { //{subject_id=10222, name=我是张三, task_id=188, id=71, crawl_data_flag=aaa} @@ -36,7 +96,7 @@ public class Tasklimit { String max_time = ""; String min_time = ""; String subject_id=""; - newkey=newkey+"#####" +subjectTask.get("app_id"); + //newkey=newkey+"#####" +subjectTask.get("app_id"); newkey= newkey.toLowerCase(); subject_id=subjectTask.get("subject_id").toString(); @@ -58,8 +118,11 @@ public class Tasklimit { } subjectTasktimelimiit.put(newkey,valueList); } - // System.out.println(JsonUtils.toJSONString(subjectTasktimelimiit)); + + //System.out.println(JsonUtils.toJSONString(subjectTasktimelimiit)); } } + + } diff --git a/cl_stream_datasave/src/main/java/com/bfd/mf/entity/mysql/Userlimit.java b/cl_stream_datasave/src/main/java/com/bfd/mf/entity/mysql/Userlimit.java index 74c0494..37e2060 100644 --- a/cl_stream_datasave/src/main/java/com/bfd/mf/entity/mysql/Userlimit.java +++ b/cl_stream_datasave/src/main/java/com/bfd/mf/entity/mysql/Userlimit.java @@ -15,16 +15,16 @@ public class Userlimit { public static Mapsubjectuserlimiit = new HashMap<>(); public static void loaduser() { subjectuserlimiit.clear(); - List> userlimiit = DBUtil.getInstance("db_stat").query("SELECT user_id,is_ocr,is_asr,is_trans FROM `cl_user_config`"); + List> userlimiit = DBUtil.getInstance("db_stat_alltask").query("SELECT user_id,permission FROM `cl_user_config`"); if (userlimiit.size() > 0) { for (Map subjectuser : userlimiit) { int is_ocr=0; int is_trans=0; String userid=(String) subjectuser.get("user_id"); - if (subjectuser.containsKey("is_ocr")&&null!=subjectuser.get("is_ocr")) { - is_ocr=(int) subjectuser.get("is_ocr"); + if (subjectuser.containsKey("permission")&&subjectuser.get("permission").toString().contains("ocr")) { + is_ocr=1; } - if (subjectuser.containsKey("is_trans")&&null!=subjectuser.get("is_trans")) { - is_trans =(int) subjectuser.get("is_trans"); + if (subjectuser.containsKey("permission")&&subjectuser.get("permission").toString().contains("translation")) { + is_trans =1; } Map value = new HashMap<>(); value.put("is_ocr",is_ocr); diff --git a/cl_stream_datasave/src/main/java/com/bfd/mf/entity/mysql/cl_task.java b/cl_stream_datasave/src/main/java/com/bfd/mf/entity/mysql/cl_task.java index 7f46c59..8dc6d19 100644 --- a/cl_stream_datasave/src/main/java/com/bfd/mf/entity/mysql/cl_task.java +++ b/cl_stream_datasave/src/main/java/com/bfd/mf/entity/mysql/cl_task.java @@ -14,7 +14,10 @@ public class cl_task { public static List subtaskstatuslimit = new ArrayList<>(); public static List subtaskstatuslimit3 = new ArrayList<>(); public static void loadTask(){ - List> Tasktimelimiit = DBUtil.getInstance("db_stat_alltask").query("SELECT crawl_data_flag,cid FROM `cl_task` WHERE crawl_status=3 and update_time like '%2021-07-14%' GROUP BY crawl_data_flag,cid;"); +// List> Tasktimelimiit = DBUtil.getInstance("db_stat_alltask").query("SELECT crawl_data_flag,cid FROM `cl_task` WHERE crawl_status=3 and update_time like '%2021-07-14%' GROUP BY crawl_data_flag,cid;"); + + List> Tasktimelimiit = DBUtil.getInstance("db_stat_alltask").query("SELECT crawl_data_flag,cid FROM `cl_task` WHERE crawl_data_flag like '%keyword:肖战%' GROUP BY crawl_data_flag,cid;"); + if (Tasktimelimiit.size()>0){ String newkey = ""; diff --git a/cl_stream_datasave/src/main/java/com/bfd/mf/runstart/RunStartDataSave.java b/cl_stream_datasave/src/main/java/com/bfd/mf/runstart/RunStartDataSave.java index edaf334..1c31958 100644 --- a/cl_stream_datasave/src/main/java/com/bfd/mf/runstart/RunStartDataSave.java +++ b/cl_stream_datasave/src/main/java/com/bfd/mf/runstart/RunStartDataSave.java @@ -1,9 +1,6 @@ package com.bfd.mf.runstart; -import com.bfd.crawler.kafka7.KfkConsumer; -import com.bfd.crawler.kafka7.consts.KafkaConsts; import com.bfd.mf.datasave.tools.DataProcess; -import com.bfd.mf.datasave.tools.DateUtil; import com.bfd.mf.entity.DataSaveManager; import com.bfd.mf.entity.impl.DataSaveManagerImpl; import com.bfd.mf.datasave.tools.DBUtil; @@ -22,7 +19,7 @@ public class RunStartDataSave { private static final Logger LOG = Logger.getLogger(RunStartDataSave.class); private static String log4jPath = "../etc/log4j.properties"; - private static String dbPath = "../etc/db.properties"; + private static String dbPath = "../etc/134db.properties"; private static String redisPath = "../etc/145redis.properties"; static { PropertyConfigurator.configureAndWatch(log4jPath); @@ -30,9 +27,7 @@ public class RunStartDataSave { RedisUtil.init(redisPath); } public static void main(String[] args) { - - - cl_task.loadTask(); + //cl_task.loadTask(); // try { // FiledTableInfo.loadTableInfo(); // startRmiService(); @@ -43,7 +38,7 @@ public class RunStartDataSave { // try { // //Userlimit.loaduser(); // //Tasklimit.loadTask(); -// SubjectTask.loadSubjectTask(); +// // SubjectTask.loadSubjectTask(); // } catch (Exception e) { // e.printStackTrace(); // } @@ -58,25 +53,36 @@ public class RunStartDataSave { + for (int i = 0; i < 1; i ++) { + SubjectTask subjectTask = new SubjectTask(); + Thread SubjectTaskThread = new Thread(subjectTask, "SubjectTask" + i); + SubjectTaskThread.start(); + } + +// // for (int i = 0; i < 1; i ++) { -// SubjectTask SubjectTask = new SubjectTask(); -// Thread SubjectTaskThread = new Thread(SubjectTask, "dataDedupProcess" + i); +// SubjectTaskdele subjectTaskdel = new SubjectTaskdele(); +// Thread SubjectTaskThread = new Thread(subjectTaskdel, "SubjectTask" + i); // SubjectTaskThread.start(); // } -//// try { -//// Thread.sleep(6000); -//// } catch (InterruptedException e) { -//// e.printStackTrace(); -//// } -// //多线程写redis -// for (int i = 0; i < 100; i ++) { -// DataProcess dataProcess = new DataProcess(); -// Thread dataProcessThread = new Thread(dataProcess, "dataDedupProcess" + i); -// dataProcessThread.start(); +// + //多线程写redis + for (int i = 0; i < 10; i ++) { + DataProcess dataProcess = new DataProcess(); + Thread dataProcessThread = new Thread(dataProcess, "dataDedupProcess" + i); + dataProcessThread.start(); + } + + + + + +// for (int i = 0; i < 1; i ++) { +// si siupdate = new si(); +// Thread update = new Thread(siupdate, "SubjectTask" + i); +// update.start(); // } -// Timer timer2 = new Timer(); -// timer.schedule(new UpdateTask(), new Date(), 4*1000); @@ -93,11 +99,11 @@ public class RunStartDataSave { * 本地主机上的远程对象注册表Registry的实例, 并指定端口为8888,这一步必不可少(Java默认端口是1099), * 必不可缺的一步,缺少注册表创建,则无法绑定对象到远程注册表上 ***/ - LocateRegistry.createRegistry(2099);//3888 + LocateRegistry.createRegistry(3099);//6888 /*** 把远程对象注册到RMI注册服务器上,并命名为taskManager ***/ /*** 绑定的URL标准格式为:rmi://host:port/name(其中协议名可以省略,下面两种写法都是正确的) ***/ - Naming.bind("//127.0.0.1:2099/dataSaveManager", dataSaveManager); + Naming.bind("//127.0.0.1:3099/dataSaveManager", dataSaveManager); System.out.println(">>>>>INFO:远程IHello对象绑定成功!"); } catch (RemoteException e) { System.out.println("创建远程对象发生异常!");