Topic Astroturfing Detection Application (话题水军识别应用)
# -*- coding: utf-8 -*-
import time
import threading
from selenium import webdriver
import json
from urllib.parse import urljoin
from kakfa_util import KafkaConsume
from kakfa_util import kafkaProduce
from logUtil import get_logger
from Go_fastDfs import uploadFile
import traceback
import queue
import configparser
import os, sys
import re
logger = get_logger("./logs/crawlWebsrcCode.log")

# Load the configuration file
configFile = './config.ini'
# Create the config parser object
con = configparser.ConfigParser()
# Read the file
con.read(configFile, encoding='utf-8')
kafkaConfig = dict(con.items('kafka'))          # Kafka settings
goFastdfsConfig = dict(con.items('goFastdfs'))  # go-fastdfs settings
class Spider(object):
    def __init__(self, url):
        self.chromeOptions = self.get_profile()
        self.browser = self.get_browser()
        self.url = url

    def get_profile(self):
        chromeOptions = webdriver.ChromeOptions()
        chromeOptions.add_argument('--headless')     # headless Chrome
        chromeOptions.add_argument('--disable-gpu')  # disable the GPU
        # chromeOptions.add_argument('window-size=1280,800')  # set the browser resolution
        chromeOptions.add_argument("--no-sandbox")
        return chromeOptions

    def get_browser(self):
        browser = webdriver.Chrome("D:\\工作使用\\zhaoshang\\chromedriver.exe", chrome_options=self.chromeOptions)
        return browser
    def _get_page(self, path):
        '''
        Render the page, scroll to the bottom, and save an MHTML snapshot.
        :param path: directory the snapshot is written into
        :return: (snapshot file path, page title)
        '''
        self.browser.get(self.url)
        time.sleep(5)
        logger.info("Initial wait finished")
        # Scroll down 10000 pixels per step until the bottom is reached.
        scrollTop = 10000
        for num in range(1, 10):
            js = "var q=document.documentElement.scrollTop={}".format(scrollTop * num)
            logger.info("Scroll #{}".format(num))
            self.browser.execute_script(js)
            time.sleep(5)
        # Run the Chrome DevTools command that captures the page as MHTML
        res = self.browser.execute_cdp_cmd('Page.captureSnapshot', {})
        # Read the page title
        title = 'Untitled'
        try:
            title = self.browser.find_element_by_css_selector("title").get_attribute("textContent")
        except Exception:
            logger.error('Failed to read the page title----')
            traceback.print_exc()
        pathName = '{}{}.mhtml'.format(path, title)
        with open(pathName, 'w', newline='') as f:
            f.write(res['data'])
        return pathName, title
if __name__ == '__main__':
    # Initialize the task queue
    task_queue = queue.Queue()
    # Start the Kafka consumer thread
    logger.info("Starting the Kafka consumer thread---")
    t = threading.Thread(target=KafkaConsume, name='LoopThread',
                         args=(kafkaConfig['read_topic'], kafkaConfig['address'], kafkaConfig['group_id'], task_queue, logger))
    t.daemon = True
    t.start()
    # Take tasks from the queue and save each page with its original format preserved
    while True:
        try:
            if task_queue.qsize() > 0:
                taskStr = task_queue.get()
                logger.info('Current task: {}'.format(taskStr))
                task = json.loads(taskStr)
                p1 = u'(https?|ftp|file)://[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]'
                matcher1 = re.search(p1, task['url'])
                if matcher1:
                    l = Spider(task['url'])
                    pathName, title = l._get_page(goFastdfsConfig['path'])
                    l.browser.quit()
                    # Upload via go-fastdfs, then write the result to Kafka
                    if '404 Not Found' in title:
                        logger.error('Page returned 404, skipping')
                        resultData = {
                            'code': 500,
                            'id': task['id'],
                            'message': 'page returned 404'
                        }
                        kafkaProduce(kafkaConfig['data_topics'],
                                     json.dumps(resultData).encode('utf-8').decode('unicode_escape').encode(),
                                     kafkaConfig['address'])
                        time.sleep(2)
                        continue
                    try:
                        uploadStr = uploadFile('{}upload'.format(goFastdfsConfig['uploadaddress']), pathName, logger)
                        uploadJson = json.loads(uploadStr)
                    except Exception:
                        logger.error('File upload failed----')
                        traceback.print_exc()
                        resultData = {
                            'code': 500,
                            'id': task['id'],
                            'message': 'file upload failed'
                        }
                        kafkaProduce(kafkaConfig['data_topics'],
                                     json.dumps(resultData).encode('utf-8').decode('unicode_escape').encode(),
                                     kafkaConfig['address'])
                        time.sleep(2)
                        continue
                    resultData = {
                        'code': 200,
                        'id': task['id'],
                        'url': goFastdfsConfig['downloadaddress'] + uploadJson['path'],
                        'title': title,
                        'delMd5': uploadJson['md5'],
                        'uploadTime': uploadJson['mtime'],
                        'message': 'success'
                    }
                    kafkaProduce(kafkaConfig['data_topics'], json.dumps(resultData).encode('utf-8').decode('unicode_escape').encode(), kafkaConfig['address'])
                    logger.info('Result written to Kafka')
                    # Remove the local snapshot file
                    if os.path.exists(pathName):
                        os.remove(pathName)
                        logger.info('Removed file: {}'.format(pathName))
                    else:
                        logger.info('File to delete does not exist: {}'.format(pathName))
                else:
                    logger.error('Invalid url: {}'.format(task['url']))
                    resultData = {
                        'code': 500,
                        'id': task['id'],
                        'message': 'invalid url'
                    }
                    kafkaProduce(kafkaConfig['data_topics'],
                                 json.dumps(resultData).encode('utf-8').decode('unicode_escape').encode(),
                                 kafkaConfig['address'])
                    time.sleep(2)
                    continue
            else:
                logger.info("No pending tasks, sleeping--")
                time.sleep(10)
        except Exception:
            logger.error('Unexpected exception----')
            traceback.print_exc()
            resultData = {
                'code': 500,
                'id': task['id'],
                'message': 'unexpected exception'
            }
            kafkaProduce(kafkaConfig['data_topics'],
                         json.dumps(resultData).encode('utf-8').decode('unicode_escape').encode(),
                         kafkaConfig['address'])
            time.sleep(2)
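
The script reads its settings from ./config.ini. The section and key names below are exactly the ones looked up in the code (con.items('kafka'), con.items('goFastdfs'), and the individual keys used in the main loop); all values are placeholders, since the real config is not part of this listing. Note that '{}upload'.format(uploadaddress) expects uploadaddress to end with a slash, and downloadaddress is concatenated directly with the path returned by go-fastdfs.

[kafka]
address = 127.0.0.1:9092
read_topic = crawl_task
group_id = crawl_group
data_topics = crawl_result

[goFastdfs]
path = ./download/
uploadaddress = http://127.0.0.1:8080/
downloadaddress = http://127.0.0.1:8080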
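
Each message the consumer thread places on the queue must be a JSON string from which the loop reads task['url'] and task['id']; every outcome is reported back on data_topics as JSON. A hypothetical task message and the success result it would produce (field values invented for illustration):

{"id": "task-001", "url": "https://example.com/article/123"}

{"code": 200, "id": "task-001", "url": "http://127.0.0.1:8080/group1/default/task.mhtml", "title": "task", "delMd5": "a1b2c3", "uploadTime": 1600000000, "message": "success"}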
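
The kakfa_util module is not shown in this file. A minimal sketch that matches the call sites above (KafkaConsume run in a daemon thread feeding the queue, kafkaProduce sending raw bytes), assuming the kafka-python package; the module actually shipped with the repo may differ:

# kakfa_util.py -- minimal sketch based on the call signatures above
from kafka import KafkaConsumer, KafkaProducer

def KafkaConsume(topic, address, group_id, task_queue, logger):
    '''Consume messages from `topic` forever and push them onto `task_queue`.'''
    consumer = KafkaConsumer(topic,
                             bootstrap_servers=address.split(','),
                             group_id=group_id)
    for msg in consumer:
        taskStr = msg.value.decode('utf-8')
        logger.info('Received task: {}'.format(taskStr))
        task_queue.put(taskStr)

def kafkaProduce(topic, data, address):
    '''Send one message (bytes) to `topic` and flush before returning.'''
    producer = KafkaProducer(bootstrap_servers=address.split(','))
    producer.send(topic, data)
    producer.flush()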
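
Go_fastDfs.uploadFile is likewise external. The main loop treats its return value as a JSON string containing path, md5, and mtime, which matches the output=json response of a standard go-fastdfs server. A minimal sketch under that assumption, using requests:

# Go_fastDfs.py -- minimal sketch, assuming a stock go-fastdfs upload endpoint
import requests

def uploadFile(uploadUrl, pathName, logger):
    '''POST the file to go-fastdfs and return the raw JSON response body.'''
    with open(pathName, 'rb') as f:
        resp = requests.post(uploadUrl, files={'file': f}, data={'output': 'json'})
    logger.info('Upload response: {}'.format(resp.text))
    return resp.text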
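
Finally, logUtil.get_logger only needs to return a configured logger writing to the given path. A minimal sketch; the real helper may add log rotation or console output:

# logUtil.py -- minimal sketch of the logging helper
import logging
import os

def get_logger(logPath):
    os.makedirs(os.path.dirname(logPath), exist_ok=True)
    logger = logging.getLogger(logPath)
    if not logger.handlers:
        handler = logging.FileHandler(logPath, encoding='utf-8')
        handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s %(message)s'))
        logger.addHandler(handler)
        logger.setLevel(logging.INFO)
    return logger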