# -*- coding: utf-8 -*-

import time
import threading
from selenium import webdriver
import json
from urllib.parse import urljoin
from kakfa_util import KafkaConsume
from kakfa_util import kafkaProduce
from logUtil import get_logger
from Go_fastDfs import uploadFile
import traceback
import queue
import configparser
import os, sys
import re

logger = get_logger("./logs/crawlWebsrcCode.log")

# Load the configuration file
configFile = './config.ini'
# Create the config parser
con = configparser.ConfigParser()
# Read the file
con.read(configFile, encoding='utf-8')
kafkaConfig = dict(con.items('kafka'))          # Kafka settings
goFastdfsConfig = dict(con.items('goFastdfs'))  # go-fastdfs settings
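
# For reference, a minimal sketch of the config.ini layout this script assumes
# (section and key names are taken from the lookups above; the values are placeholders):
#
#   [kafka]
#   address = 127.0.0.1:9092
#   read_topic = crawl_task
#   data_topics = crawl_result
#   group_id = crawl_group
#
#   [goFastdfs]
#   path = ./download/
#   uploadaddress = http://127.0.0.1:8080/
#   downloadaddress = http://127.0.0.1:8080/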


class Spider(object):

    def __init__(self, url):
        self.chromeOptions = self.get_profile()
        self.browser = self.get_browser()
        self.url = url

    def get_profile(self):
        chromeOptions = webdriver.ChromeOptions()
        chromeOptions.add_argument('--headless')      # run Chrome headless
        chromeOptions.add_argument('--disable-gpu')   # disable GPU acceleration
        # chromeOptions.add_argument('window-size=1280,800')  # set the browser resolution
        chromeOptions.add_argument("--no-sandbox")
        return chromeOptions

    def get_browser(self):
        browser = webdriver.Chrome("D:\\工作使用\\zhaoshang\\chromedriver.exe", chrome_options=self.chromeOptions)
        return browser

    def _get_page(self, path):
        '''
        Capture the page in its original format, write it to a file and return the path.
        :param path: directory the snapshot is written to
        :return: (snapshot file path, page title)
        '''
        self.browser.get(self.url)
        time.sleep(5)
        logger.info("Sleep finished")
        # Scroll down 10000 pixels per step to reach the bottom of the page.
        scrollTop = 10000
        for num in range(1, 10):
            js = "var q=document.documentElement.scrollTop={}".format(scrollTop * num)
            logger.info("Scroll number {}".format(num))
            self.browser.execute_script(js)
            time.sleep(5)
        # Run the Chrome DevTools command to capture the page as MHTML
        res = self.browser.execute_cdp_cmd('Page.captureSnapshot', {})
        # Get the page title
        title = 'Untitled'
        try:
            title = self.browser.find_element_by_css_selector("title").get_attribute("textContent")
        except Exception:
            logger.error('Failed to read the page title')
            traceback.print_exc()
        pathName = '{}{}.mhtml'.format(path, title)
        with open(pathName, 'w', newline='') as f:
            f.write(res['data'])
        return pathName, title
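
# Standalone usage sketch (illustrative only, assuming chromedriver is available at the
# path configured in get_browser and that the target directory exists):
#
#   spider = Spider('https://example.com/')
#   pathName, title = spider._get_page('./download/')   # './download/' is a placeholder
#   spider.browser.quit()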


if __name__ == '__main__':
    # Initialise the task queue
    task_queue = queue.Queue()
    # Background thread that keeps reading tasks from Kafka
    logger.info("Starting the Kafka consumer thread---")
    t = threading.Thread(target=KafkaConsume, name='LoopThread',
                         args=(kafkaConfig['read_topic'], kafkaConfig['address'], kafkaConfig['group_id'], task_queue, logger))
    t.daemon = True
    t.start()
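
    # Each message read from read_topic is expected to be a JSON string carrying at least
    # the "id" and "url" fields used below, e.g. (illustrative values only):
    #   {"id": "12345", "url": "https://example.com/article.html"}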

    # Take tasks from the queue and archive each page in its original format
    while True:
        try:
            if task_queue.qsize() > 0:
                taskStr = task_queue.get()
                logger.info('Current task: {}'.format(taskStr))
                task = json.loads(taskStr)
                # Only handle tasks whose url looks like an http/ftp/file URL
                p1 = u'(https?|ftp|file)://[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]'
                pattern1 = re.compile(p1)
                matcher1 = pattern1.search(task['url'])
                if matcher1:
                    l = Spider(task['url'])
                    pathName, title = l._get_page(goFastdfsConfig['path'])
                    l.browser.quit()
                    # Upload to go-fastdfs and write the result back to Kafka
                    if '404 Not Found' in title:
                        logger.error('Page returned 404, skipping')
                        resultData = {
                            'code': 500,
                            'id': task['id'],
                            'message': 'Page returned 404'
                        }
                        kafkaProduce(kafkaConfig['data_topics'],
                                     json.dumps(resultData).encode('utf-8').decode('unicode_escape').encode(),
                                     kafkaConfig['address'])
                        time.sleep(2)
                        continue
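                    # uploadFile is expected to return go-fastdfs's upload response as a JSON
                    # string; only the "path", "md5" and "mtime" fields are used below, e.g.
                    # (illustrative values): {"path": "/group1/xx.mhtml", "md5": "ab12...", "mtime": 1600000000}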
                    try:
                        uploadStr = uploadFile('{}upload'.format(goFastdfsConfig['uploadaddress']), pathName, logger)
                        uploadJson = json.loads(uploadStr)
                    except Exception:
                        logger.error('File upload failed')
                        traceback.print_exc()
                        resultData = {
                            'code': 500,
                            'id': task['id'],
                            'message': 'File upload failed'
                        }
                        kafkaProduce(kafkaConfig['data_topics'],
                                     json.dumps(resultData).encode('utf-8').decode('unicode_escape').encode(),
                                     kafkaConfig['address'])
                        time.sleep(2)
                        continue
                    resultData = {
                        'code': 200,
                        'id': task['id'],
                        'url': goFastdfsConfig['downloadaddress'] + uploadJson['path'],
                        'title': title,
                        'delMd5': uploadJson['md5'],
                        'uploadTime': uploadJson['mtime'],
                        'message': 'Success'
                    }
                    kafkaProduce(kafkaConfig['data_topics'], json.dumps(resultData).encode('utf-8').decode('unicode_escape').encode(), kafkaConfig['address'])
                    logger.info('Result written to Kafka')
                    # Delete the local snapshot file
                    if os.path.exists(pathName):
                        os.remove(pathName)
                        logger.info('Removed file: {}'.format(pathName))
                    else:
                        logger.info('File to delete does not exist: {}'.format(pathName))
                else:
                    logger.error('Invalid url: {}'.format(task['url']))
                    resultData = {
                        'code': 500,
                        'id': task['id'],
                        'message': 'Invalid url'
                    }
                    kafkaProduce(kafkaConfig['data_topics'],
                                 json.dumps(resultData).encode('utf-8').decode('unicode_escape').encode(),
                                 kafkaConfig['address'])
                    time.sleep(2)
                    continue
            else:
                logger.info("No pending task, sleeping--")
                time.sleep(10)
        except Exception:
            # If the failure happened before json.loads succeeded, task may be undefined here.
            logger.error('Unexpected error')
            traceback.print_exc()
            resultData = {
                'code': 500,
                'id': task['id'],
                'message': 'Unexpected error'
            }
            kafkaProduce(kafkaConfig['data_topics'],
                         json.dumps(resultData).encode('utf-8').decode('unicode_escape').encode(),
                         kafkaConfig['address'])
            time.sleep(2)