Browse Source

海外任务缓存

sitask
zhicheng.zhang 4 weeks ago
parent
commit
3b3c6c864f
  1. 5
      cl_stream_datasave/cl_stream_datasave.iml
  2. 6
      cl_stream_datasave/pom.xml
  3. 13
      cl_stream_datasave/src/main/java/com/bfd/mf/datasave/download/DownLoadFile.java
  4. 22
      cl_stream_datasave/src/main/java/com/bfd/mf/datasave/download/NewsDownload.java
  5. 2
      cl_stream_datasave/src/main/java/com/bfd/mf/datasave/kafka/ReadKafka.java
  6. 179
      cl_stream_datasave/src/main/java/com/bfd/mf/datasave/listen/DataSaveManager.java
  7. 9
      cl_stream_datasave/src/main/java/com/bfd/mf/datasave/listen/ListenKafkaManager.java
  8. 11
      cl_stream_datasave/src/main/java/com/bfd/mf/datasave/listen/ListenTaskManager.java
  9. 13
      cl_stream_datasave/src/main/java/com/bfd/mf/datasave/tools/DataProcess.java
  10. 3
      cl_stream_datasave/src/main/java/com/bfd/mf/entity/AllKeys.java
  11. 267
      cl_stream_datasave/src/main/java/com/bfd/mf/entity/mysql/SubjectTask.java
  12. 71
      cl_stream_datasave/src/main/java/com/bfd/mf/entity/mysql/Tasklimit.java
  13. 10
      cl_stream_datasave/src/main/java/com/bfd/mf/entity/mysql/Userlimit.java
  14. 5
      cl_stream_datasave/src/main/java/com/bfd/mf/entity/mysql/cl_task.java
  15. 54
      cl_stream_datasave/src/main/java/com/bfd/mf/runstart/RunStartDataSave.java

5
cl_stream_datasave/cl_stream_datasave.iml

@ -25,7 +25,7 @@
<orderEntry type="library" name="Maven: io.netty:netty:3.7.0.Final" level="project" /> <orderEntry type="library" name="Maven: io.netty:netty:3.7.0.Final" level="project" />
<orderEntry type="library" name="Maven: com.google.guava:guava:14.0.1" level="project" /> <orderEntry type="library" name="Maven: com.google.guava:guava:14.0.1" level="project" />
<orderEntry type="library" name="Maven: commons-lang:commons-lang:2.4" level="project" /> <orderEntry type="library" name="Maven: commons-lang:commons-lang:2.4" level="project" />
<orderEntry type="library" name="Maven: com.bfd:elastiUtils:0.0.1-SNAPSHOT" level="project" />
<orderEntry type="library" name="Maven: com.bfd:elastiUtils:0.0.2-SNAPSHOT" level="project" />
<orderEntry type="library" name="Maven: kafka-utils:kafka:0.10" level="project" /> <orderEntry type="library" name="Maven: kafka-utils:kafka:0.10" level="project" />
<orderEntry type="library" name="Maven: org.apache.kafka:kafka_2.10:0.10.2.0" level="project" /> <orderEntry type="library" name="Maven: org.apache.kafka:kafka_2.10:0.10.2.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.kafka:kafka-clients:0.10.2.0" level="project" /> <orderEntry type="library" name="Maven: org.apache.kafka:kafka-clients:0.10.2.0" level="project" />
@ -37,7 +37,8 @@
<orderEntry type="library" name="Maven: org.slf4j:slf4j-log4j12:1.7.21" level="project" /> <orderEntry type="library" name="Maven: org.slf4j:slf4j-log4j12:1.7.21" level="project" />
<orderEntry type="library" name="Maven: com.101tec:zkclient:0.10" level="project" /> <orderEntry type="library" name="Maven: com.101tec:zkclient:0.10" level="project" />
<orderEntry type="library" name="Maven: com.alibaba:fastjson:1.1.22" level="project" /> <orderEntry type="library" name="Maven: com.alibaba:fastjson:1.1.22" level="project" />
<orderEntry type="library" name="Maven: mysql:mysql-connector-java:5.1.29" level="project" />
<orderEntry type="library" name="Maven: mysql:mysql-connector-java:8.0.29" level="project" />
<orderEntry type="library" name="Maven: com.google.protobuf:protobuf-java:3.19.4" level="project" />
<orderEntry type="library" name="Maven: org.elasticsearch:elasticsearch:6.2.3" level="project" /> <orderEntry type="library" name="Maven: org.elasticsearch:elasticsearch:6.2.3" level="project" />
<orderEntry type="library" name="Maven: org.elasticsearch:elasticsearch-core:6.2.3" level="project" /> <orderEntry type="library" name="Maven: org.elasticsearch:elasticsearch-core:6.2.3" level="project" />
<orderEntry type="library" name="Maven: org.apache.lucene:lucene-core:7.2.1" level="project" /> <orderEntry type="library" name="Maven: org.apache.lucene:lucene-core:7.2.1" level="project" />

6
cl_stream_datasave/pom.xml

@ -74,7 +74,7 @@
<groupId>com.bfd</groupId> <groupId>com.bfd</groupId>
<artifactId>elastiUtils</artifactId> <artifactId>elastiUtils</artifactId>
<version>0.0.1-SNAPSHOT</version>
<version>0.0.2-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>kafka-utils</groupId> <groupId>kafka-utils</groupId>
@ -104,7 +104,7 @@
<dependency> <dependency>
<groupId>mysql</groupId> <groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId> <artifactId>mysql-connector-java</artifactId>
<version>5.1.29</version>
<version>8.0.29</version>
</dependency> </dependency>
<dependency> <dependency>
@ -199,6 +199,7 @@
<build> <build>
<plugins> <plugins>
<plugin> <plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId> <artifactId>maven-compiler-plugin</artifactId>
<configuration> <configuration>
<source>1.8</source> <source>1.8</source>
@ -208,6 +209,7 @@
<plugin> <plugin>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId> <artifactId>spring-boot-maven-plugin</artifactId>
<version>2.0.0.RELEASE</version>
<configuration> <configuration>
<mainClass>com.bfd.mf.runstart.RunStartDataSave</mainClass> <mainClass>com.bfd.mf.runstart.RunStartDataSave</mainClass>
</configuration> </configuration>

13
cl_stream_datasave/src/main/java/com/bfd/mf/datasave/download/DownLoadFile.java

@ -32,7 +32,7 @@ public class DownLoadFile {
Thread.sleep(4000); Thread.sleep(4000);
realUrl = JSONObject.parseObject(result).getString("src"); realUrl = JSONObject.parseObject(result).getString("src");
realresult.put("realUrl",realUrl); realresult.put("realUrl",realUrl);
realresult.put("size",size);
realresult.put("size",String.format("%.2f", size));
} }
} catch (IOException e) { } catch (IOException e) {
@ -44,7 +44,7 @@ public class DownLoadFile {
return realresult; return realresult;
} }
public static String upload(String uploadUrl,String fileName,byte[] content) {
public static String upload(String uploadUrl,String fileName,byte[] content) throws IOException{
String result = ""; String result = "";
try { try {
OkHttpClient httpClient = new OkHttpClient(); OkHttpClient httpClient = new OkHttpClient();
@ -66,6 +66,7 @@ public class DownLoadFile {
result = body.string(); result = body.string();
} }
} }
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }
@ -76,13 +77,13 @@ public class DownLoadFile {
String realUrl = "";Integer size; String realUrl = "";Integer size;
String realresult=""; String realresult="";
try{ try{
InputStream murl = new URL(getUrl).openStream();
BufferedImage sourceImg = ImageIO.read(murl);
// InputStream murl = new URL(getUrl).openStream();
// BufferedImage sourceImg = ImageIO.read(murl);
int srcWidth = 0; // 源图宽度 int srcWidth = 0; // 源图宽度
int srcHeight = 0; // 源图高度 int srcHeight = 0; // 源图高度
try { try {
srcWidth = sourceImg .getWidth();
srcHeight = sourceImg .getHeight();
// srcWidth = sourceImg.getWidth();
//srcHeight = sourceImg.getHeight();
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }

22
cl_stream_datasave/src/main/java/com/bfd/mf/datasave/download/NewsDownload.java

@ -5,12 +5,11 @@ import com.bfd.crawler.utils.JsonUtils;
import java.io.IOException; import java.io.IOException;
import java.util.*; import java.util.*;
import static com.bfd.mf.datasave.download.DownLoadFile.imagesize;
public class NewsDownload { public class NewsDownload {
private static String myGoFastAddr = "http://172.18.1.113:8080/upload"; private static String myGoFastAddr = "http://172.18.1.113:8080/upload";
public static void downloadAndSaveimage(Map<String, Object> resultMap,List<Map<String,String>> imagePathSizevalue){
List<String> filePath= (List<String>) resultMap.get("filePath");
List<String> imagePath= (List<String>) resultMap.get("imagePath");
List<String> videoPath= (List<String>) resultMap.get("videoPath");
public static void downloadAndSaveimage(Map<String, Object> resultMap, List<Map<String, String>> imagePathSizevalue, List<String> imagePath){
String putUrl = myGoFastAddr; String putUrl = myGoFastAddr;
List<String> imagePathlist=new ArrayList<>(); List<String> imagePathlist=new ArrayList<>();
Iterator<String> it = imagePath.iterator(); Iterator<String> it = imagePath.iterator();
@ -58,13 +57,13 @@ public class NewsDownload {
resultMap.put("srcimagePath",picturl); resultMap.put("srcimagePath",picturl);
} }
} }
} }
public static void downloadAndSaveFile(Map<String, Object> resultMap,List<Map<String,String>> filePathSizevalueList){
List<String> filePath= (List<String>) resultMap.get("filePath");
List<String> imagePath= (List<String>) resultMap.get("imagePath");
List<String> videoPath= (List<String>) resultMap.get("videoPath");
public static void downloadAndSaveFile(Map<String, Object> resultMap, List<Map<String, String>> filePathSizevalueList,List<String> filePath){
String putUrl = myGoFastAddr; String putUrl = myGoFastAddr;
//List<Map<String,String>> filePathSizevalueList = new ArrayList<>(); //List<Map<String,String>> filePathSizevalueList = new ArrayList<>();
List<String> filePathlist=new ArrayList<>(); List<String> filePathlist=new ArrayList<>();
@ -111,12 +110,10 @@ public class NewsDownload {
else { else {
resultMap.put("ugc",0); resultMap.put("ugc",0);
} }
} }
public static void downloadAndSavevideo(Map<String, Object> resultMap,List<Map<String,String>> videoPathSizevalueList){
List<String> videoPath= (List<String>) resultMap.get("videoPath");
public static void downloadAndSavevideo(Map<String, Object> resultMap, List<Map<String, String>> videoPathSizevalueList, List<String> videoPath){
String putUrl = myGoFastAddr; String putUrl = myGoFastAddr;
// List<Map<String,String>> videoPathSizevalueList = new ArrayList<>(); // List<Map<String,String>> videoPathSizevalueList = new ArrayList<>();
String videoTime=resultMap.get("videoTime").toString(); String videoTime=resultMap.get("videoTime").toString();
@ -172,6 +169,7 @@ public class NewsDownload {
private static Map<String, Object> gofastswitch(Map<String, String> rerversemap , Map<String, Object> responseMap) {//原始的gofast 以及下载后的gofast地址 private static Map<String, Object> gofastswitch(Map<String, String> rerversemap , Map<String, Object> responseMap) {//原始的gofast 以及下载后的gofast地址
Integer pgc= (Integer) responseMap.get("pgc");//图片 Integer pgc= (Integer) responseMap.get("pgc");//图片
Integer egc= (Integer) responseMap.get("egc");//视频 Integer egc= (Integer) responseMap.get("egc");//视频

2
cl_stream_datasave/src/main/java/com/bfd/mf/datasave/kafka/ReadKafka.java

@ -33,8 +33,6 @@ public class ReadKafka implements IKafka{
// } // }
// //
public void read(){ public void read(){
System.out.println(this.queue+"队列"+this.defaultReadTopicName+"topic"+this.threadNums+"group"+this.groupId+"sercer"+this.kafkaServerName);
//KfkConsumer.startReadThread(this.queue, this.defaultReadTopicName,this.threadNums,this.groupId,this.kafkaServerName);
KfkConsumer.startReadThread(this.queue, this.defaultReadTopicName,this.threadNums, this.groupId,this.kafkaServerName); KfkConsumer.startReadThread(this.queue, this.defaultReadTopicName,this.threadNums, this.groupId,this.kafkaServerName);
} }
@Override @Override

179
cl_stream_datasave/src/main/java/com/bfd/mf/datasave/listen/DataSaveManager.java

@ -4,8 +4,10 @@ import com.alibaba.fastjson.JSONArray;
import com.bfd.crawler.elasti.ElastiProducer; import com.bfd.crawler.elasti.ElastiProducer;
import com.bfd.crawler.kafka7.KfkProducer; import com.bfd.crawler.kafka7.KfkProducer;
import com.bfd.crawler.utils.JsonUtils; import com.bfd.crawler.utils.JsonUtils;
import com.bfd.mf.datasave.download.DownLoadFile; import com.bfd.mf.datasave.download.DownLoadFile;
import com.bfd.mf.datasave.download.NewsDownload; import com.bfd.mf.datasave.download.NewsDownload;
import com.bfd.mf.datasave.download.Newscontent;
import com.bfd.mf.datasave.tools.DataCheckUtil; import com.bfd.mf.datasave.tools.DataCheckUtil;
import com.bfd.mf.datasave.tools.DateUtil; import com.bfd.mf.datasave.tools.DateUtil;
import com.bfd.mf.datasave.tools.ReadLine; import com.bfd.mf.datasave.tools.ReadLine;
@ -33,6 +35,7 @@ import static com.bfd.crawler.utils.DataUtil.calcMD5;
public class DataSaveManager implements Runnable{ public class DataSaveManager implements Runnable{
private static Logger log = Logger.getLogger(DataSaveManager.class); private static Logger log = Logger.getLogger(DataSaveManager.class);
private String data ; private String data ;
private FieldNormaliz fieldNormaliz ; private FieldNormaliz fieldNormaliz ;
private static Map<String, List<Map<String,String>>> subject; private static Map<String, List<Map<String,String>>> subject;
@ -75,7 +78,13 @@ public class DataSaveManager implements Runnable{
} }
timetMap.put("dsbeginreadtime",System.currentTimeMillis()); timetMap.put("dsbeginreadtime",System.currentTimeMillis());
Map<String,String> tableInfo = tableInfoMap.get(bussinessType) ; Map<String,String> tableInfo = tableInfoMap.get(bussinessType) ;
String res = convertData(jsonData, tableInfo);
String res = null;
try {
res = convertData(jsonData, tableInfo);
res =JsonUtils.toJSONString(jsonData);
} catch (Exception e) {
e.printStackTrace();
}
Map<String, Object> resultMap = getResponse(res); // resultMap 就是将要写入到 ES kafka 的一条数据 Map<String, Object> resultMap = getResponse(res); // resultMap 就是将要写入到 ES kafka 的一条数据
resultMap.remove("processtime"); resultMap.remove("processtime");
//Map<String, Object> resultindexMap = new HashMap<String, Object>(resultMap); //Map<String, Object> resultindexMap = new HashMap<String, Object>(resultMap);
@ -126,15 +135,25 @@ public class DataSaveManager implements Runnable{
timetMap.put("dbeginsentes",System.currentTimeMillis()); timetMap.put("dbeginsentes",System.currentTimeMillis());
//处理新闻的主贴 //处理新闻的主贴
//对于非新闻的以及非上传的数据
//对于非新闻的以及非上传的数据!resultMap.toString().contains("keyword:肖战")&&resultMap.toString().contains("keyword:罗翔")&&
// if (!resultMap.toString().contains("keyword:罗翔")){
if(resultMap.containsKey("crawlDataFlag")&&!resultMap.containsKey("subjectId")) { if(resultMap.containsKey("crawlDataFlag")&&!resultMap.containsKey("subjectId")) {
//resultMap.containsKey("isDownload") //resultMap.containsKey("isDownload")
String key = getAllMapKey(resultMap); String key = getAllMapKey(resultMap);
List<String> filePath= (List<String>) resultMap.get("filePath");
List<String> imagePath= (List<String>) resultMap.get("imagePath");
List<String> videoPath= (List<String>) resultMap.get("videoPath");
List<String> filePath= (List<String>)JsonUtils.parseArray(resultMap.get("filePath").toString());
List<String> imagePath= (List<String>)JsonUtils.parseArray(resultMap.get("imagePath").toString());
List<String> videoPath= (List<String>) JsonUtils.parseArray(resultMap.get("videoPath").toString());
List<Map<String, String>> imagePathSize = (List<Map<String, String>>) JsonUtils.parseArray(resultMap.get("imagePathSize").toString());
List<Map<String, Object>> filePathSize = (List<Map<String, Object>>)JsonUtils.parseArray(resultMap.get("filePathSize").toString());
List<Map<String, String>> videoPathSize = (List<Map<String, String>>)JsonUtils.parseArray(resultMap.get("videoPathSize").toString());
String avatarPath=resultMap.get("avatarPath").toString(); String avatarPath=resultMap.get("avatarPath").toString();
// subject 中可以获取到这个key 对应的 专题信息 // subject 中可以获取到这个key 对应的 专题信息
System.out.println(key);
if(disposeCrawldataflag(key)) { if(disposeCrawldataflag(key)) {
String getsubjectList=RedisUtil.get(key,10); String getsubjectList=RedisUtil.get(key,10);
List<Map<String, String>> subjectList = (List<Map<String, String>>) JsonUtils.parseArray(getsubjectList); List<Map<String, String>> subjectList = (List<Map<String, String>>) JsonUtils.parseArray(getsubjectList);
@ -143,7 +162,6 @@ public class DataSaveManager implements Runnable{
List<Map<String,String>> videoPathSizevalueList = new ArrayList<>(); List<Map<String,String>> videoPathSizevalueList = new ArrayList<>();
List<Map<String,String>> filePathSizevalueList = new ArrayList<>(); List<Map<String,String>> filePathSizevalueList = new ArrayList<>();
List<String> ocrText= (List<String>) resultMap.get("ocrText"); List<String> ocrText= (List<String>) resultMap.get("ocrText");
System.out.println(key+"=====");
long maxtime= Long.parseLong(subjectMap.get("maxtime")); long maxtime= Long.parseLong(subjectMap.get("maxtime"));
long mintme= Long.parseLong(subjectMap.get("mintime")); long mintme= Long.parseLong(subjectMap.get("mintime"));
long pubTimecomape= Long.parseLong(pubTime); long pubTimecomape= Long.parseLong(pubTime);
@ -157,7 +175,7 @@ public class DataSaveManager implements Runnable{
String asrText= (String) resultMap.get("asrText"); String asrText= (String) resultMap.get("asrText");
String hasTrans= resultMap.get("hasTrans").toString(); String hasTrans= resultMap.get("hasTrans").toString();
//String ocrText= (String) resultMap.get("ocrText"); //String ocrText= (String) resultMap.get("ocrText");
if((pubTimecomape-maxtime<=0&&pubTimecomape-mintme>=0)||"eccontent".equals(pageType)||"2".equals(primary)||"socailFollow".equals(pageType)){
if((pubTimecomape-maxtime<=0&&pubTimecomape-mintme>=0)||"eccontent".equals(pageType)||"2".equals(primary)||"socialFollow".equals(pageType)){
if("eccontent".equals(pageType)){ if("eccontent".equals(pageType)){
long pubtime=maxtime-1000*60*30; long pubtime=maxtime-1000*60*30;
//System.out.println(pubtime+"======="); //System.out.println(pubtime+"=======");
@ -347,21 +365,38 @@ public class DataSaveManager implements Runnable{
} }
timetMap.put("enddowloadtime",System.currentTimeMillis()); timetMap.put("enddowloadtime",System.currentTimeMillis());
}else{
//新闻主贴的处理逻辑newscontent,
// downloadPic,downloadFile,downloadVideo ,若有一个则需要进行isdown为true
// videoPath == egc
// filePath == ugc
// imagePath == pgc
if(crawl_content_key.contains("downloadPic")&&imagePath.size()>0){
NewsDownload.downloadAndSaveimage(resultMap,imagePathSizevalue);
}else {
// //新闻主贴的处理逻辑newscontent,
// // downloadPic,downloadFile,downloadVideo ,若有一个则需要进行isdown为true
//// videoPath == egc
//// filePath == ugc
//// imagePath == pgc
// if(crawl_content_key.contains("downloadPic")&&imagePath.size()>0){
// NewsDownload.downloadAndSaveimage(resultMap,imagePathSizevalue,);
// }
// if(crawl_content_key.contains("downloadFile")&&filePath.size()>0){
// NewsDownload.downloadAndSaveFile(resultMap,filePathSizevalueList);
// }
// if(crawl_content_key.contains("downloadVideo")&&videoPath.size()>0){
// NewsDownload.downloadAndSavevideo(resultMap,videoPathSizevalueList);
// }
//System.out.print("==========");
if(crawl_content_key.contains("downloadPic")&&imagePathSize.size()==0){
NewsDownload.downloadAndSaveimage(resultMap,imagePathSizevalue,imagePath);
}else if (crawl_content_key.contains("downloadPic")&&imagePathSize.size()>0) {
Newscontent.downloadAndSaveimage(resultMap,imagePathSize);
} }
if(crawl_content_key.contains("downloadFile")&&filePath.size()>0){
NewsDownload.downloadAndSaveFile(resultMap,filePathSizevalueList);
if(crawl_content_key.contains("downloadFile")&&filePathSize.size()==0){
NewsDownload.downloadAndSaveFile(resultMap,filePathSizevalueList,filePath);
}else if(crawl_content_key.contains("downloadFile")&&filePathSize.size()>0){
Newscontent.downloadAndSavefile(resultMap,filePathSize);
} }
if(crawl_content_key.contains("downloadVideo")&&videoPath.size()>0){
NewsDownload.downloadAndSavevideo(resultMap,videoPathSizevalueList);
if(crawl_content_key.contains("downloadVideo")&&videoPathSize.size()==0){
NewsDownload.downloadAndSavevideo(resultMap,videoPathSizevalueList,videoPath);
}else if(crawl_content_key.contains("downloadVideo")&&videoPathSize.size()>0){
Newscontent.downloadAndSavevideo(resultMap,videoPathSize);
} }
} }
if (filePathSizevalueList.size()==0&&imagePathSizevalue.size()==0&&videoPathSizevalueList.size()==0){ if (filePathSizevalueList.size()==0&&imagePathSizevalue.size()==0&&videoPathSizevalueList.size()==0){
resultMap.put("isDownload","false"); resultMap.put("isDownload","false");
@ -410,7 +445,7 @@ public class DataSaveManager implements Runnable{
if (url.contains("http")){ if (url.contains("http")){
revideoPathlist.add(url); revideoPathlist.add(url);
}else { }else {
url="http://172.18.1.113:8892"+url;
url="http://172.18.1.113:8080"+url;
revideoPathlist.add(url); revideoPathlist.add(url);
} }
} }
@ -419,9 +454,89 @@ public class DataSaveManager implements Runnable{
} }
} }
//创新院独有的
if("12331".equals(subject_id)){
List<String> revideoPath= (List<String>) resultMap.get("videoPath");
List<String> revideoPathlist=new ArrayList<>();
if (revideoPath.size()>0){
Iterator<String> it = revideoPath.iterator();
while(it.hasNext()) {
String url= it.next();
if (url.contains("http")){
revideoPathlist.add(url);
}else {
url="http://crawl-files.pontoaplus.com"+url;
revideoPathlist.add(url);
}
}
resultMap.put("videoPath",revideoPathlist);
//writerToKafka(5, "xhs1223", resultMap);
}
List<String> reimagePath= (List<String>) resultMap.get("imagePath");
List<String> reimagePathlist=new ArrayList<>();
if (reimagePath.size()>0){
Iterator<String> it = reimagePath.iterator();
while(it.hasNext()) {
String url= it.next();
if (url.contains("http")){
reimagePathlist.add(url);
}else {
url="http://crawl-files.pontoaplus.com"+url;
reimagePathlist.add(url);
}
}
resultMap.put("imagePath",reimagePathlist);
//writerToKafka(5, "xhs1223", resultMap);
}
List<String> refilePath= (List<String>) resultMap.get("filePath");
List<String> refilePathlist=new ArrayList<>();
if (refilePath.size()>0){
Iterator<String> it = refilePath.iterator();
while(it.hasNext()) {
String url= it.next();
if (url.contains("http")){
refilePathlist.add(url);
}else {
url="http://crawl-files.pontoaplus.com"+url;
refilePathlist.add(url);
}
}
resultMap.put("filePath",refilePathlist);
//writerToKafka(5, "xhs1223", resultMap);
}
if (!avatarPath.equals("")){
String resulturl= null;
try {
if (avatarPath.contains("http")){
resulturl=avatarPath;
}else {
resulturl="http://crawl-files.pontoaplus.com"+avatarPath;
}
//Map<String,Object> resultmap = DownLoadFile.downloadAndSaveFile(avatarPath, putUrl);
//resulturl = avatarPath.replace("172.18.1.113:8080","crawl-files.pontoaplus.com");;
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }
if(resulturl!= null && resulturl.length()!= 0){
resultMap.put("avatarPath", resulturl);
}
else{
resultMap.put("avatarPath", avatarPath);
}
}
writerToKafka(2, "dataFromES_10000", resultMap);
}
} catch (Throwable e) {
e.printStackTrace();
}
try { try {
writerToKafka(5, kafkaTopic, resultMap); writerToKafka(5, kafkaTopic, resultMap);
} catch (Exception e) { } catch (Exception e) {
@ -456,6 +571,7 @@ public class DataSaveManager implements Runnable{
else { else {
System.out.println(" 这条数据都没有标识位,就不往专题的索引存储了呗!!!!" + resultMap.get("dataId")); System.out.println(" 这条数据都没有标识位,就不往专题的索引存储了呗!!!!" + resultMap.get("dataId"));
} }
// }
timetMap.put("dendsentes",System.currentTimeMillis()); timetMap.put("dendsentes",System.currentTimeMillis());
resultMap.put("processtime",timetMap); resultMap.put("processtime",timetMap);
try { try {
@ -487,7 +603,7 @@ public class DataSaveManager implements Runnable{
responseMap.put("createTimeStr", DataCheckUtil.getCurrentTime(dateTime)); responseMap.put("createTimeStr", DataCheckUtil.getCurrentTime(dateTime));
System.out.println("==========================写入到【专题】ES :==========" + indexName + " - "+responseMap.get("docId") ); System.out.println("==========================写入到【专题】ES :==========" + indexName + " - "+responseMap.get("docId") );
if (null != docId && !("").equals(docId)) { if (null != docId && !("").equals(docId)) {
WriteMethod.writeMethod("20210621.txt",JsonUtils.toJSONString(responseMap));
// WriteMethod.writeMethod("20210621aaa.txt",JsonUtils.toJSONString(responseMap));
ElastiProducer elastiProducer = ElastiProducer.getInstance(bussinessType, subjectEsNum, indexName, indexType); ElastiProducer elastiProducer = ElastiProducer.getInstance(bussinessType, subjectEsNum, indexName, indexType);
elastiProducer.sendMessageToEs(JsonUtils.toJSONString(responseMap)); elastiProducer.sendMessageToEs(JsonUtils.toJSONString(responseMap));
} }
@ -500,7 +616,8 @@ public class DataSaveManager implements Runnable{
String docId=responseMap.get("docId").toString(); String docId=responseMap.get("docId").toString();
System.out.println("==========================写入到【日期】ES : ==========" + indexName + " - "+responseMap.get("docId")); System.out.println("==========================写入到【日期】ES : ==========" + indexName + " - "+responseMap.get("docId"));
if (null != docId && !("").equals(docId)) { if (null != docId && !("").equals(docId)) {
//WriteMethod.writeMethod("2021525like.txt",JsonUtils.toJSONString(responseMap));
log.info(JsonUtils.toJSONString(responseMap));
// WriteMethod.writeMethod("2021525likeaaa.txt",JsonUtils.toJSONString(responseMap));
ElastiProducer elastiProducer = ElastiProducer.getInstance(bussinessType, indexEsNum, indexName, indexType); ElastiProducer elastiProducer = ElastiProducer.getInstance(bussinessType, indexEsNum, indexName, indexType);
elastiProducer.sendMessageToEs(JsonUtils.toJSONString(responseMap)); elastiProducer.sendMessageToEs(JsonUtils.toJSONString(responseMap));
} }
@ -590,8 +707,6 @@ public class DataSaveManager implements Runnable{
List<String> imagePath= (List<String>) responseMap.get("imagePath"); List<String> imagePath= (List<String>) responseMap.get("imagePath");
List<String> videoPath= (List<String>) responseMap.get("videoPath"); List<String> videoPath= (List<String>) responseMap.get("videoPath");
String storyDetailPage= (String) responseMap.get("pageType"); String storyDetailPage= (String) responseMap.get("pageType");
// pageType
// storyDetailPage
Map<String,Object> resultmap=new HashMap<>(); Map<String,Object> resultmap=new HashMap<>();
if (pgc.equals(1)){ if (pgc.equals(1)){
try { try {
@ -623,7 +738,6 @@ public class DataSaveManager implements Runnable{
resultmap.put("srcimagePath",pictureList); resultmap.put("srcimagePath",pictureList);
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
//log.error();
} }
} if(ugc.equals(1)){ } if(ugc.equals(1)){
if(responseMap.get("forwardUrl")!=""&&!"storyDetailPage".equals(storyDetailPage)&&!"socialComment".equals(storyDetailPage)){ if(responseMap.get("forwardUrl")!=""&&!"storyDetailPage".equals(storyDetailPage)&&!"socialComment".equals(storyDetailPage)){
@ -872,17 +986,6 @@ public class DataSaveManager implements Runnable{
String nomorprice = dataValue.toString().replaceAll("¥", "").replace("$","") ; String nomorprice = dataValue.toString().replaceAll("¥", "").replace("$","") ;
jsonData.put(key, nomorprice); jsonData.put(key, nomorprice);
} }
// if(key.equals("imagePath")&&dataValue != null){
// List<String> list=new ArrayList<>();
// list.add(dataValue.toString());
// jsonData.put(key,list);
// }
// if(key.equals("filePath") && dataValue != null){
// List<String> list=new ArrayList<>();
// list.add(dataValue.toString());
// jsonData.put(key,list);
// }
if(tableInfo.containsKey(key)){ if(tableInfo.containsKey(key)){
// System.out.print("tableInfo"+tableInfo); // System.out.print("tableInfo"+tableInfo);
String value = tableInfo.get(key); String value = tableInfo.get(key);
@ -929,11 +1032,11 @@ public class DataSaveManager implements Runnable{
} }
} catch (Exception e) { } catch (Exception e) {
//e.printStackTrace(); //e.printStackTrace();
String str=dataValue.toString().replace(", ",",");
String str=dataValue.toString().replace(", ",",").replace(",",",");
str = str.substring(1,str.length()-1).trim(); str = str.substring(1,str.length()-1).trim();
String []strs =str.split(","); String []strs =str.split(",");
// System.out.println(strs.length+"数组的长度啊"); // System.out.println(strs.length+"数组的长度啊");
System.out.println(str+"数组的长度啊");
//System.out.println(str+"数组的长度啊");
List<String> list = Arrays.asList(strs); List<String> list = Arrays.asList(strs);
jsonData.put(key, list) ; jsonData.put(key, list) ;
} }

9
cl_stream_datasave/src/main/java/com/bfd/mf/datasave/listen/ListenKafkaManager.java

@ -1,5 +1,4 @@
package com.bfd.mf.datasave.listen; package com.bfd.mf.datasave.listen;
import com.bfd.crawler.kafka7.KfkConsumer;
import com.bfd.mf.datasave.kafka.ReadKafka; import com.bfd.mf.datasave.kafka.ReadKafka;
import com.bfd.mf.datasave.tools.DateUtil; import com.bfd.mf.datasave.tools.DateUtil;
import com.bfd.mf.entity.FieldNormaliz; import com.bfd.mf.entity.FieldNormaliz;
@ -16,6 +15,7 @@ import java.util.concurrent.TimeUnit;
public class ListenKafkaManager implements Runnable{ public class ListenKafkaManager implements Runnable{
private LinkedBlockingDeque<String> queue= new LinkedBlockingDeque<String>(5000); private LinkedBlockingDeque<String> queue= new LinkedBlockingDeque<String>(5000);
private LinkedBlockingDeque<String> fastqueue= new LinkedBlockingDeque<String>(5000);
private boolean isRun = true; private boolean isRun = true;
private FieldNormaliz fieldNormaliz; private FieldNormaliz fieldNormaliz;
private ThreadPoolExecutor spiderPoolExec ; private ThreadPoolExecutor spiderPoolExec ;
@ -24,15 +24,14 @@ public class ListenKafkaManager implements Runnable{
public ListenKafkaManager(FieldNormaliz fieldNormaliz){ public ListenKafkaManager(FieldNormaliz fieldNormaliz){
int croePoolsize = 20 ; int croePoolsize = 20 ;
int maximumPoolsize = 80;
int maximumPoolsize = 30;
long keepAliveTime = 0; long keepAliveTime = 0;
this.spiderPoolExec = new ThreadPoolExecutor(croePoolsize, maximumPoolsize, keepAliveTime, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); this.spiderPoolExec = new ThreadPoolExecutor(croePoolsize, maximumPoolsize, keepAliveTime, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
this.fieldNormaliz = fieldNormaliz ; this.fieldNormaliz = fieldNormaliz ;
this.subject = SubjectTask.subjectTaskMap; this.subject = SubjectTask.subjectTaskMap;
this.tableInfoMap = FiledTableInfo.tableInfoMap; this.tableInfoMap = FiledTableInfo.tableInfoMap;
String kafkaname = fieldNormaliz.getKafkaName() ; String kafkaname = fieldNormaliz.getKafkaName() ;
// KfkConsumer.startReadThread(queue,"Ejingdongdedup_filter1",10,"333",2);
ReadKafka readKafka = new ReadKafka(queue , kafkaname ,12 , fieldNormaliz.getGroupId(), fieldNormaliz.getKafkaSerName(),fieldNormaliz.getEsSerName());
ReadKafka readKafka = new ReadKafka(queue, kafkaname ,12 , fieldNormaliz.getGroupId(), fieldNormaliz.getKafkaSerName(),fieldNormaliz.getEsSerName());
readKafka.read(); readKafka.read();
} }
@ -53,8 +52,6 @@ public class ListenKafkaManager implements Runnable{
private void addTask(String data){ private void addTask(String data){
while ( spiderPoolExec.getPoolSize() >= spiderPoolExec.getMaximumPoolSize() || while ( spiderPoolExec.getPoolSize() >= spiderPoolExec.getMaximumPoolSize() ||
spiderPoolExec.getActiveCount() >= spiderPoolExec.getMaximumPoolSize()) { spiderPoolExec.getActiveCount() >= spiderPoolExec.getMaximumPoolSize()) {
//System.out.println("线程满了啊"+spiderPoolExec.getPoolSize()+"最大线程数"+spiderPoolExec.getMaximumPoolSize()+"现有的线程数"+spiderPoolExec.getActiveCount());
// System.out.println("线程满了啊");
try { try {
Thread.sleep(200); Thread.sleep(200);
} catch (InterruptedException e) { } catch (InterruptedException e) {

11
cl_stream_datasave/src/main/java/com/bfd/mf/datasave/listen/ListenTaskManager.java

@ -22,7 +22,18 @@ public class ListenTaskManager {
ListenKafkaManager listenKafkaManager = new ListenKafkaManager(fieldNormaliz); ListenKafkaManager listenKafkaManager = new ListenKafkaManager(fieldNormaliz);
new Thread(listenKafkaManager).start(); new Thread(listenKafkaManager).start();
listenKafkaManagers.put(kafkaServerName+"#"+kafkaTopicName, listenKafkaManager); listenKafkaManagers.put(kafkaServerName+"#"+kafkaTopicName, listenKafkaManager);
//快速读取数据的topic
ListenfastTaskManager ListenfastTaskManager = new ListenfastTaskManager(fieldNormaliz);
new Thread(ListenfastTaskManager).start();
} }
} }

13
cl_stream_datasave/src/main/java/com/bfd/mf/datasave/tools/DataProcess.java

@ -1,8 +1,10 @@
package com.bfd.mf.datasave.tools; package com.bfd.mf.datasave.tools;
import crawler.open.util.RedisUtil; import crawler.open.util.RedisUtil;
import org.apache.log4j.Logger;
public class DataProcess implements Runnable { public class DataProcess implements Runnable {
private static Logger log = Logger.getLogger(DataProcess.class);
@Override @Override
public void run() { public void run() {
while (true) { while (true) {
@ -11,11 +13,12 @@ public class DataProcess implements Runnable {
String a = Constants.getLineQueue().take(); String a = Constants.getLineQueue().take();
String key=a.split("@#@")[0]; String key=a.split("@#@")[0];
String value=a.split("@#@")[1]; String value=a.split("@#@")[1];
RedisUtil.set(key, value, 10);
// if(Constants.getLineQueue().size() == 1000){
// Constants.getLineQueue().clear();
// }
System.out.println(Constants.getLineQueue().size()+"队列的大小");
RedisUtil.set(key, value, 9);
System.out.println(Constants.getLineQueue().size()+"队列的大小"+key);
log.info(Constants.getLineQueue().size()+"队列的大小"+key);
// WriteMethod.writeMethod("1.txt", a);
log.info("插入redis的key" + key + " ; data = " +Constants.getLineQueue().size()+"队列的大小");
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
} }

3
cl_stream_datasave/src/main/java/com/bfd/mf/entity/AllKeys.java

@ -170,7 +170,8 @@ public class AllKeys {
map.put("poorrate",0); map.put("poorrate",0);
map.put("processtime",new HashMap<>()); map.put("processtime",new HashMap<>());
map.put("tag",""); map.put("tag","");
map.put("mentionAccount",new ArrayList<>());
map.put("mentionAccountUrl",new ArrayList<>());
} }

267
cl_stream_datasave/src/main/java/com/bfd/mf/entity/mysql/SubjectTask.java

@ -3,16 +3,14 @@ package com.bfd.mf.entity.mysql;
//import com.bfd.crawler.utils.JsonUtils; //import com.bfd.crawler.utils.JsonUtils;
import com.bfd.crawler.utils.JsonUtils; import com.bfd.crawler.utils.JsonUtils;
import com.bfd.mf.datasave.listen.DataSaveManager;
import com.bfd.mf.datasave.tools.Constants; import com.bfd.mf.datasave.tools.Constants;
import com.bfd.mf.datasave.tools.DBUtil; import com.bfd.mf.datasave.tools.DBUtil;
import com.bfd.mf.datasave.tools.DateUtil; import com.bfd.mf.datasave.tools.DateUtil;
import com.bfd.mf.datasave.tools.WriteMethod;
import crawler.open.util.RedisUtil; import crawler.open.util.RedisUtil;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import java.text.SimpleDateFormat;
import java.util.*; import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import static com.bfd.mf.entity.mysql.Tasklimit.subjectTasktimelimiit; import static com.bfd.mf.entity.mysql.Tasklimit.subjectTasktimelimiit;
import static com.bfd.mf.entity.mysql.Userlimit.subjectuserlimiit; import static com.bfd.mf.entity.mysql.Userlimit.subjectuserlimiit;
@ -186,20 +184,33 @@ public class SubjectTask implements Runnable {
@Override @Override
public void run() { public void run() {
while (true){
subjectTaskMap.clear();
while (true) try {
{
// subjectTaskMap.clear();
Userlimit.loaduser(); Userlimit.loaduser();
Tasklimit.loadTask(); Tasklimit.loadTask();
long updatetime = new Date().getTime()/1000-30000;
// List<Map<String, Object>> subjectTaskList = DBUtil.getInstance("db_stat").query("select cs.del,ct.external_id, ct.subject_id, ct.id, ct.cid, ct.crawl_data_flag,cs.kafka_switch,cs.kafka_addr,cs.go_fast_addr,cs.kafka_topic,cs.go_fast_switch from cl_subject cs Join cl_task ct on(ct.subject_id=cs.id)where (ct.crawl_status=1 or ct.crawl_status=3) and ct.del=0 ;");ct.app_id=cs.app_id and
//String time=DateUtil.getDate();
//System.out.println(time);
System.out.println("结束时间"+ updatetime);
List<Map<String, Object>> subjectTaskList = DBUtil.getInstance("db_stat_alltask").query("select ct.crawl_content_key,ct.create_user_id,ct.app_id,cs.del,ct.external_id, ct.subject_id, ct.id, ct.cid, ct.crawl_data_flag,cs.kafka_switch,cs.kafka_addr,cs.go_fast_addr,cs.kafka_topic,cs.go_fast_switch from cl_subject cs Join cl_task ct on(ct.subject_id=cs.id) where (ct.crawl_status=1 ) and ct.del=0 and ct.app_id=cs.app_id and ct.cid!=\"\" and unix_timestamp(ct.update_time)>'"+updatetime+"' and ct.crawl_data_flag like '%气象侦察机%'order by ct.update_time desc;");
System.out.println(subjectTaskList.size());
if(subjectTaskList.size() > 0){
long updatetime = new Date().getTime()/1000-15;
SimpleDateFormat datetime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String now=datetime.format(Calendar.getInstance().getTime());
System.out.println(updatetime+now);
long starttime=new Date().getTime()/1000-20L;
if (disposeCrawldataflag("Tasktimelimiit")){
starttime= Long.parseLong(RedisUtil.get("Tasktimelimiit",9));
starttime=starttime-5;
RedisUtil.set("Tasktimelimiit", String.valueOf(updatetime), 9);
}else {
System.out.println("第一次写入");
RedisUtil.set("Tasktimelimiit", String.valueOf(updatetime), 9);
}
System.out.println(System.currentTimeMillis()+"开始mysql执行的时间");
List<Map<String, Object>> subjectTaskendList = DBUtil.getInstance("db_stat_alltask").query("select ct.crawl_content_key,ct.create_user_id,ct.app_id,cs.del,ct.crawl_start_time,ct.crawl_end_time,ct.external_id, ct.subject_id, ct.id, ct.cid, ct.crawl_data_flag,cs.kafka_switch,cs.kafka_addr,cs.go_fast_addr,cs.kafka_topic,cs.go_fast_switch from cl_subject cs Join cl_task ct on(ct.subject_id=cs.id) where (ct.crawl_status=1 ) and ct.del=0 and ct.cid!=\"\" and ct.app_id!='bw01' and unix_timestamp(ct.start_time)>'"+starttime+"';");
// List<Map<String, Object>> subjectTaskendList = DBUtil.getInstance("db_stat_alltask").query("select ct.crawl_content_key,ct.create_user_id,ct.app_id,cs.del,ct.external_id, ct.subject_id, ct.id, ct.cid, ct.crawl_data_flag,cs.kafka_switch,cs.kafka_addr,cs.go_fast_addr,cs.kafka_topic,cs.go_fast_switch from cl_subject cs Join cl_task ct on(ct.subject_id=cs.id) where (ct.crawl_status=1 ) and ct.del=0 and ct.cid!=\"\" and crawl_data_flag like '%http://xhslink.com/RLmh8p%';");
System.out.println(System.currentTimeMillis()+"执行mysql任务结束的时间");
if(subjectTaskendList.size()>0){
System.out.println(subjectTaskendList.size()+"任务的大小");
for(Map<String, Object> subjectTask : subjectTaskendList){
String key = ""; String key = "";
for(Map<String, Object> subjectTask : subjectTaskList){ //{subject_id=10222, name=我是张三, task_id=188, id=71, crawl_data_flag=aaa}
String keytwo = ""; String keytwo = "";
if( subjectTask.get("cid").equals("Tmall")){ if( subjectTask.get("cid").equals("Tmall")){
key = subjectTask.get("cid") + "#####" + subjectTask.get("crawl_data_flag"); key = subjectTask.get("cid") + "#####" + subjectTask.get("crawl_data_flag");
@ -228,6 +239,7 @@ public class SubjectTask implements Runnable {
String v_ocr="0"; String v_ocr="0";
String v_trans="0"; String v_trans="0";
String v_crawl_content_key=""; String v_crawl_content_key="";
String appid="";
if(null != subjectTask.get("subject_id")) { if(null != subjectTask.get("subject_id")) {
v_subject_id = subjectTask.get("subject_id").toString(); v_subject_id = subjectTask.get("subject_id").toString();
} }
@ -255,15 +267,17 @@ public class SubjectTask implements Runnable {
if(null !=subjectTask.get("kafka_topic")){ if(null !=subjectTask.get("kafka_topic")){
v_kafka_topic = subjectTask.get("kafka_topic").toString(); v_kafka_topic = subjectTask.get("kafka_topic").toString();
} }
// if(null !=subjectTask.get("status")){
// v_status = subjectTask.get("status").toString();
// }
if(null !=subjectTask.get("del")){ if(null !=subjectTask.get("del")){
v_del = subjectTask.get("del").toString(); v_del = subjectTask.get("del").toString();
} }
if(null !=subjectTask.get("create_user_id")){ if(null !=subjectTask.get("create_user_id")){
v_create_user_id = subjectTask.get("create_user_id").toString(); v_create_user_id = subjectTask.get("create_user_id").toString();
} }
if(null !=subjectTask.get("app_id")){
appid = subjectTask.get("app_id").toString();
}
value.put("appid", appid);
value.put("subject_id",v_subject_id); value.put("subject_id",v_subject_id);
value.put("go_fast_addr",v_go_fast_addr); value.put("go_fast_addr",v_go_fast_addr);
value.put("export_to_kafka",v_kafka_switch); value.put("export_to_kafka",v_kafka_switch);
@ -272,34 +286,39 @@ public class SubjectTask implements Runnable {
value.put("external_id",v_external_id); value.put("external_id",v_external_id);
value.put("go_fast_switch",v_go_fast_switch); value.put("go_fast_switch",v_go_fast_switch);
value.put("kafka_topic",v_kafka_topic); value.put("kafka_topic",v_kafka_topic);
// value.put("status",v_status);//专题的状态
value.put("del",v_del);//专题的状态 value.put("del",v_del);//专题的状态
value.put("appid",subjectTask.get("app_id").toString());
value.put("crawl_content_key",v_crawl_content_key); value.put("crawl_content_key",v_crawl_content_key);
//System.out.print(v_external_id+"external_id");
value.put("maxtime",subjectTask.get("crawl_end_time").toString());
value.put("mintime",subjectTask.get("crawl_start_time").toString());
String newkey = key.toLowerCase(); String newkey = key.toLowerCase();
String userkey=newkey+"#####"+subjectTask.get("app_id").toString().toLowerCase();
String userkey=newkey;
//组装时间的参数 //组装时间的参数
if (subjectTasktimelimiit.containsKey(userkey)){
List<Map<String,String>>timelist=subjectTasktimelimiit.get(userkey);
if(timelist.size()==1){
for(Map<String, String> subjectTasktime : timelist){
value.put("maxtime",subjectTasktime.get("max_time").toString());
value.put("mintime",subjectTasktime.get("min_time").toString());
}
} else{
for(Map<String, String> subjectTasktime : timelist){
String subject_id=subjectTasktime.get("subject_id").toString();
if (v_subject_id.equals(subject_id)){
value.put("maxtime",subjectTasktime.get("max_time").toString());
value.put("mintime",subjectTasktime.get("min_time").toString());
}
}
}
try {
// if (!subjectTasktimelimiit.containsKey(userkey)) { //如果无最大和最小时间
// Tasklimit.loadTask();
// }
// List<Map<String,String>>timelist=subjectTasktimelimiit.get(userkey);
// if(timelist.size()==1){
// for(Map<String, String> subjectTasktime : timelist){
// value.put("maxtime",subjectTasktime.get("max_time").toString());
// value.put("mintime",subjectTasktime.get("min_time").toString());
// }
// } else{
// for(Map<String, String> subjectTasktime : timelist){
// String subject_id=subjectTasktime.get("subject_id").toString();
// if (v_subject_id.equals(subject_id)){
// value.put("maxtime",subjectTasktime.get("max_time").toString());
// value.put("mintime",subjectTasktime.get("min_time").toString());
// }
// }
// }
} catch (Exception e) {
System.out.print("获取时间失败了"+subjectTask.get("crawl_data_flag"));
value.put("maxtime", "1735660800000");
value.put("mintime","0");
e.printStackTrace();
} }
// //用户的权限
// //用户的权限
if (subjectuserlimiit.containsKey(v_create_user_id)){ if (subjectuserlimiit.containsKey(v_create_user_id)){
Map<String,Object> permission= (Map<String, Object>) subjectuserlimiit.get(v_create_user_id); Map<String,Object> permission= (Map<String, Object>) subjectuserlimiit.get(v_create_user_id);
v_ocr= permission.get("is_ocr").toString(); v_ocr= permission.get("is_ocr").toString();
@ -307,50 +326,166 @@ public class SubjectTask implements Runnable {
} }
value.put("is_ocr",v_ocr); value.put("is_ocr",v_ocr);
value.put("is_trans",v_trans); value.put("is_trans",v_trans);
//组装相同任务的任务id
if(subjectTaskMap.containsKey(newkey)){
valueList = subjectTaskMap.get(newkey);
for (Map<String, String> valuetask : valueList){
String task=valuetask.get("task_id")+","+v_task_id;
valuetask.put("task_id",task);
value.put("task_id",task);
}
valueList.add(value);
}else{
value.put("task_id",v_task_id); value.put("task_id",v_task_id);
valueList.add(value); valueList.add(value);
Set keysSet = new HashSet();
// if(subjectTaskMap.containsKey(newkey)){
// valueList = subjectTaskMap.get(newkey);
// for (Map<String, String> valuetask : valueList){
// String task=valuetask.get("task_id")+","+v_task_id;
// valuetask.put("task_id",task);
// value.put("task_id",task);
// }
// valueList.add(value);
// }else{
// value.put("task_id",v_task_id);
// valueList.add(value);
// }
if(disposeCrawldataflag(newkey)) { //判断redis中的key
String getsubjectList=RedisUtil.get(newkey,9);
try {
List<Map<String, String>> subjectList = (List<Map<String, String>>) JsonUtils.parseArray(getsubjectList);
List<Map<String, String>> redistList=new ArrayList<>();
for(Map<String, String> redissubjectTask : subjectList){
String resubject_id=redissubjectTask.get("subject_id").toString();
String reappid=redissubjectTask.get("appid").toString();
String taskid=redissubjectTask.get("task_id").toString();
String maxtime="";
if(redissubjectTask.containsKey("maxtime")){
maxtime=redissubjectTask.get("maxtime").toString();
}
if(!resubject_id.equals(v_subject_id)||!reappid.equals(appid)||!v_task_id.equals(taskid)){
redistList.add(redissubjectTask);;
} }
if(keytwo.length()>0){
String tmallnewkey = keytwo.toLowerCase();
subjectTaskMap.put(tmallnewkey,valueList);
} }
valueList.addAll(redistList);
String redis=newkey+"@#@"+JsonUtils.toJSONString(valueList); String redis=newkey+"@#@"+JsonUtils.toJSONString(valueList);
try { try {
System.out.println("写入redis是"+redis);
Constants.getLineQueue().put(redis); Constants.getLineQueue().put(redis);
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
} }
//天猫
if(keytwo.length()>0){
String tmallnewkey = keytwo.toLowerCase();
String redis2=tmallnewkey+"@#@"+JsonUtils.toJSONString(valueList);
try {
Constants.getLineQueue().put(redis2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// RedisUtil.set(newkey, JsonUtils.toJSONString(valueList), 10);
// System.out.println("结束时间"+ DateUtil.getcurr());
subjectTaskMap.put(newkey,valueList);
//System.out.println(newkey);
}
System.out.println("结束时间"+ DateUtil.getcurr());
//System.out.println(subjectTaskMap.size());
log.info("当天任务的数量" + key + " ; data = " + subjectTaskMap.size());
//SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//设置日期格式
//System.out.println(subjectTaskList.size());// new Date()为获取当前系统时间
//WriteMethod.writeMethod("0621test.txt",JsonUtils.toJSONString(subjectTaskMap));
//System.out.println(JsonUtils.toJSONString(subjectTaskMap)+"当前时间"+ DateUtil.getcurr());
} catch (Exception e) {
log.error("获取redis中的任务失败"+getsubjectList);
}
}else {
String redis=newkey+"@#@"+JsonUtils.toJSONString(valueList);
System.out.println(redis+"不存在");
try {
Constants.getLineQueue().put(redis);
} catch (InterruptedException e) {
e.printStackTrace();
} }
if(keytwo.length()>0){
String tmallnewkey = keytwo.toLowerCase();
String redis2=tmallnewkey+"@#@"+JsonUtils.toJSONString(valueList);
try { try {
Thread.sleep(3000);
Constants.getLineQueue().put(redis2);
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
} }
}
}
// subjectTaskMap.put(newkey,valueList);
// //System.out.print(valueList);
//
// //System.out.print(JsonUtils.toJSONString(subjectTaskMap));
//
// Set keysSets = new HashSet();
// for (String newkeya :subjectTaskMap.keySet()){
// List<Map<String, String>> newkeyvalue=subjectTaskMap.get(newkeya);
// List<Map<String, String>> setkeyvalue=new ArrayList<>();
// for(Map<String, String> redissubjectTask : newkeyvalue){
// String resubject_id=redissubjectTask.get("subject_id").toString();
// String reappid=redissubjectTask.get("appid").toString();
// String taskid=redissubjectTask.get("task_id").toString();
// String keys=resubject_id+reappid+newkeya+taskid;
// RedisUtil.set(keys, "1", 10);
//// int beforeSize = keysSets.size();
//// keysSets.add(keys);
//// int afterSize = keysSets.size();
//// if(afterSize == beforeSize + 1){
//// setkeyvalue.add(redissubjectTask);
//// }
// setkeyvalue.add(redissubjectTask);
// }
// String redis=newkeya+"@#@"+JsonUtils.toJSONString(setkeyvalue);
// System.out.println(redis);
// try {
// Constants.getLineQueue().put(redis);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// }
System.out.println(System.currentTimeMillis()+"任务组装结束的时间");
log.info("结束时间"+ DateUtil.getcurr());
log.info("当天任务的数量" + key + " ; data = " + subjectTaskendList.size());
// }else {
// String delkey = subjectendtTask.get("cid") + "#####" + subjectendtTask.get("crawl_data_flag");
// log.info(delkey.toLowerCase()+"====del");
// RedisUtil.del(delkey.toLowerCase(), 10);
// }
}
}else {
Thread.sleep(1000*5);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
 * Reports whether the given key exists in Redis database 9 and holds a non-empty value.
 *
 * <p>A missing key is logged at error level and reported as {@code false}. Any exception
 * raised while talking to Redis is logged with its stack trace and also reported as
 * {@code false}, so callers never see an exception from this check.
 *
 * @param crawldataflag the Redis key to look up
 * @return {@code true} only when the key exists and its value is neither {@code null} nor empty
 */
private boolean disposeCrawldataflag(String crawldataflag) {
    // Single source of truth for the database index, so the log line below cannot drift
    // from the index that is actually queried (it previously reported 10 while using 9).
    final int dbIndex = 9;
    try {
        if (!RedisUtil.exists(crawldataflag, dbIndex)) {
            log.error("[datasave] exec >>> 灌数:该 crwaldataflag 在 Redis 中不存在!!! keys = " + crawldataflag + " ; dbindex = " + dbIndex);
            return false;
        }
        String value = RedisUtil.get(crawldataflag, dbIndex);
        // An existing key with a null or empty payload is treated the same as "no usable value".
        return null != value && !("").equals(value);
    } catch (Exception e) {
        // Route the failure through the class logger (with the cause) instead of stderr.
        log.error("[datasave] disposeCrawldataflag failed, keys = " + crawldataflag + " ; dbindex = " + dbIndex, e);
        return false;
    }
}
/**
 * Scratch entry point that exercises the "is this a different task entry" predicate
 * with fixed sample values: prints {@code 11} when at least one of the
 * subject id, app id or task id pairs differs.
 *
 * @param args ignored
 */
public static void main(String[] args) {
    final String cachedSubjectId = "1";
    final String currentSubjectId = "1";
    final String cachedAppId = "12";
    final String currentAppId = "12";
    final String currentTaskId = "12";
    final String cachedTaskId = "13";

    boolean sameSubject = cachedSubjectId.equals(currentSubjectId);
    boolean sameApp = cachedAppId.equals(currentAppId);
    boolean sameTask = currentTaskId.equals(cachedTaskId);

    // Nothing to report when all three identifiers match.
    if (sameSubject && sameApp && sameTask) {
        return;
    }
    System.out.println("11");
}
} }
} }

71
cl_stream_datasave/src/main/java/com/bfd/mf/entity/mysql/Tasklimit.java

@ -2,6 +2,7 @@ package com.bfd.mf.entity.mysql;
import com.bfd.crawler.utils.JsonUtils; import com.bfd.crawler.utils.JsonUtils;
import com.bfd.mf.datasave.tools.DBUtil; import com.bfd.mf.datasave.tools.DBUtil;
import com.bfd.mf.datasave.tools.WriteMethod;
import javax.xml.bind.util.JAXBSource; import javax.xml.bind.util.JAXBSource;
import java.util.ArrayList; import java.util.ArrayList;
@ -15,8 +16,10 @@ public class Tasklimit {
public static Map<String, List<Map<String,String>>>subjectTasktimelimiit = new HashMap<>(); public static Map<String, List<Map<String,String>>>subjectTasktimelimiit = new HashMap<>();
public static void loadTask(){ public static void loadTask(){
subjectTasktimelimiit.clear(); subjectTasktimelimiit.clear();
List<Map<String, Object>> Tasktimelimiit = DBUtil.getInstance("db_stat_alltask").query("SELECT MIN(crawl_start_time) crawl_start_time ,MAX(crawl_end_time) crawl_end_time ,crawl_data_flag ,subject_id ,cid ,app_id from cl_task where del=0 and (crawl_status=1) and cid!=\"\" GROUP BY crawl_data_flag,cid,subject_id,app_id;");
System.out.println(Tasktimelimiit.size()+"Tasktimelimiit");
List<Map<String, Object>> Tasktimelimiit = DBUtil.getInstance("db_stat").query("SELECT MIN(crawl_start_time) crawl_start_time ,MAX(crawl_end_time) crawl_end_time ,crawl_data_flag ,subject_id ,cid ,app_id from cl_task where cid!='MUZX2WNM84L3ARXNSYN3'and del=0 and (crawl_status=1) and cid!=\"\" GROUP BY crawl_data_flag,cid,subject_id,app_id;");
// List<Map<String, Object>> Tasktimelimiit = DBUtil.getInstance("db_stat_alltask").query("SELECT MIN(crawl_start_time) crawl_start_time ,MAX(crawl_end_time) crawl_end_time ,crawl_data_flag ,subject_id ,cid ,app_id from cl_task where del=0 and (crawl_status=1) and cid!=\"\" GROUP BY crawl_data_flag,cid,subject_id,app_id;");
System.out.println(Tasktimelimiit.size()+"时间排序Tasktimelimiit");
if (Tasktimelimiit.size()>0){ if (Tasktimelimiit.size()>0){
String newkey = ""; String newkey = "";
for(Map<String, Object> subjectTask : Tasktimelimiit) { //{subject_id=10222, name=我是张三, task_id=188, id=71, crawl_data_flag=aaa} for(Map<String, Object> subjectTask : Tasktimelimiit) { //{subject_id=10222, name=我是张三, task_id=188, id=71, crawl_data_flag=aaa}
@ -36,11 +39,16 @@ public class Tasklimit {
String max_time = ""; String max_time = "";
String min_time = ""; String min_time = "";
String subject_id=""; String subject_id="";
newkey=newkey+"#####" +subjectTask.get("app_id");
//newkey=newkey+"#####" +subjectTask.get("app_id");
newkey= newkey.toLowerCase(); newkey= newkey.toLowerCase();
subject_id=subjectTask.get("subject_id").toString(); subject_id=subjectTask.get("subject_id").toString();
try {
max_time=subjectTask.get("crawl_end_time").toString(); max_time=subjectTask.get("crawl_end_time").toString();
} catch (Exception e) {
System.out.println(subjectTask);
e.printStackTrace();
}
value.put("max_time",max_time); value.put("max_time",max_time);
min_time=subjectTask.get("crawl_start_time").toString(); min_time=subjectTask.get("crawl_start_time").toString();
value.put("min_time",min_time); value.put("min_time",min_time);
@ -58,8 +66,63 @@ public class Tasklimit {
} }
subjectTasktimelimiit.put(newkey,valueList); subjectTasktimelimiit.put(newkey,valueList);
} }
// System.out.println(JsonUtils.toJSONString(subjectTasktimelimiit));
//System.out.println(JsonUtils.toJSONString(subjectTasktimelimiit));
}
} }
/**
 * Rebuilds the static {@code subjectTasktimelimiit} map from the rows returned by a
 * {@code cl_task} query filtered on the given arguments.
 *
 * <p>The map is cleared first, so every entry from a previous load is discarded.
 * Each returned row contributes one {max_time, min_time, subject_id} entry stored under the
 * lower-cased key {@code cid + "#####" + crawl_data_flag}.
 *
 * <p>NOTE(review): both arguments are concatenated directly into the SQL text; confirm they
 * can never carry untrusted input, or move to a parameterized query if DBUtil offers one.
 * NOTE(review): the filter is {@code cid != <cid>}, i.e. it excludes the given cid; confirm
 * this is intended rather than {@code cid = <cid>}.
 * NOTE(review): the SELECT mixes MIN/MAX aggregates with plain columns and has no GROUP BY,
 * unlike the query in loadTask; confirm the intended grouping.
 *
 * @param cid channel id used in the {@code cid !=} filter
 * @param crawl_data_flag crawl data flag that rows must match exactly
 */
public static void loaderrTask(String cid,String crawl_data_flag){
// Drop everything loaded so far; the map is repopulated only from this query's rows.
subjectTasktimelimiit.clear();
List<Map<String, Object>> Tasktimelimiit = DBUtil.getInstance("db_stat").query("SELECT MIN(crawl_start_time) crawl_start_time ,MAX(crawl_end_time) crawl_end_time ,crawl_data_flag ,subject_id ,cid ,app_id from cl_task where del=0 and (crawl_status=1) and cid!='"+cid+"' and crawl_data_flag='"+crawl_data_flag+"';");
// List<Map<String, Object>> Tasktimelimiit = DBUtil.getInstance("db_stat_alltask").query("SELECT MIN(crawl_start_time) crawl_start_time ,MAX(crawl_end_time) crawl_end_time ,crawl_data_flag ,subject_id ,cid ,app_id from cl_task where del=0 and (crawl_status=1) and cid!=\"\" GROUP BY crawl_data_flag,cid,subject_id,app_id;");
System.out.println(Tasktimelimiit.size()+"时间排序Tasktimelimiit");
if (Tasktimelimiit.size()>0){
String newkey = "";
for(Map<String, Object> subjectTask : Tasktimelimiit) { // example row: {subject_id=10222, name=Zhang San, task_id=188, id=71, crawl_data_flag=aaa}
// keytwo stays empty unless the row is Tmall/Taobao, in which case it names the sibling platform's key.
String keytwo = "";
Map<String,String> value = new HashMap<>();
List<Map<String,String>> valueList = new ArrayList<>();
if (subjectTask.get("cid").equals("Tmall")) {
newkey = subjectTask.get("cid") + "#####" + subjectTask.get("crawl_data_flag");
keytwo = "Taobao" + "#####" + subjectTask.get("crawl_data_flag");
} else if (subjectTask.get("cid").equals("Taobao")) {
newkey = subjectTask.get("cid") + "#####" + subjectTask.get("crawl_data_flag");
keytwo = "Tmall" + "#####" + subjectTask.get("crawl_data_flag");
} else {
newkey = subjectTask.get("cid") + "#####" + subjectTask.get("crawl_data_flag");
} }
String max_time = "";
String min_time = "";
String subject_id="";
//newkey=newkey+"#####" +subjectTask.get("app_id");
newkey= newkey.toLowerCase();
// Each toString() below throws NullPointerException when the row has no value for that column;
// unlike loadTask, nothing here catches it.
subject_id=subjectTask.get("subject_id").toString();
max_time=subjectTask.get("crawl_end_time").toString();
value.put("max_time",max_time);
min_time=subjectTask.get("crawl_start_time").toString();
value.put("min_time",min_time);
value.put("subject_id",subject_id);
// Append to the list already stored under this key, or start a new single-entry list.
if(subjectTasktimelimiit.containsKey(newkey)){
valueList = subjectTasktimelimiit.get(newkey);
valueList.add(value);
}else{
valueList.add(value);
}
// For Tmall/Taobao rows the same list instance is also stored under the sibling platform's key,
// replacing whatever list that key held before.
if(keytwo.length()>0){
String tmallnewkey = keytwo.toLowerCase();
subjectTasktimelimiit.put(tmallnewkey,valueList);
}
subjectTasktimelimiit.put(newkey,valueList);
}
//System.out.println(JsonUtils.toJSONString(subjectTasktimelimiit));
}
}
} }

10
cl_stream_datasave/src/main/java/com/bfd/mf/entity/mysql/Userlimit.java

@ -15,16 +15,16 @@ public class Userlimit {
public static Map<String, Object>subjectuserlimiit = new HashMap<>(); public static Map<String, Object>subjectuserlimiit = new HashMap<>();
public static void loaduser() { public static void loaduser() {
subjectuserlimiit.clear(); subjectuserlimiit.clear();
List<Map<String, Object>> userlimiit = DBUtil.getInstance("db_stat").query("SELECT user_id,is_ocr,is_asr,is_trans FROM `cl_user_config`");
List<Map<String, Object>> userlimiit = DBUtil.getInstance("db_stat_alltask").query("SELECT user_id,permission FROM `cl_user_config`");
if (userlimiit.size() > 0) { if (userlimiit.size() > 0) {
for (Map<String, Object> subjectuser : userlimiit) { for (Map<String, Object> subjectuser : userlimiit) {
int is_ocr=0; int is_trans=0; int is_ocr=0; int is_trans=0;
String userid=(String) subjectuser.get("user_id"); String userid=(String) subjectuser.get("user_id");
if (subjectuser.containsKey("is_ocr")&&null!=subjectuser.get("is_ocr")) {
is_ocr=(int) subjectuser.get("is_ocr");
if (subjectuser.containsKey("permission")&&subjectuser.get("permission").toString().contains("ocr")) {
is_ocr=1;
} }
if (subjectuser.containsKey("is_trans")&&null!=subjectuser.get("is_trans")) {
is_trans =(int) subjectuser.get("is_trans");
if (subjectuser.containsKey("permission")&&subjectuser.get("permission").toString().contains("translation")) {
is_trans =1;
} }
Map<String,Object> value = new HashMap<>(); Map<String,Object> value = new HashMap<>();
value.put("is_ocr",is_ocr); value.put("is_ocr",is_ocr);

5
cl_stream_datasave/src/main/java/com/bfd/mf/entity/mysql/cl_task.java

@ -14,7 +14,10 @@ public class cl_task {
public static List<String> subtaskstatuslimit = new ArrayList<>(); public static List<String> subtaskstatuslimit = new ArrayList<>();
public static List<String> subtaskstatuslimit3 = new ArrayList<>(); public static List<String> subtaskstatuslimit3 = new ArrayList<>();
public static void loadTask(){ public static void loadTask(){
List<Map<String, Object>> Tasktimelimiit = DBUtil.getInstance("db_stat_alltask").query("SELECT crawl_data_flag,cid FROM `cl_task` WHERE crawl_status=3 and update_time like '%2021-07-14%' GROUP BY crawl_data_flag,cid;");
// List<Map<String, Object>> Tasktimelimiit = DBUtil.getInstance("db_stat_alltask").query("SELECT crawl_data_flag,cid FROM `cl_task` WHERE crawl_status=3 and update_time like '%2021-07-14%' GROUP BY crawl_data_flag,cid;");
List<Map<String, Object>> Tasktimelimiit = DBUtil.getInstance("db_stat_alltask").query("SELECT crawl_data_flag,cid FROM `cl_task` WHERE crawl_data_flag like '%keyword:肖战%' GROUP BY crawl_data_flag,cid;");
if (Tasktimelimiit.size()>0){ if (Tasktimelimiit.size()>0){
String newkey = ""; String newkey = "";

54
cl_stream_datasave/src/main/java/com/bfd/mf/runstart/RunStartDataSave.java

@ -1,9 +1,6 @@
package com.bfd.mf.runstart; package com.bfd.mf.runstart;
import com.bfd.crawler.kafka7.KfkConsumer;
import com.bfd.crawler.kafka7.consts.KafkaConsts;
import com.bfd.mf.datasave.tools.DataProcess; import com.bfd.mf.datasave.tools.DataProcess;
import com.bfd.mf.datasave.tools.DateUtil;
import com.bfd.mf.entity.DataSaveManager; import com.bfd.mf.entity.DataSaveManager;
import com.bfd.mf.entity.impl.DataSaveManagerImpl; import com.bfd.mf.entity.impl.DataSaveManagerImpl;
import com.bfd.mf.datasave.tools.DBUtil; import com.bfd.mf.datasave.tools.DBUtil;
@ -22,7 +19,7 @@ public class RunStartDataSave {
private static final Logger LOG = Logger.getLogger(RunStartDataSave.class); private static final Logger LOG = Logger.getLogger(RunStartDataSave.class);
private static String log4jPath = "../etc/log4j.properties"; private static String log4jPath = "../etc/log4j.properties";
private static String dbPath = "../etc/db.properties";
private static String dbPath = "../etc/134db.properties";
private static String redisPath = "../etc/145redis.properties"; private static String redisPath = "../etc/145redis.properties";
static { static {
PropertyConfigurator.configureAndWatch(log4jPath); PropertyConfigurator.configureAndWatch(log4jPath);
@ -30,9 +27,7 @@ public class RunStartDataSave {
RedisUtil.init(redisPath); RedisUtil.init(redisPath);
} }
public static void main(String[] args) { public static void main(String[] args) {
cl_task.loadTask();
//cl_task.loadTask();
// try { // try {
// FiledTableInfo.loadTableInfo(); // FiledTableInfo.loadTableInfo();
// startRmiService(); // startRmiService();
@ -43,7 +38,7 @@ public class RunStartDataSave {
// try { // try {
// //Userlimit.loaduser(); // //Userlimit.loaduser();
// //Tasklimit.loadTask(); // //Tasklimit.loadTask();
// SubjectTask.loadSubjectTask();
// // SubjectTask.loadSubjectTask();
// } catch (Exception e) { // } catch (Exception e) {
// e.printStackTrace(); // e.printStackTrace();
// } // }
@ -58,25 +53,36 @@ public class RunStartDataSave {
for (int i = 0; i < 1; i ++) {
SubjectTask subjectTask = new SubjectTask();
Thread SubjectTaskThread = new Thread(subjectTask, "SubjectTask" + i);
SubjectTaskThread.start();
}
//
// for (int i = 0; i < 1; i ++) { // for (int i = 0; i < 1; i ++) {
// SubjectTask SubjectTask = new SubjectTask();
// Thread SubjectTaskThread = new Thread(SubjectTask, "dataDedupProcess" + i);
// SubjectTaskdele subjectTaskdel = new SubjectTaskdele();
// Thread SubjectTaskThread = new Thread(subjectTaskdel, "SubjectTask" + i);
// SubjectTaskThread.start(); // SubjectTaskThread.start();
// } // }
//// try {
//// Thread.sleep(6000);
//// } catch (InterruptedException e) {
//// e.printStackTrace();
//// }
// //多线程写redis
// for (int i = 0; i < 100; i ++) {
// DataProcess dataProcess = new DataProcess();
// Thread dataProcessThread = new Thread(dataProcess, "dataDedupProcess" + i);
// dataProcessThread.start();
//
//多线程写redis
for (int i = 0; i < 10; i ++) {
DataProcess dataProcess = new DataProcess();
Thread dataProcessThread = new Thread(dataProcess, "dataDedupProcess" + i);
dataProcessThread.start();
}
// for (int i = 0; i < 1; i ++) {
// si siupdate = new si();
// Thread update = new Thread(siupdate, "SubjectTask" + i);
// update.start();
// } // }
// Timer timer2 = new Timer();
// timer.schedule(new UpdateTask(), new Date(), 4*1000);
@ -93,11 +99,11 @@ public class RunStartDataSave {
* 本地主机上的远程对象注册表Registry的实例 并指定端口为8888这一步必不可少Java默认端口是1099 * 本地主机上的远程对象注册表Registry的实例 并指定端口为8888这一步必不可少Java默认端口是1099
* 必不可缺的一步缺少注册表创建则无法绑定对象到远程注册表上 * 必不可缺的一步缺少注册表创建则无法绑定对象到远程注册表上
***/ ***/
LocateRegistry.createRegistry(2099);//3888
LocateRegistry.createRegistry(3099);//6888
/*** 把远程对象注册到RMI注册服务器上,并命名为taskManager ***/ /*** 把远程对象注册到RMI注册服务器上,并命名为taskManager ***/
/*** 绑定的URL标准格式为:rmi://host:port/name(其中协议名可以省略,下面两种写法都是正确的) ***/ /*** 绑定的URL标准格式为:rmi://host:port/name(其中协议名可以省略,下面两种写法都是正确的) ***/
Naming.bind("//127.0.0.1:2099/dataSaveManager", dataSaveManager);
Naming.bind("//127.0.0.1:3099/dataSaveManager", dataSaveManager);
System.out.println(">>>>>INFO:远程IHello对象绑定成功!"); System.out.println(">>>>>INFO:远程IHello对象绑定成功!");
} catch (RemoteException e) { } catch (RemoteException e) {
System.out.println("创建远程对象发生异常!"); System.out.println("创建远程对象发生异常!");

Loading…
Cancel
Save