@ -12,6 +12,7 @@ import com.bfd.mf.job.domain.entity.Task;
import com.bfd.mf.job.domain.repository.SubjectRepository ;
import com.bfd.mf.job.domain.repository.TaskRepository ;
import com.bfd.mf.job.download.DownLoadFile ;
import com.bfd.mf.job.service.WriterTXTService ;
import com.bfd.mf.job.util.* ;
import com.google.common.collect.Maps ;
import com.google.common.util.concurrent.RateLimiter ;
@ -144,6 +145,8 @@ public class QueryService {
List < String > docIdsList = new ArrayList < > ( ) ;
try {
/ / 创建过滤条件 & 任务预处理
/ / fromMills = 1612108800000L ;
/ / toMills = 1613750400000L ;
fromMills = task . getCrawlStartTime ( ) . longValue ( ) ;
toMills = task . getCrawlEndTime ( ) . longValue ( ) ;
Long year = config . getQueryDataYearStarttime ( ) ; / / 获取配置文件中用直接拉年份的时间节点 , 现在设置的是2019年 , 2019年前的全部用年做索引 , 不拆成天
@ -220,12 +223,23 @@ public class QueryService {
// Whether video / image / file assets are downloaded is decided solely by the
// ISDOWNLOAD field of the record (English rendering of the note on the next line).
/ / 现在判断视频 、 图片 、 文件是否下载的方式只取决于isDownload 字段
// NOTE(review): the result is assigned to a primitive boolean; if getBoolean can
// return null for a missing field this would throw on unboxing — confirm.
boolean isDownload = data . getBoolean ( ESConstants . ISDOWNLOAD ) ;
if ( isDownload ) {
/ / String goFastAddr = defultAddr ;
// The record is replaced by whatever downloadAndChangePath returns.
data = downloadAndChangePath ( data ) ;
}
if ( ! data . get ( "_id_" ) . equals ( "" ) ) {
/ / 写入到专题ES中
saveService . saveToEsWithFilter ( config . esMiniClusterName ( ) , finalIndexName1 , data ) ;
kafkaProducer . send ( config . getSendTopic ( ) , JSONObject . toJSONString ( data ) ) ;
/ / 覆盖到日期ES中
String pubTimeStr = data . getString ( "pubTimeStr" ) . split ( " " ) [ 0 ] ;
String dateIndex = "cl_index_" + pubTimeStr ;
System . out . println ( config . esNormalClusterName ( ) + " ; index : " + dateIndex ) ;
/ / saveService . saveToEsWithFilter ( config . esNormalClusterName ( ) , dateIndex , data ) ;
/ / kafkaProducer . send ( config . getSendTopic ( ) , JSONObject . toJSONString ( data ) ) ;
String docId = data . getString ( "docId" ) ;
String id = data . getString ( "_id_" ) ;
String result = pubTimeStr + "\t" + docId + "\t" + id ;
WriteMethod . writeMethod ( "../data.txt" , JSONObject . toJSONString ( data ) ) ;
WriteMethod . writeMethod ( "../error_1123.txt" , result ) ;
/ / long crawlTime = data . getLong ( "crawlTime" ) ;
/ / if ( crawlTime < 1633795200000L ) {
/ / WriteMethod . writeMethod ( "../../../error.txt" , JSONObject . toJSONString ( data ) ) ;
@ -249,7 +263,7 @@ public class QueryService {
// When at least one doc id was collected, query the comments for all of them.
if ( docIdsList . size ( ) > 0 ) {
// docType is the second "_"-separated segment of the FIRST id only.
// NOTE(review): an id without "_" would make index [ 1 ] fail with
// ArrayIndexOutOfBoundsException, and ids of other types in the list are
// not distinguished here — confirm both are acceptable.
String docType = docIdsList . get ( 0 ) . split ( "_" ) [ 1 ] ;
String docIds [ ] = docIdsList . toArray ( new String [ 0 ] ) ;
/ / queryComments ( docIds , docType , finalFromMills , finalToMills , finalTaskId , crawlDataFlag , indexName ) ;
queryComments ( docIds , docType , finalFromMills , finalToMills , finalTaskId , crawlDataFlag , indexName ) ;
}
LOGGER . info ( "This Task is OK ! taskId = " + taskId ) ;
Integer cache_num = task . getCacheNum ( ) ;
@ -363,6 +377,7 @@ public class QueryService {
System . out . println ( docId + "----- filePath : " + filePath ) ;
System . out . println ( "===== srcFileList :" + srcFileList ) ;
System . out . println ( "-=-=- filePathSize : " + filePathSize ) ;
System . out . println ( "-----------------forwardUrl" + data . get ( "forwardUrl" ) ) ;
data = getFilePath ( data , filePath , srcFileList , filePathSize ) ;
}
@ -381,9 +396,9 @@ public class QueryService {
System . out . println ( docId + "----- videoPath : " + videoPath ) ;
System . out . println ( "===== srcVideoList :" + srcVideoList ) ;
System . out . println ( "-=-=- videoPathSize : " + videoPathSize ) ;
System . out . println ( "-----------------videoUrl" + data . get ( "videoUrl" ) ) ;
data = getVideoPath ( data , videoPath , srcVideoList , videoPathSize ) ;
}
/ / 图片下载
List < String > imagePath = ( List < String > ) data . get ( ESConstants . IMAGEPATH ) ;
List < Map < String , String > > srcImageList = new ArrayList < > ( ) ;
@ -398,6 +413,7 @@ public class QueryService {
System . out . println ( docId + "----- imagePath : " + imagePath ) ;
System . out . println ( "===== srcImageList :" + srcImageList ) ;
System . out . println ( "-=-=- imagePathSize : " + imagePathSize ) ;
System . out . println ( "-----------------PictureList" + data . get ( "pictureList" ) ) ;
data = getImagePath ( data , imagePath , srcImageList , imagePathSize ) ;
}
/ / System . out . println ( "***** " + data ) ;
@ -583,7 +599,7 @@ public class QueryService {
List < Map < String , String > > srcList = new ArrayList ( ) ;
for ( String downloadUrl : pathList ) {
Map < String , String > srcMap = new HashMap < > ( ) ;
srcMap . put ( ESConstants . ORIGINALURL , downloadUrl ) ;
String resolution = "" ;
String videoTime = "" ;
try {
@ -615,12 +631,70 @@ public class QueryService {
pathSizeList . add ( pathSizeMap ) ;
/ / 这个是 用来做 gofast 和原链接替换的 , key 是原链接 , value 是go - fast 链接 ,
String goFastUrl = pathSizeMap . get ( ESConstants . URL ) ;
// Resolve originalUrl for the current downloadUrl. Which field of `data` is
// consulted depends on downloadType (0, 1 or 2); any other value leaves
// originalUrl as the empty string.
// NOTE(review): every branch calls data . get ( ... ) . toString ( ) without a null
// check, so a record missing the field would throw here — confirm the field is
// always present or that the caller handles the exception.
String originalUrl = "" ;
if ( downloadType = = 0 ) {
// downloadType 0 reads FORWARD_URL (the log text below refers to an attachment).
System . out . println ( "有附件拉~~~~~~~~~~~~~~~~~~" + data . get ( ESConstants . FORWARD_URL ) ) ;
// Only parse when the field's string form mentions the GOFASTURL key.
if ( data . get ( ESConstants . FORWARD_URL ) . toString ( ) . contains ( ESConstants . GOFASTURL ) ) {
JSONArray forwardUrl = JSONObject . parseArray ( data . get ( ESConstants . FORWARD_URL ) . toString ( ) ) ;
for ( Object forwardUrlMap : forwardUrl ) {
Map < String , String > forward = ( Map < String , String > ) JSONObject . parse ( forwardUrlMap . toString ( ) ) ;
String fileOriginalUrl = forward . get ( ESConstants . ORIGINALURL ) ;
String fileGofastUrl = forward . get ( ESConstants . GOFASTURL ) ;
// Match on the GOFASTURL value; there is no break, so the last match wins.
if ( downloadUrl . equals ( fileGofastUrl ) ) {
originalUrl = fileOriginalUrl ;
}
}
}
} else if ( downloadType = = 1 ) {
// downloadType 1 reads PICTURE_LIST, parsed as a JSON object whose values are maps.
if ( data . get ( ESConstants . PICTURE_LIST ) . toString ( ) . contains ( ESConstants . IMG_ ) ) {
JSONObject pictureListMap = JSONObject . parseObject ( data . get ( ESConstants . PICTURE_LIST ) . toString ( ) ) ;
for ( Map . Entry < String , Object > entry : pictureListMap . entrySet ( ) ) {
Map < String , String > picMap = ( Map < String , String > ) entry . getValue ( ) ;
String imageOriginalUrl = picMap . get ( ESConstants . RAWIMG ) ;
String uploadImg = picMap . get ( ESConstants . UPLOADIMG ) ;
// Match on the UPLOADIMG value and take the RAWIMG value as the original URL.
if ( downloadUrl . equals ( uploadImg ) ) {
originalUrl = imageOriginalUrl ;
}
}
}
} else if ( downloadType = = 2 ) {
// downloadType 2 reads VIDEOURL (the log text below refers to a video).
System . out . println ( "有视频拉~~~~~~~~~~~~~~~~~~" + data . get ( ESConstants . VIDEOURL ) ) ;
if ( data . get ( ESConstants . VIDEOURL ) . toString ( ) . contains ( ESConstants . GOFASTURL ) ) {
JSONArray videoUrl = JSONObject . parseArray ( data . get ( ESConstants . VIDEOURL ) . toString ( ) ) ;
for ( Object videoMap : videoUrl ) {
Map < String , String > video = ( Map < String , String > ) JSONObject . parse ( videoMap . toString ( ) ) ;
String videoGofastUrl = video . get ( ESConstants . GOFASTURL ) ;
String videoOriginalUrl = video . get ( ESConstants . ORIGINALURL ) ;
if ( downloadUrl . equals ( videoGofastUrl ) ) {
originalUrl = videoOriginalUrl ;
}
}
} else {
// Field does not mention GOFASTURL: use the VIDEOURL string itself as the original URL.
originalUrl = data . getString ( ESConstants . VIDEOURL ) ;
}
}
srcMap . put ( ESConstants . ORIGINALURL , originalUrl ) ; / / 这个原始链接应该放抓取到的链接而不是下载用的链接
srcMap . put ( ESConstants . GOFASTURL , goFastUrl ) ;
/ / 这个值使用来替换 三个 Path 的 imagePath , videoPath , filePath
path . add ( goFastUrl ) ;
srcList . add ( srcMap ) ;
} else {
// Fallback branch: prints a marker line asking for attention to this case, then,
// for downloadType 1 only, records the pictures' raw URLs with an empty GOFASTURL.
System . out . println ( "~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~关注一下这个情况啊!~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" ) ;
if ( downloadType = = 1 ) {
// NOTE(review): data . get ( ... ) is dereferenced without a null check — confirm
// PICTURE_LIST is always present when downloadType is 1.
if ( data . get ( ESConstants . PICTURE_LIST ) . toString ( ) . contains ( ESConstants . IMG_ ) ) {
JSONObject pictureListMap = JSONObject . parseObject ( data . get ( ESConstants . PICTURE_LIST ) . toString ( ) ) ;
// NOTE(review): unlike the UPLOADIMG comparison used elsewhere, nothing here is
// compared against downloadUrl, so EVERY picture entry is appended to srcList
// each time this branch runs — confirm duplicates are intended.
for ( Map . Entry < String , Object > entry : pictureListMap . entrySet ( ) ) {
// A fresh map per entry, so each appended element is independent.
srcMap = new HashMap < > ( ) ;
Map < String , String > picMap = ( Map < String , String > ) entry . getValue ( ) ;
String imageOriginalUrl = picMap . get ( ESConstants . RAWIMG ) ;
srcMap . put ( ESConstants . GOFASTURL , "" ) ;
// The original-URL slot should hold the crawled link, not the link used for
// downloading (English rendering of the trailing note on the next line).
srcMap . put ( ESConstants . ORIGINALURL , imageOriginalUrl ) ; / / 这个原始链接应该放抓取到的链接而不是下载用的链接
srcList . add ( srcMap ) ;
}
}
}
path . add ( downloadUrl ) ;
}
} else { / / 如果 path 中的 url 是 OK的 , 但是不确定 pathSize 和 srcPath 的时候 , 需要做下面的处理
/ / 因为 srcPath 中需要先添加下面两个字段值 , 因此需要先获取 。
String allDownloadUrl = domain + downloadUrl ;
@ -664,7 +738,7 @@ public class QueryService {
e . printStackTrace ( ) ;
}
}
System . out . println ( "pathMap === " + JsonUtils . toJSONString ( pathMap ) ) ;
System . out . println ( "============================= === " + JsonUtils . toJSONString ( pathMap ) ) ;
return pathMap ;
}