@ -1,6 +1,7 @@
package com.bfd.mf.job.service.query ;
import com.alibaba.fastjson.JSON ;
import com.alibaba.fastjson.JSONArray ;
import com.alibaba.fastjson.JSONObject ;
import com.alibaba.fastjson.serializer.SerializerFeature ;
import com.bfd.crawler.utils.JsonUtils ;
@ -17,6 +18,7 @@ import com.bfd.mf.job.util.Kafka010Utils;
import com.bfd.mf.job.util.ReadLine ;
import com.google.common.collect.Maps ;
import com.google.common.util.concurrent.RateLimiter ;
import kafka.utils.Json ;
import org.apache.commons.lang3.exception.ExceptionUtils ;
import org.assertj.core.util.Lists ;
import org.elasticsearch.index.query.* ;
@ -27,8 +29,14 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service ;
import javax.annotation.PostConstruct ;
import java.io.File ;
import java.io.FileInputStream ;
import java.io.FileNotFoundException ;
import java.io.IOException ;
import java.math.BigDecimal ;
import java.math.BigInteger ;
import java.net.MalformedURLException ;
import java.net.URL ;
import java.net.URLConnection ;
import java.sql.Timestamp ;
import java.util.* ;
import java.util.concurrent.BlockingQueue ;
@ -40,12 +48,13 @@ import static org.elasticsearch.index.query.QueryBuilders.rangeQuery;
public class QueryService {
private static final Logger LOGGER = LoggerFactory . getLogger ( QueryService . class ) ;
private static final Long ONE_MINUTE = 1000L * 60 ;
private static BlockingQueue < Map < Lo ng, List < ? extends Number > > > P_TASK_CACHE_RANGE = new LinkedBlockingQueue < > ( ) ;
private static BlockingQueue < Map < Stri ng, List < ? extends Number > > > P_TASK_CACHE_RANGE = new LinkedBlockingQueue < > ( ) ;
private RateLimiter pRateLimiter ;
/ / private RateLimiter dataRateLimiter ;
/ / private RateLimiter cRateLimiter ;
private static String defultAddr = "http://172.18.1.113:8080/upload" ;
private static String defultTopic = "test202012111" ;
/ / private static String defultAddr = "http://172.18.1.113:8080/upload" ;
/ / private static String defultTopic = "test202012111" ;
@Autowired
private AppConfig config ;
@ -93,11 +102,12 @@ public class QueryService {
Long totalSegment = 1L ; / / ( task . getDateEnd ( ) - task . getDateStart ( ) ) / PERIOD_MILLS ; / / 3600000
Long segment = 1L ;
Double progressFactor = 1 . 0 / totalSegment ;
Map < Lo ng, List < ? extends Number > > cache = Maps . newHashMap ( ) ;
Map < Stri ng, List < ? extends Number > > cache = Maps . newHashMap ( ) ;
long taskId = task . getId ( ) . longValue ( ) ;
String appId = task . getAppId ( ) ;
int cache_num = 1 ;
taskRepository . updateStatus ( cache_num , task . getId ( ) . longValue ( ) ) ;
cache . put ( taskId , Lists . newArrayList ( 0L , 0L , progressFactor , totalSegment , segment ) ) ;
cache . put ( taskId + "#@#" + appId , Lists . newArrayList ( 0L , 0L , progressFactor , totalSegment , segment ) ) ;
try {
P_TASK_CACHE_RANGE . put ( cache ) ;
} catch ( InterruptedException e ) {
@ -110,137 +120,141 @@ public class QueryService {
* 这个方法主要是把 要打标的任务先从 ES 中读出来 , 放到 kafka 中 , 然后会有另一个 线程读kafka 进行打标呢 ! ! !
* /
public void produce ( ) {
Map < Lo ng, List < ? extends Number > > range = P_TASK_CACHE_RANGE . poll ( ) ; / / poll - - > 若队列为空 , 返回null
Map < Stri ng, List < ? extends Number > > range = P_TASK_CACHE_RANGE . poll ( ) ; / / poll - - > 若队列为空 , 返回null
if ( Objects . isNull ( range ) ) {
return ;
}
long taskId = 0L ;
String taskIdAppId = "" ;
long fromMills = 0L ;
long toMills = 0L ;
for ( Map . Entry < Lo ng, List < ? extends Number > > entry : range . entrySet ( ) ) {
for ( Map . Entry < Stri ng, List < ? extends Number > > entry : range . entrySet ( ) ) {
entry . getValue ( ) ;
taskId = entry . getKey ( ) ;
taskIdAppId = entry . getKey ( ) ;
}
Task task = taskRepository . findById ( taskId ) . get ( ) ;
LOGGER . info ( "开始拉数据的任务是:" + JSONObject . toJSONString ( task ) ) ;
List < String > docIdsList = new ArrayList < > ( ) ;
try {
/ / 创建过滤条件 & 任务预处理
fromMills = task . getCrawlStartTime ( ) . longValue ( ) ;
toMills = task . getCrawlEndTime ( ) . longValue ( ) ;
Long year = config . getQueryDataYearStarttime ( ) ; / / 获取配置文件中用直接拉年份的时间节点 , 现在设置的是2019年 , 2019年前的全部用年做索引 , 不拆成天
String clusterName = config . esNormalClusterName ( ) ; / / 获取配置文件中ES的名称
/ / 根据条件获取到要查询的索引的集合
if ( toMills > new Date ( ) . getTime ( ) ) {
toMills = new Date ( ) . getTime ( ) ;
}
String [ ] sourceIndices = EsUtils . getIndices ( AppConfig . CL_INDEX , AppConfig . SEPARATOR ,
fromMills , toMills , AppConfig . DATE_FORMAT , config . esNormalUpper ( ) , config . esNormalStandby ( ) , year ) ;
String cid = task . getCid ( ) . toLowerCase ( ) ; / / 站点的cid
String siteType = task . getSiteType ( ) . toString ( ) ; / / 站点的类型 , 主要看是不是电商的 , 因为电商的主贴和评论在ES中的存储方式跟其他的相反
String crawlDataFlag = task . getCrawlDataFlag ( ) ; / / 任务的抓取条件
String crawlContentKey = task . getCrawlContentKey ( ) ; / / 要拉取的字段 , 主要看是否需要拉评论
/ / BigInteger subjectId = task . getSubjectId ( ) ;
/ / Subject subject = subjectRepository . getSubjectBySubjectId ( subjectId . longValue ( ) ) ;
/ / String indexName = "cl_major_" + task . getSubjectId ( ) ; / / 索引名称
String indexName = "cl_major_61qb_12094" ;
Integer cacheNum = task . getCacheNum ( ) ; / / 拉取数据的次数
/ / 当拉数据的次数 大于1 次的时候 , 再拉数据的开始时间就不用是任务设置的开始时间了 , 同时可以再加个采集时间范围限制一下 , 确保拉的数据都是任务添加之后才采集的就行
QueryBuilder queryBuilder ; / / 根据条件组装查询用具
if ( cacheNum > 1 ) { / / 已经拉过历史数据的任务 , 将 开始时间改成当天凌晨 , 查询发表和抓取都是当天的数据 。
long current = System . currentTimeMillis ( ) ; / / 当前时间毫秒数
long zero = current / ( 1000 * 3600 * 24 ) * ( 1000 * 3600 * 24 ) - TimeZone . getDefault ( ) . getRawOffset ( ) ; / / 今天零点零分零秒的毫秒数
fromMills = new Timestamp ( zero ) . getTime ( ) ;
queryBuilder = getQueryBuilder ( fromMills , toMills , cid , crawlDataFlag , cacheNum , siteType ) ;
} else {
fromMills = task . getCrawlStartTime ( ) . longValue ( ) ;
queryBuilder = getQueryBuilder ( fromMills , toMills , cid , crawlDataFlag , cacheNum , siteType ) ;
}
LOGGER . info ( "Query primary, task:{}, index:{}, from:{}, to:{}, indices:{}, dsl:{}." ,
taskId ,
indexName ,
new LocalDateTime ( fromMills ) . toString ( AppConfig . DATE_TIME_FORMAT ) ,
new LocalDateTime ( toMills ) . toString ( AppConfig . DATE_TIME_FORMAT ) ,
JSONObject . toJSONString ( sourceIndices ) ,
queryBuilder . toString ( ) ) ;
/ / 传入的参数 集群名称 , 索引名称 , 索引类型 ( type ) , 查询Builder , scroll查询页面大小 , scroll查询scrollId有效时间
String finalTaskId = taskId + "" ;
long pubTime = fromMills ;
long finalFromMills = fromMills ;
long finalToMills = toMills ;
EsUtils . scrollQuery ( clusterName , sourceIndices , ESConstants . INDEX_TYPE ,
queryBuilder , ESConstants . SCROLL_PAGE_SIZE , ESConstants . SCROLL_MINUTES ,
dataList - > {
try {
if ( dataList . size ( ) = = 0 ) {
System . out . println ( "没查到相关的 主贴 数据" ) ;
return ;
}
for ( JSONObject data : dataList ) {
data = getCreateTime ( data , crawlDataFlag ) ;
/ / 离线拉的数据加个字段吧 ! 跟正常拉的数据做区分
if ( data . get ( ESConstants . DOC_TYPE ) . equals ( ESConstants . ITEM ) & & data . get ( ESConstants . PRIMARY ) . equals ( 1 ) ) {
data = getPubTime ( data , pubTime ) ;
}
saveService . initData ( data , finalTaskId ) ;
/ / 发送主贴
/ / 是否要下载图片到指定的 go - fast上
/ / 现在判断视频 、 图片 、 文件是否下载的方式只取决于isDownload 字段
boolean isDownload = data . getBoolean ( ESConstants . ISDOWNLOAD ) ;
if ( isDownload ) {
String goFastAddr = defultAddr ;
data = downloadAndChangePath ( data , goFastAddr ) ;
}
/ / if ( subject . getGoFastSwitch ( ) = = 1 ) {
/ / String goFastAddr = subject . getGoFastAddr ( ) ;
/ / if ( "" = = goFastAddr ) {
/ / goFastAddr = defultAddr ;
/ / }
/ / data = downloadAndChangePath ( data , goFastAddr ) ;
/ / }
/ / 是否写入到指定的kafka
/ / if ( subject . getKafkaSwitch ( ) = = 1 ) {
/ / String kafkaTopic = subject . getKafkaTopic ( ) ;
/ / String kafkaAddr = subject . getKafkaAddr ( ) ;
/ / if ( kafkaTopic . isEmpty ( ) ) {
/ / kafkaTopic = defultTopic ;
/ / }
/ / kafkaProducer . send ( kafkaTopic , JSONObject . toJSONString ( data ) ) ;
/ / }
if ( ! data . get ( "_id_" ) . equals ( "" ) ) {
saveService . saveToEsWithFilter ( config . esMiniClusterName ( ) , indexName , data ) ;
kafkaProducer . send ( config . getSendTopic ( ) , JSONObject . toJSONString ( data ) ) ;
LOGGER . debug ( "Send message, indexName :{} , taskId:{} , ID :{}." , indexName , task . getId ( ) , data . getString ( "_id_" ) ) ;
/ / 将要拉评论的ID 添加到list 中 , ( 电商的数据不用拉评论哦 ) !
if ( ! siteType . equals ( ESConstants . DOCTYPEITEM ) ) {
if ( crawlContentKey . contains ( "comment" ) | | crawlContentKey . contains ( "socialComment" ) ) {
docIdsList . add ( data . get ( ESConstants . DOC_ID ) . toString ( ) ) ;
long taskId = Long . valueOf ( taskIdAppId . split ( "#@#" ) [ 0 ] ) ;
String appId = taskIdAppId . split ( "#@#" ) [ 1 ] ;
List < Task > taskList = taskRepository . findOneTaskByIdAndAppId ( taskId , appId ) ;
if ( taskList . size ( ) > 0 ) {
for ( Task task : taskList ) {
LOGGER . info ( "开始拉数据的任务是:" + JSONObject . toJSONString ( task ) ) ;
List < String > docIdsList = new ArrayList < > ( ) ;
try {
/ / 创建过滤条件 & 任务预处理
fromMills = task . getCrawlStartTime ( ) . longValue ( ) ;
toMills = task . getCrawlEndTime ( ) . longValue ( ) ;
Long year = config . getQueryDataYearStarttime ( ) ; / / 获取配置文件中用直接拉年份的时间节点 , 现在设置的是2019年 , 2019年前的全部用年做索引 , 不拆成天
String clusterName = config . esNormalClusterName ( ) ; / / 获取配置文件中ES的名称
/ / 根据条件获取到要查询的索引的集合
if ( toMills > new Date ( ) . getTime ( ) ) {
toMills = new Date ( ) . getTime ( ) ;
}
/ / 获取时间索引List
String [ ] sourceIndices = EsUtils . getIndices ( AppConfig . CL_INDEX , AppConfig . SEPARATOR ,
fromMills , toMills , AppConfig . DATE_FORMAT , config . esNormalUpper ( ) , config . esNormalStandby ( ) , year ) ;
/ / 组装查询条件需要的参数
String cid = task . getCid ( ) . toLowerCase ( ) ; / / 站点的cid
String siteType = task . getSiteType ( ) . toString ( ) ; / / 站点的类型 , 主要看是不是电商的 , 因为电商的主贴和评论在ES中的存储方式跟其他的相反
String crawlDataFlag = task . getCrawlDataFlag ( ) ; / / 任务的抓取条件
String crawlContentKey = task . getCrawlContentKey ( ) ; / / 要拉取的字段 , 主要看是否需要拉评论
BigInteger subjectId = task . getSubjectId ( ) ;
/ / Long id = task . getId ( ) ;
/ / System . out . println ( "id ==== " + id ) ;
/ / System . out . println ( subjectId ) ;
/ / String appId = task . getAppId ( ) ;
System . out . println ( "**** " + appId ) ;
String indexName = "cl_major_" ;
if ( appId . contains ( "ic" ) ) {
indexName = indexName + subjectId ;
} else {
indexName = indexName + appId . toLowerCase ( ) + "_" + subjectId ; / / cl_major_61qb_12094
}
System . out . println ( "indexName = " + indexName ) ;
Integer cacheNum = task . getCacheNum ( ) ; / / 拉取数据的次数
/ / 当拉数据的次数 大于1 次的时候 , 再拉数据的开始时间就不用是任务设置的开始时间了 , 同时可以再加个采集时间范围限制一下 , 确保拉的数据都是任务添加之后才采集的就行
QueryBuilder queryBuilder ; / / 根据条件组装查询用具
if ( cacheNum > 1 ) { / / 已经拉过历史数据的任务 , 将 开始时间改成当天凌晨 , 查询发表和抓取都是当天的数据 。
long current = System . currentTimeMillis ( ) ; / / 当前时间毫秒数
long zero = current / ( 1000 * 3600 * 24 ) * ( 1000 * 3600 * 24 ) - TimeZone . getDefault ( ) . getRawOffset ( ) ; / / 今天零点零分零秒的毫秒数
fromMills = new Timestamp ( zero ) . getTime ( ) ;
queryBuilder = getQueryBuilder ( fromMills , toMills , cid , crawlDataFlag , cacheNum , siteType ) ;
} else {
fromMills = task . getCrawlStartTime ( ) . longValue ( ) ;
queryBuilder = getQueryBuilder ( fromMills , toMills , cid , crawlDataFlag , cacheNum , siteType ) ;
}
LOGGER . info ( "Query primary, task:{}, index:{}, from:{}, to:{}, indices:{}, dsl:{}." ,
taskId ,
indexName ,
new LocalDateTime ( fromMills ) . toString ( AppConfig . DATE_TIME_FORMAT ) ,
new LocalDateTime ( toMills ) . toString ( AppConfig . DATE_TIME_FORMAT ) ,
JSONObject . toJSONString ( sourceIndices ) ,
queryBuilder . toString ( ) ) ;
/ / 传入的参数 集群名称 , 索引名称 , 索引类型 ( type ) , 查询Builder , scroll查询页面大小 , scroll查询scrollId有效时间
String finalTaskId = taskId + "" ;
long pubTime = fromMills ;
long finalFromMills = fromMills ;
long finalToMills = toMills ;
String finalIndexName = indexName ;
EsUtils . scrollQuery ( clusterName , sourceIndices , ESConstants . INDEX_TYPE ,
queryBuilder , ESConstants . SCROLL_PAGE_SIZE , ESConstants . SCROLL_MINUTES ,
dataList - > {
try {
if ( dataList . size ( ) = = 0 ) {
System . out . println ( "没查到相关的 主贴 数据" ) ;
return ;
}
for ( JSONObject data : dataList ) {
data = getCreateTime ( data , crawlDataFlag ) ;
/ / 为了保证电商主贴的数据能够正常的再页面进行展示 , 需要将电商主贴的发表时间改成拉数时间范围内的时间才行
if ( data . get ( ESConstants . DOC_TYPE ) . equals ( ESConstants . ITEM ) & & data . get ( ESConstants . PRIMARY ) . equals ( 1 ) ) {
data = getPubTime ( data , pubTime ) ;
}
/ / 离线拉的数据加个字段吧 ! 跟正常拉的数据和采集的数据做区分
saveService . initData ( data , finalTaskId ) ;
/ / 发送主贴
/ / 是否要下载图片到指定的 go - fast上
/ / 现在判断视频 、 图片 、 文件是否下载的方式只取决于isDownload 字段
boolean isDownload = data . getBoolean ( ESConstants . ISDOWNLOAD ) ;
if ( isDownload ) {
/ / String goFastAddr = defultAddr ;
data = downloadAndChangePath ( data ) ;
}
if ( ! data . get ( "_id_" ) . equals ( "" ) ) {
/ / saveService . saveToEsWithFilter ( config . esMiniClusterName ( ) , indexName , data ) ;
/ / kafkaProducer . send ( config . getSendTopic ( ) , JSONObject . toJSONString ( data ) ) ;
LOGGER . debug ( "Send message, indexName :{} , taskId:{} , ID :{}." , finalIndexName , task . getId ( ) , data . getString ( "_id_" ) ) ;
/ / 将要拉评论的ID 添加到list 中 , ( 电商的数据不用拉评论哦 ) !
if ( ! siteType . equals ( ESConstants . DOCTYPEITEM ) ) {
if ( crawlContentKey . contains ( "comment" ) | | crawlContentKey . contains ( "socialComment" ) ) {
docIdsList . add ( data . get ( ESConstants . DOC_ID ) . toString ( ) ) ;
}
}
}
}
} catch ( Exception e ) {
System . out . println ( "ERROR: " + dataList ) ;
throw new RuntimeException ( e ) ;
}
}
} catch ( Exception e ) {
System . out . println ( "******* " + dataList ) ;
throw new RuntimeException ( e ) ;
}
} ) ;
} ) ;
/ / 开始拉评论数据
if ( docIdsList . size ( ) > 0 ) {
String docType = docIdsList . get ( 0 ) . split ( "_" ) [ 1 ] ;
String docIds [ ] = docIdsList . toArray ( new String [ 0 ] ) ;
queryComments ( docIds , docType , finalFromMills , finalToMills , finalTaskId , crawlDataFlag , indexName ) ;
/ / 开始拉评论数据
if ( docIdsList . size ( ) > 0 ) {
String docType = docIdsList . get ( 0 ) . split ( "_" ) [ 1 ] ;
String docIds [ ] = docIdsList . toArray ( new String [ 0 ] ) ;
/ / queryComments ( docIds , docType , finalFromMills , finalToMills , finalTaskId , crawlDataFlag , indexName ) ;
}
LOGGER . info ( "This Task is OK ! taskId = " + taskId ) ;
Integer cache_num = task . getCacheNum ( ) ;
cache_num = cache_num + 1 ;
taskRepository . updateStatus ( cache_num , taskId ) ;
} catch ( Exception e ) {
JSONObject msg = new JSONObject ( ) ;
msg . put ( "message" , "produce error due to [" + ExceptionUtils . getStackTrace ( e ) + "]" ) ;
msg . put ( "from" , fromMills ) ;
msg . put ( "to" , toMills ) ;
LOGGER . error ( "Produce error due to [{}]." , e . getMessage ( ) , e ) ;
}
}
LOGGER . info ( "This Task is OK ! taskId = " + taskId ) ;
Integer cache_num = task . getCacheNum ( ) ;
cache_num = cache_num + 1 ;
taskRepository . updateStatus ( cache_num , task . getId ( ) . longValue ( ) ) ;
} catch ( Exception e ) {
JSONObject msg = new JSONObject ( ) ;
msg . put ( "message" , "produce error due to [" + ExceptionUtils . getStackTrace ( e ) + "]" ) ;
msg . put ( "from" , fromMills ) ;
msg . put ( "to" , toMills ) ;
LOGGER . error ( "Produce error due to [{}]." , e . getMessage ( ) , e ) ;
}
}
@ -264,7 +278,7 @@ public class QueryService {
private void queryComments ( String [ ] docId , String docType ,
long startTime , long endTime ,
String crawlDataFlag , String finalTaskId ,
String finalTaskId , String crawlDataFlag ,
String indexName ) {
LOGGER . info ( "开始拉取评论数据:" ) ;
QueryBuilder queryBuilder = getQueryBuilder ( docId , startTime , endTime ) ;
@ -301,9 +315,8 @@ public class QueryService {
BoolQueryBuilder boolQueryBuilder = QueryBuilders . boolQuery ( ) ;
try {
/ / 筛选时间
boolean boo = true ;
QueryBuilder pubTimeRange = buildRangeQueryBuilder (
ESConstants . PUBTIME , startTime - 2 * ONE_MINUTE , endTime , boo , boo ) ;
ESConstants . PUBTIME , startTime - 2 * ONE_MINUTE , endTime ) ;
boolQueryBuilder . must ( pubTimeRange ) ;
/ / 筛选ID
QueryBuilder termQueryBuilder = QueryBuilders . termsQuery ( ESConstants . DOC_ID , docId ) ;
@ -316,215 +329,290 @@ public class QueryService {
/ * *
* 下载 文件 、 视频 、 图片 , 并将新的路径替换写入到 pathSize中
* videoPath = = egc
videoPath = = egc
filePath = = ugc
imagePath = = pgc
* /
private JSONObject downloadAndChangePath ( JSONObject data , String goFastAddr ) {
String isDownload = data . get ( ESConstants . ISDOWNLOAD ) . toString ( ) ;
Map < String , String > goFastMap = new HashMap < > ( ) ;
List < Map < String , String > > filePathSize = new ArrayList < > ( ) ;
List < Map < String , String > > videoPathSize = new ArrayList < > ( ) ;
List < Map < String , String > > imagePathSize = new ArrayList < > ( ) ;
/ / 文件下载
List < String > filePath = ( List < String > ) data . get ( ESConstants . FILEPATH ) ;
if ( filePath . size ( ) > 0 ) {
/ / 调用下载接口 , 下载并将附件上传到自己的go - fast 上
Map < String , Object > srcPathMap = getPathSize ( filePath , goFastAddr , 0 , data ) ;
filePath = ( List < String > ) srcPathMap . get ( ESConstants . PATH ) ;
data . put ( ESConstants . FILEPATH , filePath ) ;
/ / 组装 FILEPATHSIZE 字段
filePathSize = ( List < Map < String , String > > ) srcPathMap . get ( ESConstants . PATHSIZELIST ) ;
if ( filePathSize . size ( ) > 0 ) {
data . put ( ESConstants . FILEPATHSIZE , JSONObject . toJSONString ( filePathSize ) ) ;
data . put ( ESConstants . UGC , 1 ) ;
data . put ( ESConstants . ISDOWNLOAD , isDownload ) ;
}
/ / 组装 SRCFILEPATH 字段
Map < String , String > srcAndGofastUrlMap = ( Map < String , String > ) srcPathMap . get ( "srcMap" ) ;
if ( data . containsKey ( "forwardUrl" ) & & null ! = data . get ( "forwardUrl" ) ) {
try {
List < Map < String , String > > forwardUrl = JsonUtils . parseArray ( data . get ( "forwardUrl" ) . toString ( ) ) ;
List < Map < String , String > > srcPath = getSrcPath ( forwardUrl , srcAndGofastUrlMap ) ;
data . put ( ESConstants . SRCFILEPATH , JSON . toJSONString ( srcPath , SerializerFeature . DisableCircularReferenceDetect ) ) ;
} catch ( Exception e ) {
e . printStackTrace ( ) ;
}
是否需要走下载这个 , 除了要看 isDownload 字段之外 , 还需要看 filePath \ imagePath \ videoPath 是否为空
如果都为空 , 直接将isDownload 置为 false即可
如果有一个不为空 , 则需要判断链接是否有 http 前缀 , 有前缀的做下载 , 并且输出时将前缀替换掉 。 没有的就跳过 。
然后再看 path 对应的 pathSize 和 src 是否都有值 , 且去了前缀 , 要是有值没去前缀 , 做处理 , 要是没值则添加 。
* /
private JSONObject downloadAndChangePath ( JSONObject data ) {
try {
/ / 文件下载 , 之所以提取对应的src 字段是因为如果附件被下载过了 , 这个字段的值需要一一对应 , 就直接填写了 , 不用再做下载回填了 。
List < String > filePath = ( List < String > ) data . get ( ESConstants . FILEPATH ) ;
List < Map < String , String > > srcFileList = JsonUtils . parseArray ( data . get ( ESConstants . SRCFILEPATH ) . toString ( ) ) ;
if ( filePath . size ( ) > 0 ) {
data = getFilePath ( data , filePath , srcFileList ) ;
}
}
/ / 视频下载
List < String > videoPath = ( List < String > ) data . get ( ESConstants . VIDEOPATH ) ;
if ( videoPath . size ( ) > 0 ) {
/ / List < Map < String , String > > videoPathSize = getPathSize ( videoPath , goFastAddr , 1 , data ) ;
System . out . println ( "************ 要下载的视频链接的 List : " + videoPath ) ;
Map < String , Object > srcPathMap = getPathSize ( videoPath , goFastAddr , 0 , data ) ;
videoPath = ( List < String > ) srcPathMap . get ( ESConstants . PATH ) ;
data . put ( ESConstants . VIDEOPATH , videoPath ) ;
videoPathSize = ( List < Map < String , String > > ) srcPathMap . get ( ESConstants . PATHSIZELIST ) ;
if ( videoPathSize . size ( ) > 0 ) {
data . put ( ESConstants . VIDEOPATHSIZE , JSONObject . toJSONString ( videoPathSize ) ) ;
data . put ( ESConstants . EGC , 1 ) ;
data . put ( ESConstants . ISDOWNLOAD , isDownload ) ;
}
/ / 组装 SRCVIDEOPATH 字段
Map < String , String > srcAndGofastUrlMap = ( Map < String , String > ) srcPathMap . get ( "srcMap" ) ;
if ( data . containsKey ( "videoUrl" ) & & null ! = data . get ( "videoUrl" ) ) {
List < Map < String , String > > srcPath = new ArrayList < > ( ) ;
if ( data . get ( "videoUrl" ) . toString ( ) . contains ( "originalUrl" ) ) {
try {
List < Map < String , String > > videoUrl = JsonUtils . parseArray ( data . get ( "videoUrl" ) . toString ( ) ) ;
srcPath = getSrcPath ( videoUrl , srcAndGofastUrlMap ) ;
} catch ( Exception e ) {
e . printStackTrace ( ) ;
}
} else {
List < String > videoUrl = new ArrayList < > ( ) ;
try {
if ( data . get ( "videoUrl" ) . toString ( ) . contains ( "[" ) ) {
videoUrl = JsonUtils . parseArray ( data . get ( "videoUrl" ) . toString ( ) ) ;
} else {
videoUrl . add ( data . get ( "videoUrl" ) . toString ( ) ) ;
}
} catch ( Exception e ) {
e . printStackTrace ( ) ;
}
srcPath = new ArrayList < > ( ) ;
Map < String , String > srcurlMap = new HashMap < > ( ) ;
if ( videoPath . size ( ) > 0 ) {
srcurlMap . put ( ESConstants . GOFASTURL , videoPath . get ( 0 ) ) ;
}
System . out . println ( "===============视频原链接的List: " + videoUrl ) ;
if ( videoUrl . size ( ) > 0 ) {
srcurlMap . put ( ESConstants . ORIGINALURL , videoUrl . get ( 0 ) ) ;
}
if ( srcurlMap . size ( ) > 0 ) {
srcPath . add ( srcurlMap ) ;
}
}
data . put ( ESConstants . SRCVIDEOPATH , JSON . toJSONString ( srcPath , SerializerFeature . DisableCircularReferenceDetect ) ) ;
/ / 视频下载
List < String > videoPath = ( List < String > ) data . get ( ESConstants . VIDEOPATH ) ;
if ( videoPath . size ( ) > 0 ) {
List < Map < String , String > > srcVideoList = JsonUtils . parseArray ( data . get ( ESConstants . SRCVIDEOPATH ) . toString ( ) ) ;
data = getVideoPath ( data , videoPath , srcVideoList ) ;
}
}
/ / 图片下载
List < String > imagePath = ( List < String > ) data . get ( ESConstants . IMAGEPATH ) ;
if ( imagePath . size ( ) > 0 ) {
/ / List < Map < String , String > > imagePathSize = getPathSize ( imagePath , goFastAddr , 2 , data ) ;
Map < String , Object > srcPathMap = getPathSize ( imagePath , goFastAddr , 0 , data ) ;
imagePath = ( List < String > ) srcPathMap . get ( ESConstants . PATH ) ;
data . put ( ESConstants . IMAGEPATH , imagePath ) ;
imagePathSize = ( List < Map < String , String > > ) srcPathMap . get ( ESConstants . PATHSIZELIST ) ;
if ( imagePathSize . size ( ) > 0 ) {
data . put ( ESConstants . IMAGEPATHSIZE , JSONObject . toJSONString ( imagePathSize ) ) ;
data . put ( ESConstants . PGC , 1 ) ;
data . put ( ESConstants . ISDOWNLOAD , isDownload ) ;
/ / 图片下载
List < String > imagePath = ( List < String > ) data . get ( ESConstants . IMAGEPATH ) ;
List < Map < String , String > > srcImageList = JsonUtils . parseArray ( data . get ( ESConstants . SRCIMAGEPATH ) . toString ( ) ) ;
if ( imagePath . size ( ) > 0 ) {
data = getImagePath ( data , imagePath , srcImageList ) ;
}
Map < String , String > srcAndGofastUrlMap = ( Map < String , String > ) srcPathMap . get ( "srcMap" ) ;
List < Map < String , String > > srcPath = new ArrayList < > ( ) ;
if ( data . containsKey ( "pictureList" ) & & null ! = data . get ( "pictureList" ) ) {
Map < String , Object > pictureList = JSONObject . parseObject ( data . get ( "pictureList" ) . toString ( ) ) ;
if ( ! pictureList . isEmpty ( ) ) {
Map < String , String > srcurlMap = new HashMap < > ( ) ;
for ( Map . Entry < String , Object > entry : pictureList . entrySet ( ) ) {
Map < String , Object > imgmap = ( Map < String , Object > ) entry . getValue ( ) ;
if ( imgmap . containsKey ( "uploadImg" ) & & imgmap . get ( "uploadImg" ) ! = null & & imgmap . get ( "uploadImg" ) ! = "" ) {
srcurlMap . put ( ESConstants . GOFASTURL , srcAndGofastUrlMap . get ( imgmap . get ( "uploadImg" ) ) ) ;
srcurlMap . put ( ESConstants . ORIGINALURL , imgmap . get ( "img" ) . toString ( ) ) ;
}
srcPath . add ( srcurlMap ) ;
}
}
/ / isDownload 填写
if ( filePath . size ( ) = = 0 & & videoPath . size ( ) = = 0 & & imagePath . size ( ) = = 0 ) {
data . put ( ESConstants . ISDOWNLOAD , "false" ) ;
}
data . put ( ESConstants . SRCIMAGEPATH , JSON . toJSONString ( srcPath , SerializerFeature . DisableCircularReferenceDetect ) ) ;
} catch ( Exception e ) {
e . printStackTrace ( ) ;
}
return data ;
/ / 当 filePath > 0 , 但是没有 filePathSize 的时候 , 需要下载组装 filePathSize
/ / 当 filePath > 0 , 但是没有 srcFilePath 的时候 , 需要下载组装 srcFilePath , 这个时候可以取 forwardUrl
/ / 当 videoPath > 0 , 但是没有 videoPathSize 的时候 , 需要下载组装 videoPathSize
/ / 当 videoPath > 0 , 但是没有 srcVideoPath 的时候 , 需要下载组装 srcVideoPath , 这个时候可以取 videoUrl
/ / 当 imagePath > 0 , 但是没有 imagePathSize 的时候 , 需要下载组装 imagePathSize
/ / 当 imagePath > 0 , 但是没有 srcImagePath 的时候 , 需要下载组装 srcImagePath , 这个时候可以取 pictureList
/ / 当三个 pathSize 都为 0 的时候 , 表示三个下载结果都为空 , 为了保持页面和实际结果的统一 , 这块改成 false
if ( filePathSize . size ( ) = = 0 & & videoPathSize . size ( ) = = 0 & & imagePathSize . size ( ) = = 0 ) {
data . put ( ESConstants . ISDOWNLOAD , "false" ) ;
}
private JSONObject getImagePath ( JSONObject data , List < String > imagePath , List < Map < String , String > > srcImageList ) {
Map < String , Object > pathMap = getPathSize ( imagePath , 1 , data , srcImageList ) ;
LOGGER . info ( "下载图片后的 pathMap : {}." , JsonUtils . toJSONString ( pathMap ) ) ;
if ( pathMap . size ( ) > 0 ) {
imagePath = ( List < String > ) pathMap . get ( ESConstants . PATH ) ;
data . put ( ESConstants . IMAGEPATH , imagePath ) ;
List < Map < String , String > > imagePathSize = ( List < Map < String , String > > ) pathMap . get ( ESConstants . PATHSIZELIST ) ;
if ( imagePathSize . size ( ) > 0 ) {
data . put ( ESConstants . IMAGEPATHSIZE , JSONObject . toJSONString ( imagePathSize ) ) ;
data . put ( ESConstants . PGC , 1 ) ;
data . put ( ESConstants . ISDOWNLOAD , true ) ;
}
/ / Map < String , String > srcAndGofastUrlMap = ( Map < String , String > ) srcPathMap . get ( "srcMap" ) ;
List < Map < String , String > > srcImagePath = ( List < Map < String , String > > ) pathMap . get ( ESConstants . SRCLIST ) ;
/ / if ( data . containsKey ( "pictureList" ) & & null ! = data . get ( "pictureList" ) ) {
/ / / / 如果是社交类的 pictureList 是个List 微博的是个 String , 让晨睿给改一下 , 这边加个判断 。
/ / if ( data . get ( ESConstants . DOC_TYPE ) . equals ( ESConstants . SOCIAL ) ) {
/ / if ( data . get ( "pictureList" ) . toString ( ) . contains ( "[" ) ) {
/ / JSONArray pictureList = JSONObject . parseArray ( data . get ( "pictureList" ) . toString ( ) ) ;
/ / if ( pictureList . size ( ) > 0 ) {
/ / for ( Object picture : pictureList ) {
/ / / / 获取 srcUrlMap 的方法
/ / Map < String , String > srcurlMap = getSrcUrlMap ( picture . toString ( ) ) ;
/ / srcPath . add ( srcurlMap ) ;
/ / }
/ / }
/ / } else if ( data . get ( "pictureList" ) . toString ( ) . contains ( "|" ) ) {
/ / String lists [ ] = data . getString ( "pictureList" ) . toString ( ) . split ( "|" ) ;
/ / for ( String picture : lists ) {
/ / Map < String , String > srcurlMap = getSrcUrlMap ( picture ) ;
/ / srcPath . add ( srcurlMap ) ;
/ / }
/ / } else {
/ / String picture = data . get ( "pictureList" ) . toString ( ) ;
/ / Map < String , String > srcurlMap = getSrcUrlMap ( picture ) ;
/ / srcPath . add ( srcurlMap ) ;
/ / }
/ / } else {
/ / Map < String , Object > pictureList = JSONObject . parseObject ( data . get ( "pictureList" ) . toString ( ) ) ;
/ / if ( ! pictureList . isEmpty ( ) ) {
/ / Map < String , String > srcurlMap = new HashMap < > ( ) ;
/ / for ( Map . Entry < String , Object > entry : pictureList . entrySet ( ) ) {
/ / Map < String , Object > imgmap = ( Map < String , Object > ) entry . getValue ( ) ;
/ / if ( imgmap . containsKey ( "uploadImg" ) & & imgmap . get ( "uploadImg" ) ! = null & & imgmap . get ( "uploadImg" ) ! = "" ) {
/ / srcurlMap . put ( ESConstants . GOFASTURL , srcAndGofastUrlMap . get ( imgmap . get ( "uploadImg" ) ) ) ;
/ / srcurlMap . put ( ESConstants . ORIGINALURL , imgmap . get ( "img" ) . toString ( ) ) ;
/ / }
/ / srcPath . add ( srcurlMap ) ;
/ / }
/ / }
/ / }
/ / }
data . put ( ESConstants . SRCIMAGEPATH , JSON . toJSONString ( srcImagePath , SerializerFeature . DisableCircularReferenceDetect ) ) ;
}
return data ;
}
private JSONObject getVideoPath ( JSONObject data , List < String > videoPath , List < Map < String , String > > srcVideoList ) {
Map < String , Object > pathMap = getPathSize ( videoPath , 2 , data , srcVideoList ) ;
LOGGER . info ( "下载视频后的 pathMap : {}." , JsonUtils . toJSONString ( pathMap ) ) ;
/ / 先做判断 , 如果 pathMap = = 0 的话 , 对应的 videoPath 、 videoPathSize 、 srcVideoPath 都保持不变即可
if ( pathMap . size ( ) > 0 ) {
/ / videoPath 字段填充
videoPath = ( List < String > ) pathMap . get ( ESConstants . PATH ) ;
data . put ( ESConstants . VIDEOPATH , videoPath ) ;
/ / videoPathSize 字段填充 先做判断 , 如果
List < Map < String , String > > videoPathSize = ( List < Map < String , String > > ) pathMap . get ( ESConstants . PATHSIZELIST ) ;
if ( videoPathSize . size ( ) > 0 ) {
data . put ( ESConstants . VIDEOPATHSIZE , JSONObject . toJSONString ( videoPathSize ) ) ;
data . put ( ESConstants . EGC , 1 ) ;
data . put ( ESConstants . ISDOWNLOAD , true ) ;
}
/ / 组装 SRCVIDEOPATH 字段
List < Map < String , String > > srcVideoPath = ( List < Map < String , String > > ) pathMap . get ( ESConstants . SRCLIST ) ;
data . put ( ESConstants . SRCVIDEOPATH , JSON . toJSONString ( srcVideoPath , SerializerFeature . DisableCircularReferenceDetect ) ) ;
}
/ / Map < String , String > srcAndGofastUrlMap = ( Map < String , String > ) pathMap . get ( ESConstants . SRCLIST ) ;
/ / if ( data . containsKey ( "videoUrl" ) & & null ! = data . get ( "videoUrl" ) ) {
/ / List < Map < String , String > > srcPath = new ArrayList < > ( ) ;
/ / if ( data . get ( "videoUrl" ) . toString ( ) . contains ( "originalUrl" ) ) {
/ / try {
/ / List < Map < String , String > > videoUrl = JsonUtils . parseArray ( data . get ( "videoUrl" ) . toString ( ) ) ;
/ / srcPath = getSrcPath ( videoUrl , srcAndGofastUrlMap ) ;
/ / } catch ( Exception e ) {
/ / e . printStackTrace ( ) ;
/ / }
/ / } else {
/ / List < String > videoUrl = new ArrayList < > ( ) ;
/ / try {
/ / if ( data . get ( "videoUrl" ) . toString ( ) . contains ( "[" ) ) {
/ / videoUrl = JsonUtils . parseArray ( data . get ( "videoUrl" ) . toString ( ) ) ;
/ / } else {
/ / videoUrl . add ( data . get ( "videoUrl" ) . toString ( ) ) ;
/ / }
/ / } catch ( Exception e ) {
/ / e . printStackTrace ( ) ;
/ / }
/ / srcPath = new ArrayList < > ( ) ;
/ / Map < String , String > srcurlMap = new HashMap < > ( ) ;
/ / if ( videoPath . size ( ) > 0 ) {
/ / srcurlMap . put ( ESConstants . GOFASTURL , videoPath . get ( 0 ) ) ;
/ / }
/ / System . out . println ( "===============视频原链接的List: " + videoUrl ) ;
/ / if ( videoUrl . size ( ) > 0 ) {
/ / srcurlMap . put ( ESConstants . ORIGINALURL , videoUrl . get ( 0 ) ) ;
/ / }
/ / if ( srcurlMap . size ( ) > 0 ) {
/ / srcPath . add ( srcurlMap ) ;
/ / }
/ / }
/ / }
return data ;
}
private List < Map < String , String > > getSrcPath ( List < Map < String , String > > forwardUrl , Map < String , String > srcAndGofastUrlMap ) {
List < Map < String , String > > srcPathList = new ArrayList < > ( ) ;
for ( Map < String , String > urlMap : forwardUrl ) {
if ( null ! = urlMap ) {
Map < String , String > srcurlMap = new HashMap < > ( ) ;
if ( urlMap . containsKey ( ESConstants . GOFASTURL ) & & null ! = urlMap . get ( ESConstants . GOFASTURL ) ) {
srcurlMap . put ( ESConstants . GOFASTURL , srcAndGofastUrlMap . get ( urlMap . get ( ESConstants . GOFASTURL ) ) ) ;
} else {
srcurlMap . put ( ESConstants . GOFASTURL , "" ) ;
}
srcurlMap . put ( ESConstants . ORIGINALURL , urlMap . get ( ESConstants . ORIGINALURL ) ) ;
srcPathList . add ( srcurlMap ) ;
private JSONObject getFilePath ( JSONObject data , List < String > filePath , List < Map < String , String > > srcFileList ) {
/ / 调用下载接口 , 下载并将附件上传到自己的go - fast 上
Map < String , Object > pathMap = getPathSize ( filePath , 0 , data , srcFileList ) ;
LOGGER . info ( "下载文件后的 pathMap : {}." , JsonUtils . toJSONString ( pathMap ) ) ;
if ( pathMap . size ( ) > 0 ) {
/ / 下载替换后的 path List
filePath = ( List < String > ) pathMap . get ( ESConstants . PATH ) ;
data . put ( ESConstants . FILEPATH , filePath ) ;
/ / 组装 FILEPATHSIZE 字段
List < Map < String , String > > filePathSize = ( List < Map < String , String > > ) pathMap . get ( ESConstants . PATHSIZELIST ) ;
if ( filePathSize . size ( ) > 0 ) {
data . put ( ESConstants . FILEPATHSIZE , JSONObject . toJSONString ( filePathSize ) ) ;
data . put ( ESConstants . UGC , 1 ) ;
data . put ( ESConstants . ISDOWNLOAD , true ) ;
}
/ / 组装 SRCFILEPATH 字段
/ / Map < String , String > srcAndGofastUrlMap = ( Map < String , String > ) pathMap . get ( "srcMap" ) ;
/ / System . out . println ( "**** " + JSONObject . toJSONString ( pathMap . get ( ESConstants . SRCLIST ) ) ) ;
List < Map < String , String > > srcFilePath = ( List < Map < String , String > > ) pathMap . get ( ESConstants . SRCLIST ) ;
data . put ( ESConstants . SRCFILEPATH , JSON . toJSONString ( srcFilePath , SerializerFeature . DisableCircularReferenceDetect ) ) ;
}
return srcPathList ;
/ / if ( data . containsKey ( ESConstants . FORWARD_URL ) & & ! ( "" ) . equals ( data . getString ( ESConstants . FORWARD_URL ) ) & & null ! = data . get ( ESConstants . FORWARD_URL ) ) {
/ / try {
/ / List < Map < String , String > > forwardUrl = JsonUtils . parseArray ( data . get ( ESConstants . FORWARD_URL ) . toString ( ) ) ;
/ / / / List < Map < String , String > > srcPath = getSrcPath ( forwardUrl , srcAndGofastUrlMap ) ;
/ / data . put ( ESConstants . SRCFILEPATH , JSON . toJSONString ( srcFilePath , SerializerFeature . DisableCircularReferenceDetect ) ) ;
/ / } catch ( Exception e ) {
/ / e . printStackTrace ( ) ;
/ / }
/ / }
return data ;
}
/ / public static void main ( String [ ] args ) {
/ / QueryService queryService = new QueryService ( ) ;
/ / List < String > list = ReadLine . readLine ( new File ( "E:\\work/test1.txt" ) ) ;
/ / JSONObject data = JSONObject . parseObject ( list . get ( 0 ) ) ;
/ / String goFastAddr = "http://172.18.1.113:8080/upload" ;
/ / JSONObject result = queryService . downloadAndChangePath ( data , goFastAddr ) ;
/ / System . out . println ( result ) ;
/ /
/ / }
// Walks pathList, hands each URL to DownLoadFile.downloadAndSaveFile, and collects per URL:
// a size/resolution/video-time record (pathSizeList), the stored path (path) and an
// original-URL / go-fast-URL pair (srcMap / srcList). The collections are returned in pathMap.
// downloadType: 0 = file, 1 = image, 2 = video (per the newer of the two header lines below).
// NOTE(review): this span carries two signature lines, two srcMap declarations and contradictory
// downloadType checks (1 and 2 are each tagged as both video and image). It looks like the
// before/after lines of a change were merged together — confirm which lines are live.
/ * *
* downloadType = 0 文件 = 1 图片 = 2 视频
* downloadType = 0 文件 ; = 1 图片 ; = 2 视频
* /
private Map < String , Object > getPathSize ( List < String > pathList , String goFastAddr , Integer downloadType , JSONObject data ) {
private Map < String , Object > getPathSize ( List < String > pathList , Integer downloadType , JSONObject data , List < Map < String , String > > srcxxxList ) {
String domain = config . getGoFastDomain ( ) ;
Map < String , Object > pathMap = new HashMap < > ( ) ;
List < Map < String , String > > pathSizeList = new ArrayList < > ( ) ;
List < String > path = new ArrayList < > ( ) ;
Map < String , String > srcMap = new HashMap < > ( ) ;
List < Map < String , String > > srcList = new ArrayList ( ) ;
for ( String downloadUrl : pathList ) {
// One source map per URL: records the original URL first, the go-fast URL is added further down.
Map < String , String > srcMap = new HashMap < > ( ) ;
srcMap . put ( ESConstants . ORIGINALURL , downloadUrl ) ;
String resolution = "" ;
String videoTime = "" ;
try {
if ( null ! = downloadUrl & & ! downloadUrl . contains ( "si-te.percent.cn" ) ) {
if ( downloadUrl . contains ( "http" ) ) {
Map < String , String > pathSizeMap = DownLoadFile . downloadAndSaveFile ( downloadUrl , goFastAddr ) ;
LOGGER . info ( "[QueryService] getPathSize goFaskAddr {}. resultMap {}." , goFastAddr , pathSizeMap ) ;
// Translation of the two notes below: the three cases that follow all require a download;
// there is also the case where the link in path needs no download — how should pathSize and
// srcPath be filled in then? Quite possibly path itself needs no download while the other two
// fields can only be completed after downloading.
/ / 一下三种情况都是需要下载的情况 , 另外还有 path 中的链接不需要下载的情况 , 怎么补全pathSize 和 srcPath 字段
/ / 很可能 path 字段本身不需要下载 , 但是其他两个字段都需要下载后补全才行 。
if ( downloadUrl . contains ( "http" ) | | downloadUrl . contains ( "group1" ) | | downloadUrl . contains ( "group2" ) ) {
// URLs containing "group1"/"group2" get the configured go-fast domain prepended before download.
if ( downloadUrl . contains ( "group1" ) | | downloadUrl . contains ( "group2" ) ) {
downloadUrl = domain + downloadUrl ;
}
Map < String , String > pathSizeMap = DownLoadFile . downloadAndSaveFile ( downloadUrl , config . getGoFastPostUrl ( ) ) ;
/ / LOGGER . info ( "[QueryService] getPathSize goFaskAddr {}. resultMap {}." , config . getGoFastPostUrl ( ) , pathSizeMap ) ;
/ / Map < String , String > pathSizeMap = DownLoadFile . downloadAndSaveFile ( downloadUrl , "http://172.18.1.113:8080/upload" ) ;
if ( pathSizeMap . size ( ) > 0 ) {
// Inline tags below: 视频 = video, 图片 = image. Video metadata is read from data; image
// resolution is obtained through DownLoadFile.
if ( downloadType = = 1 ) { / / 视频
if ( downloadType = = 2 ) { / / 视频
resolution = data . getString ( ESConstants . RESOLUTION ) ;
videoTime = data . getString ( ESConstants . VIDEOTIME ) ;
}
if ( downloadType = = 2 ) { / / 图片
resolution = DownLoadFile . imagesize ( downloadUrl ) ;
if ( downloadType = = 1 ) { / / 图片
resolution = DownLoadFile . getImageResolution ( downloadUrl ) ;
}
// NOTE(review): the live line strips a hard-coded host while the commented line strips
// config.getGoFastDomain() — confirm which one is intended.
/ / String url = pathSizeMap . get ( "realUrl" ) . replace ( config . getGoFastDomain ( ) , "" ) ;
String url = pathSizeMap . get ( "realUrl" ) . replace ( "http://172.18.1.113:8080" , "" ) ;
String size = pathSizeMap . get ( "size" ) + "KB" ;
pathSizeMap . put ( ESConstants . URL , url ) ;
pathSizeMap . put ( ESConstants . SIZE , size ) ;
pathSizeMap . put ( ESConstants . RESOLUTION , resolution ) ;
pathSizeMap . put ( ESConstants . VIDEOTIME , videoTime ) ;
pathSizeMap . remove ( "realUrl" ) ;
// Translation: these feed the three PathSize fields — imagePathSize, videoPathSize, filePathSize.
/ / 这个是三个PathSize imagePathSize , videoPathSize filePathSize
pathSizeList . add ( pathSizeMap ) ;
// Translation: used to swap go-fast and original links; key is the original link, value the go-fast link.
/ / 这个是 用来做 gofast 和原链接替换的 , key 是原链接 , value 是go - fast 链接 ,
srcMap . put ( downloadUrl , url ) ;
String goFastUrl = pathSizeMap . get ( ESConstants . URL ) ;
srcMap . put ( ESConstants . GOFASTURL , goFastUrl ) ;
// Translation: this value replaces the three Path fields — imagePath, videoPath, filePath.
/ / 这个值使用来替换 三个 Path 的 imagePath , videoPath , filePath
path . add ( url ) ;
path . add ( goFastUrl ) ;
srcList . add ( srcMap ) ;
}
// Translation of the two notes below: when the url in path is already fine but pathSize and
// srcPath are not known, handle it as follows; srcPath needs the two values below, so they
// are fetched first.
} else { / / 如果 path 中的 url 是 OK的 , 但是不确定 pathSize 和 srcPath 的时候 , 需要做下面的处理
/ / 因为 srcPath 中需要先添加下面两个字段值 , 因此需要先获取 。
String allDownloadUrl = domain + downloadUrl ;
/ / String allDownloadUrl = "http://172.18.1.113:8080" + downloadUrl ;
if ( downloadType = = 2 ) { / / 视频
resolution = data . getString ( ESConstants . RESOLUTION ) ;
videoTime = data . getString ( ESConstants . VIDEOTIME ) ;
}
if ( downloadType = = 1 ) {
resolution = DownLoadFile . getImageResolution ( allDownloadUrl ) ;
}
String size = DownLoadFile . getFileSize ( allDownloadUrl ) ;
// Translation: assemble the [pathSize] field.
// NOTE(review): within the visible lines this pathSizeMap is never added to pathSizeList — confirm.
/ / 组装 【 pathSize 】 字段
Map < String , String > pathSizeMap = new HashMap < > ( ) ;
pathSizeMap . put ( ESConstants . URL , downloadUrl ) ;
pathSizeMap . put ( ESConstants . SIZE , size ) ;
pathSizeMap . put ( ESConstants . RESOLUTION , resolution ) ;
pathSizeMap . put ( ESConstants . VIDEOTIME , videoTime ) ;
// Translation: assemble the [path] field.
/ / 组装 【 path 】 字段
path . add ( downloadUrl ) ;
// Translation: assemble the [srcPath] field.
/ / 组装 【 srcPath 】 字段
srcMap . put ( ESConstants . GOFASTURL , downloadUrl ) ;
srcList . add ( srcMap ) ;
}
// Translation: assemble the complete Path (List<String>), PathSize (List<Map<String,String>>)
// and srcPath (List<Map<String,String>>) here. A non-empty caller-supplied srcxxxList takes
// precedence over the srcList built in this method.
/ / 再这块组装出完整的 Path : List < String > , PathSize List < Map < String , String > > , srcPath List < Map < String , String > >
pathMap . put ( ESConstants . PATHSIZELIST , pathSizeList ) ;
if ( srcxxxList . size ( ) > 0 ) {
pathMap . put ( ESConstants . SRCLIST , srcxxxList ) ;
} else {
pathMap . put ( ESConstants . SRCLIST , srcList ) ;
}
pathMap . put ( ESConstants . PATH , path ) ;
}
// An IOException for one URL is only printed; the loop then moves on to the next URL.
} catch ( IOException e ) {
e . printStackTrace ( ) ;
}
}
pathMap . put ( ESConstants . PATHSIZELIST , pathSizeList ) ;
pathMap . put ( "srcMap" , srcMap ) ;
pathMap . put ( ESConstants . PATH , path ) ;
return pathMap ;
/ / return pathSizeList ;
}
private QueryBuilder getQueryBuilder ( Long startTime , Long endTime ,
@ -533,28 +621,23 @@ public class QueryService {
BoolQueryBuilder boolQueryBuilder = QueryBuilders . boolQuery ( ) ;
try {
/ / 当拉取次数大于1的 , 还需要限制 采集时间 , 采集时间不早于今天 。
boolean boo = true ;
/ / if ( cacheNum > 1 ) {
/ / QueryBuilder pubTimeRange = buildRangeQueryBuilder (
/ / ESConstants . CREATETIME , startTime - 2 * ONE_MINUTE , endTime , boo , boo ) ;
/ / boolQueryBuilder . must ( pubTimeRange ) ;
/ / }
/ / / / 筛选发表时间
/ / 筛选发表时间 电商的主贴没有发表时间 , 因此不需要筛选
if ( ! siteType . equals ( ESConstants . DOCTYPEITEM ) ) {
QueryBuilder pubTimeRange = buildRangeQueryBuilder (
ESConstants . PUBTIME , startTime - 2 * ONE_MINUTE , endTime , boo , boo ) ;
ESConstants . PUBTIME , startTime - 2 * ONE_MINUTE , endTime ) ;
boolQueryBuilder . must ( pubTimeRange ) ;
}
/ / 筛选站点
/ / 筛选站点 因为天猫 、 淘宝的数据是交叉的 , 因此如果需要拉某一个站点的时候需将两个站点的数据都拉出来 。
if ( cid . equals ( ESConstants . TAOBAO ) | | cid . equals ( ESConstants . TMALL ) ) {
boolQueryBuilder . must ( QueryBuilders . termsQuery ( ESConstants . EN_SOURCE , ESConstants . TAOBAO , ESConstants . TMALL ) ) ;
} else {
boolQueryBuilder . must ( QueryBuilders . termQuery ( ESConstants . EN_SOURCE , cid ) ) ;
}
/ / 筛选标志位
/ / 筛选标志位 , 即采集任务的条件进行筛选
QueryBuilder crawlDataFlagBuilder = buildCrawlDataFlagBuilder ( cid , crawlDataFlag ) ;
/ / System . out . println ( "== " + crawlDataFlagBuilder ) ;
boolQueryBuilder . must ( crawlDataFlagBuilder ) ;
} catch ( Exception e ) {
e . printStackTrace ( ) ;
@ -562,33 +645,39 @@ public class QueryService {
return boolQueryBuilder ;
}
private QueryBuilder buildRangeQueryBuilder ( String field , Object startVal , Object endVal , Boolean isIncludeLower , Boolean isIncludeUpper ) {
/ * *
* 范围查询的语句组装
* /
private QueryBuilder buildRangeQueryBuilder ( String field , Object startVal , Object endVal ) {
boolean boo = true ;
return rangeQuery ( field )
. from ( startVal )
. to ( endVal )
. includeLower ( isIncludeLower )
. includeUpper ( isIncludeUpper ) ;
. includeLower ( boo )
. includeUpper ( boo ) ;
}
private QueryBuilder buildCrawlDataFlagBuilder ( String cid , String crawlDataFlag ) {
QueryBuilder queryBuilder = null ;
try {
/ / TermQueryBuilder queryCidBuilders = QueryBuilders . termQuery ( "enSource" , cid ) ;
/ / 直接匹配有同样 crawlDataFlag 的数据进行拉取 。
TermQueryBuilder queryCrawlDataFlagBuilder = QueryBuilders . termQuery ( ESConstants . CRAWLDATAFLAG , crawlDataFlag ) ;
/ / 如果是关键词任务 , 就不拉电商的历史数据拉 , 匹配出来的东西乱起八糟的 ! ! 但是别的网站还是需要用关键词进行拉取的 。
if ( crawlDataFlag . contains ( "keyword:" ) ) {
String keyword = crawlDataFlag . split ( "keyword:" ) [ 1 ] ;
System . out . println ( "[buildCrawlDataFlagBuilder] keyword --- " + keyword ) ; / / 关键词的话需要去 title 和 content 匹配一下
MatchPhraseQueryBuilder titleQuery = QueryBuilders . matchPhraseQuery ( ESConstants . TITLE , keyword ) . slop ( 0 ) ;
BoolQueryBuilder titleFilterQuery = QueryBuilders . boolQuery ( ) . filter ( titleQuery ) ;
/ / 关键词匹配的时候 , 如果是电商的 , 就只匹配商品名称 。 如果是其他的 , 就匹配 标题和内容
if ( cid . equals ( "taobao" ) | | cid . equals ( "tmall" ) | | cid . equals ( "ejingdong" ) | | cid . equals ( "suning" ) ) {
queryBuilder = QueryBuilders . boolQuery ( ) . should ( titleFilterQuery ) . should ( queryCrawlDataFlagBuilder ) ;
} else {
MatchPhraseQueryBuilder contentQuery = QueryBuilders . matchPhraseQuery ( ESConstants . CONTENT , keyword ) . slop ( 0 ) ;
BoolQueryBuilder contentFilterQuery = QueryBuilders . boolQuery ( ) . filter ( contentQuery ) ;
queryBuilder = QueryBuilders . boolQuery ( ) . should ( titleFilterQuery ) . should ( contentFilterQuery ) . should ( queryCrawlDataFlagBuilder ) ;
}
/ / / / 关键词匹配的时候 , 如果是电商的 , 就只匹配商品名称 。 如果是其他的 , 就匹配 标题和内容
/ / if ( cid . equals ( "taobao" ) | | cid . equals ( "tmall" ) | | cid . equals ( "ejingdong" ) | | cid . equals ( "suning" ) ) {
/ / queryBuilder = QueryBuilders . boolQuery ( ) . should ( titleFilterQuery ) . should ( queryCrawlDataFlagBuilder ) ;
/ / } else {
MatchPhraseQueryBuilder contentQuery = QueryBuilders . matchPhraseQuery ( ESConstants . CONTENT , keyword ) . slop ( 0 ) ;
BoolQueryBuilder contentFilterQuery = QueryBuilders . boolQuery ( ) . filter ( contentQuery ) ;
queryBuilder = QueryBuilders . boolQuery ( ) . should ( titleFilterQuery ) . should ( contentFilterQuery ) . should ( queryCrawlDataFlagBuilder ) ;
/ / }
}
if ( crawlDataFlag . contains ( "url:" ) ) {
String url = crawlDataFlag . split ( "url:" ) [ 1 ] ;
System . out . println ( "[buildCrawlDataFlagBuilder] url --- " + url ) ; / / url 的话直接匹配 url 字段
@ -596,12 +685,14 @@ public class QueryService {
/ / QueryBuilder queryBuilder1 = QueryBuilders . boolQuery ( ) . must ( queryUrlBuilders ) ;
queryBuilder = QueryBuilders . boolQuery ( ) . should ( queryCrawlDataFlagBuilder ) . should ( queryUrlBuilders ) ;
}
if ( crawlDataFlag . contains ( "account:" ) ) {
String account = crawlDataFlag . split ( "account:" ) [ 1 ] ;
System . out . println ( "[buildCrawlDataFlagBuilder] account --- " + account ) ;
TermQueryBuilder queryAccountBuilders = QueryBuilders . termQuery ( ESConstants . USER_URL , account ) ;
queryBuilder = QueryBuilders . boolQuery ( ) . should ( queryAccountBuilders ) . should ( queryCrawlDataFlagBuilder ) ;
}
} catch ( Exception e ) {
e . printStackTrace ( ) ;
}
@ -1506,6 +1597,31 @@ public class QueryService {
/ / / / return params ;
/ / / / }
/ / private Map < String , String > getSrcUrlMap ( String picture ) {
/ / Map < String , String > srcurlMap = new HashMap < > ( ) ;
/ / srcurlMap . put ( ESConstants . GOFASTURL , "" ) ;
/ / srcurlMap . put ( ESConstants . ORIGINALURL , picture . toString ( ) ) ;
/ / return srcurlMap ;
/ / }
/ /
/ /
/ / private List < Map < String , String > > getSrcPath ( List < Map < String , String > > forwardUrl , Map < String , String > srcAndGofastUrlMap ) {
/ / List < Map < String , String > > srcPathList = new ArrayList < > ( ) ;
/ / for ( Map < String , String > urlMap : forwardUrl ) {
/ / if ( null ! = urlMap ) {
/ / Map < String , String > srcurlMap = new HashMap < > ( ) ;
/ / if ( urlMap . containsKey ( ESConstants . GOFASTURL ) & & null ! = urlMap . get ( ESConstants . GOFASTURL ) ) {
/ / srcurlMap . put ( ESConstants . GOFASTURL , srcAndGofastUrlMap . get ( urlMap . get ( ESConstants . GOFASTURL ) ) ) ;
/ / } else {
/ / srcurlMap . put ( ESConstants . GOFASTURL , "" ) ;
/ / }
/ / srcurlMap . put ( ESConstants . ORIGINALURL , urlMap . get ( ESConstants . ORIGINALURL ) ) ;
/ / srcPathList . add ( srcurlMap ) ;
/ / }
/ / }
/ / return srcPathList ;
/ / }
/ / public static void main ( String [ ] args ) {
/ / long current = System . currentTimeMillis ( ) ; / / 当前时间毫秒数
@ -1515,4 +1631,33 @@ public class QueryService {
/ / long startTime = 1605715200000L ;
/ / System . out . println ( startTime - 2 * ONE_MINUTE ) ;
/ / }
/ / public static void main ( String [ ] args ) {
/ / QueryService q = new QueryService ( ) ;
/ / String list = ReadLine . readFile ( "E:\\work\\workspace\\elastiManager\\elastiManager\\data/test.txt" ) ;
/ / JSONObject j = q . downloadAndChangePath ( JSONObject . parseObject ( list ) ) ;
/ / System . out . println ( j ) ;
/ / / / String path = "https://si.pdeepmatrix.com/group3/default/20210831/05/24/2/31_file" ;
/ / / / double size ;
/ / / / URL url = null ;
/ / / / try {
/ / / / url = new URL ( path ) ;
/ / / / URLConnection conn = url . openConnection ( ) ;
/ / / / size = conn . getContentLength ( ) ;
/ / / / if ( size < 0 )
/ / / / System . out . println ( "无法获取文件大小。" ) ;
/ / / / else
/ / / / System . out . println ( "文件大小为:" + size / 1024 + " bytes" ) ;
/ / / / conn . getInputStream ( ) . close ( ) ;
/ / / / } catch ( Exception e ) {
/ / / / e . printStackTrace ( ) ;
/ / / / }
/ /
/ / }
}