# -*- coding: utf-8 -*-
"""Page-snapshot worker.

Consumes URL tasks from Kafka, renders each page in headless Chrome,
captures it as an MHTML archive, uploads the archive to go-fastdfs and
publishes the result (or a structured error) back to Kafka.
"""
import time
import threading
from selenium import webdriver
import json
from urllib.parse import urljoin
from kakfa_util import KafkaConsume
from kakfa_util import kafkaProduce
from logUtil import get_logger
from Go_fastDfs import uploadFile
import traceback
import queue
import configparser
import os, sys
import re

logger = get_logger("./logs/crawlWebsrcCode.log")

# Load the configuration file.
configFile = './config.ini'
con = configparser.ConfigParser()
con.read(configFile, encoding='utf-8')
kafkaConfig = dict(con.items('kafka'))          # Kafka connection settings
goFastdfsConfig = dict(con.items('goFastdfs'))  # go-fastdfs upload settings

# Characters that are illegal in Windows/Unix file names.
_ILLEGAL_FILENAME_CHARS = re.compile(r'[\\/:*?"<>|\r\n]')


class Spider(object):
    """Headless-Chrome snapshotter.

    Loads one URL, scrolls to the bottom so lazy content renders, then
    captures the fully rendered page as an MHTML archive via DevTools.
    """

    def __init__(self, url):
        self.chromeOptions = self.get_profile()
        self.browser = self.get_browser()
        self.url = url

    def get_profile(self):
        """Build the ChromeOptions for the headless browser session."""
        chromeOptions = webdriver.ChromeOptions()
        chromeOptions.add_argument('--headless')     # headless mode
        chromeOptions.add_argument('--disable-gpu')  # disable GPU rendering
        # chromeOptions.add_argument('window-size=1280,800')  # fixed viewport
        chromeOptions.add_argument("--no-sandbox")
        return chromeOptions

    def get_browser(self):
        """Start the Chrome driver with the prepared options."""
        # TODO(review): the driver path is hard-coded; move it to config.ini.
        browser = webdriver.Chrome("D:\\工作使用\\zhaoshang\\chromedriver.exe",
                                   chrome_options=self.chromeOptions)
        return browser

    def _get_page(self, path):
        '''
        Capture the page in its original format (MHTML), write it to a file
        and return the file path and page title.
        :param path: directory the snapshot file is written into
        :return: (pathName, title) — path of the .mhtml file and the raw
                 document title ('无标题' when no title could be read)
        '''
        self.browser.get(self.url)
        time.sleep(5)
        logger.info("休眠结束")
        # Scroll down in 10000-pixel steps so lazy-loaded content renders.
        scrollTop = 10000
        for num in range(1, 10):
            js = "var q=document.documentElement.scrollTop={}".format(scrollTop * num)
            logger.info("第{}次滚动".format(num))
            self.browser.execute_script(js)
            time.sleep(5)
        # DevTools command: obtain the rendered page as an MHTML snapshot.
        res = self.browser.execute_cdp_cmd('Page.captureSnapshot', {})
        # Read the document title (used for the file name and the 404 check).
        title = '无标题'
        try:
            title = self.browser.find_element_by_css_selector("title").get_attribute("textContent")
        except Exception:
            logger.error('获取标题异常----')
            traceback.print_exc()
        # BUGFIX: strip characters that are illegal in file names so the
        # snapshot can always be written, and write with an explicit UTF-8
        # encoding instead of the platform default.
        safeTitle = _ILLEGAL_FILENAME_CHARS.sub('_', title)
        pathName = '{}{}.mhtml'.format(path, safeTitle)
        with open(pathName, 'w', newline='', encoding='utf-8') as f:
            f.write(res['data'])
        return pathName, title


if __name__ == '__main__':
    # --- consumer thread: drain Kafka into a local task queue -----------
    task_queue = queue.Queue()
    logger.info("开启读取kafka线程---")
    t = threading.Thread(target=KafkaConsume, name='LoopThread',
                         args=(kafkaConfig['read_topic'], kafkaConfig['address'],
                               kafkaConfig['group_id'], task_queue, logger))
    t.daemon = True
    t.start()

    def publish_result(resultData):
        """Serialize resultData and publish it on the result topic.

        BUGFIX: the original chain
        json.dumps(x).encode('utf-8').decode('unicode_escape').encode()
        corrupts non-BMP characters; ensure_ascii=False yields the same
        readable JSON bytes safely.
        """
        kafkaProduce(kafkaConfig['data_topics'],
                     json.dumps(resultData, ensure_ascii=False).encode('utf-8'),
                     kafkaConfig['address'])

    # URL validity pattern, compiled once (hoisted out of the loop; the
    # original compiled it per task and then never used the compiled form).
    url_pattern = re.compile(
        u'(https?|ftp|file)://[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]')

    # --- main loop: take a task, snapshot the page, upload, report ------
    while True:
        task = None  # reset so the error handler never sees a stale task
        try:
            if task_queue.qsize() > 0:
                taskStr = task_queue.get()
                logger.info('当前任务:{}'.format(taskStr))
                task = json.loads(taskStr)
                if url_pattern.search(task['url']):
                    l = Spider(task['url'])
                    try:
                        pathName, title = l._get_page(goFastdfsConfig['path'])
                    finally:
                        # BUGFIX: always release the browser, even when the
                        # snapshot fails, so Chrome processes do not leak.
                        l.browser.quit()
                    # Report dead pages instead of uploading them.
                    if '404 Not Found' in title:
                        logger.error('页面404,无效')
                        publish_result({'code': 500, 'id': task['id'], 'message': '页面404'})
                        time.sleep(2)
                        continue
                    # Upload the snapshot to go-fastdfs.
                    try:
                        uploadStr = uploadFile('{}upload'.format(goFastdfsConfig['uploadaddress']), pathName, logger)
                        uploadJson = json.loads(uploadStr)
                    except Exception:
                        logger.error('文件上传异常----')
                        traceback.print_exc()
                        publish_result({'code': 500, 'id': task['id'], 'message': '文件上传失败'})
                        time.sleep(2)
                        continue
                    publish_result({
                        'code': 200,
                        'id': task['id'],
                        'url': goFastdfsConfig['downloadaddress'] + uploadJson['path'],
                        'title': title,
                        'delMd5': uploadJson['md5'],
                        'uploadTime': uploadJson['mtime'],
                        'message': '成功',
                    })
                    logger.info('数据写入成功')
                    # Remove the local snapshot now that it is uploaded.
                    if os.path.exists(pathName):
                        os.remove(pathName)
                        logger.info('清除文件:{}'.format(pathName))
                    else:
                        logger.info('要删除的文件不存在:{}'.format(pathName))
                else:
                    # BUGFIX: original used '...'.format(url) without a
                    # placeholder, so the offending URL was never logged.
                    logger.error('非正确url:{}'.format(task['url']))
                    publish_result({'code': 500, 'id': task['id'], 'message': '非正确url'})
                    time.sleep(2)
                    continue
            else:
                logger.info("暂无任务,进入休眠--")
                time.sleep(10)
        except Exception:
            logger.error('未知异常----')
            traceback.print_exc()
            # BUGFIX: 'task' may be unparsed/None here (e.g. json.loads
            # failed); guard the id lookup instead of raising NameError
            # inside the error handler.
            task_id = task.get('id') if isinstance(task, dict) else None
            publish_result({'code': 500, 'id': task_id, 'message': '未知异常'})
            time.sleep(2)