话题水军识别应用 (topic-level bot-account identification service)

# coding: utf-8
import os
import sys
import io

# Force UTF-8 stdout so Chinese console/log output is not garbled.
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf8')

cur_dir = os.path.dirname(os.path.abspath(__file__)) or os.getcwd()
par_dir = os.path.abspath(os.path.join(cur_dir, os.path.pardir))
sys.path.append(cur_dir)
sys.path.append(par_dir)

import json
import traceback
import queue
import time
from datetime import datetime, timedelta

import joblib
import requests
from django.http import HttpResponse
from django.views.decorators.csrf import csrf_exempt
from kazoo.client import KazooClient
from kazoo.protocol.states import EventType

from log_util.set_logger import set_logger
from text_analysis.tools import to_kafka, tool
from text_analysis.tools.tool import parse_data
from text_analysis.tools.db_pool import get_conn_pool
from text_analysis.cusException import userFile_Exception, postFile_Exception, replyFile_Exception
from text_analysis.read_config import load_config

logging = set_logger('logs/results.log')
config = load_config()

# Task queue of (priority, enqueue time, task); trace requests get priority -1 and are consumed first.
task_queue = queue.PriorityQueue()
# Latest task versions pushed from ZooKeeper, keyed by scenes_id; used to drop data of paused tasks.
stop_dict = {}
# Cached per-topic reply graph information, keyed by topic/task id.
replyGraph = {}

@csrf_exempt
def robotIdentificationTopic(request):
    if request.method == 'POST':
        try:
            raw_data = json.loads(request.body)
            if "trace" in raw_data.keys() and raw_data["trace"] == True:
                task_queue.put((-1, time.time(), raw_data))
            else:
                task_queue.put((1, time.time(), raw_data))
            return HttpResponse(json.dumps({"code": 1, "msg": "请求正常!"}, ensure_ascii=False))
        except:
            logging.error(traceback.format_exc())
            return HttpResponse(json.dumps({"code": 0, "msg": "请求json格式不正确!"}, ensure_ascii=False))
    else:
        return HttpResponse(json.dumps({"code": 0, "msg": "请求方式错误,改为post请求"}, ensure_ascii=False))
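
# Illustrative request payload (a sketch inferred from how fields are read by the worker below,
# not a documented contract; the exact nesting produced by the crawler is an assumption):
#
#   {
#     "trace": false,
#     "scenes_id": "...",
#     "version": "...",
#     "input": {"account": {...}, "post": {...}, "reply": {...}},
#     "data": {"9_获取用户信息": "{\"taskId\": \"...\", \"pageType\": \"userInfoPage\", ...}", "isLast": 1}
#   }
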
def predictTopic(user_file_result, post_file_result, task, dbConfig, taskId):
    try:
        # Recognition result code returned to the caller.
        recognition_code = "0"
        # Response envelope.
        res = {"successCode": "1", "errorLog": "", "results": {}}
        # Historical data attached to the task.
        all_result = task['data']
        user_data = []
        data = {}
        # The three identity fields echoed back in the result.
        accountId = ""
        nickName = ""
        accountName = ""
        # {"user_file": "9_获取用户信息", "post_file": "10_获取用户发帖信息"}
        if user_file_result:
            data['user_file'] = user_file_result
            logging.info('用户数据:{}'.format(data['user_file']))
            accountId = data["user_file"]["accountId"]
            nickName = data["user_file"]["nickName"]
            accountName = data["user_file"]["accountName"]
        else:
            data['user_file'] = {}
            raise userFile_Exception
        if post_file_result:
            data['post_file'] = post_file_result
            logging.info('帖子数据:{}'.format(data['post_file']))
        else:
            data['post_file'] = {}
            raise postFile_Exception
        # User-profile features: fall back to 0 whenever a field is missing or malformed.
        try:
            user_data_otherInfo_1 = 0 if data["user_file"]["otherInfo"].strip() == "" else 1
        except:
            user_data_otherInfo_1 = 0
        try:
            user_data_nickName_2 = 0 if data["user_file"]["nickName"].strip() == "" else 1
        except:
            user_data_nickName_2 = 0
        try:
            user_data_likeCount_4 = int(data["user_file"]["likeCount"])
        except:
            user_data_likeCount_4 = 0
        try:
            user_data_postCount_5 = int(data["user_file"]["postCount"])
        except:
            user_data_postCount_5 = 0
        try:
            user_data_authentication_6 = int(data["user_file"]["authentication"])
        except:
            user_data_authentication_6 = 0
        user_data.extend(
            [user_data_otherInfo_1, user_data_nickName_2, user_data_likeCount_4,
             user_data_postCount_5, user_data_authentication_6])
        logging.info("用户数据处理完毕!-{}".format(user_data))
        # Post features: empty post data maps to an all-zero vector.
        post_data = []
        if data["post_file"] == {}:
            post_data = [0, 0, 0, 0, 0, 0, 0, 0]
        else:
            try:
                post_data_LikeCount_1 = int(data["post_file"]["LikeCount"])
            except:
                post_data_LikeCount_1 = 0
            try:
                post_data_ShareCount_2 = int(data["post_file"]["ShareCount"])
            except:
                post_data_ShareCount_2 = 0
            try:
                post_data_emotionCount_3 = int(data["post_file"]["emotionCount"])
            except:
                post_data_emotionCount_3 = 0
            try:
                post_data_CommentsCount_4 = int(data["post_file"]["CommentsCount"])
            except:
                post_data_CommentsCount_4 = 0
            try:
                post_data_length_5 = int(data["post_file"]["length"])
            except:
                post_data_length_5 = 0
            try:
                post_data_tags_6 = int(data["post_file"]["tags"])
            except:
                post_data_tags_6 = 0
            try:
                post_data_https_7 = int(data["post_file"]["https"])
            except:
                post_data_https_7 = 0
            try:
                post_data_diffdate_8 = int(data["post_file"]["diffdate"])
            except:
                post_data_diffdate_8 = 0
            post_data.extend(
                [post_data_LikeCount_1, post_data_ShareCount_2, post_data_emotionCount_3, post_data_CommentsCount_4,
                 post_data_length_5, post_data_tags_6, post_data_https_7, post_data_diffdate_8])
        logging.info("帖子数据处理完毕!-{}".format(post_data))
        # Relation (reply-graph) features; default to zeros if the graph is unavailable.
        reply_data_1 = [0, 0, 0, 0, 0]
        reply_data_2 = [0, 0]
        try:
            # Reuse the topic graph if it is already cached in memory, otherwise build it from MySQL.
            topicID = taskId
            if topicID not in list(replyGraph.keys()):
                reply_file = tool.mysqlData(dbConfig, topicID, logging)
                if reply_file:
                    graph = tool.get_replyData(reply_file)
                    replyGraph[topicID] = graph
                else:
                    raise replyFile_Exception
            else:
                graph = replyGraph[topicID]
            userId = data["user_file"]["accountId"]
            if userId in list(graph.keys()):
                closeness_centrality = graph[userId]["closeness_centrality"]
                pagerank = graph[userId]["pagerank"]
                clustering = graph[userId]["clustering"]
                in_degree = graph[userId]["in_degree"]
                out_degree = graph[userId]["out_degree"]
                reply_data_1 = [closeness_centrality, pagerank, clustering, in_degree, out_degree]
                user_flag_infl = graph[userId]["user_flag_infl"]
                user_flag_act = graph[userId]["user_flag_act"]
                reply_data_2 = [user_flag_infl, user_flag_act]
            replyGraph[topicID]["last_operation_time"] = datetime.now()
        except:
            logging.info("专题关系数据mysql获取失败!")
            logging.info(traceback.format_exc())
        logging.info("关系数据处理完毕!{}-{}".format(reply_data_1, reply_data_2))
        features = [user_data + reply_data_1 + post_data + reply_data_2]
        bot_user = joblib.load(cur_dir + "/model/bot_topic.pkl")  # Load the pre-trained topic bot model.
        result = bot_user.predict(features)
        recognition_code = str(result[0])
        res["results"] = str(result[0])
        results = {}
        # User id.
        results['authorId'] = accountId
        # User nickname.
        results['nickName'] = nickName
        # User account name.
        results['accountName'] = accountName
        # End-of-stream flag.
        res['isLast'] = True
        # Page type, currently only consumed by the graph service.
        results['pageType'] = 'userAuthenPage'
        if recognition_code == '0':
            results['recognitionResult'] = '非机器人'
            results['recognitionCode'] = recognition_code
        elif recognition_code == '1':
            results['recognitionResult'] = '机器人'
            results['recognitionCode'] = recognition_code
        else:
            results['recognitionResult'] = '未知识别结果'
            results['recognitionCode'] = recognition_code
        results["isLast"] = 1
        res['results'] = json.dumps(results)
        res["status"] = 1
        res["message"] = "成功"
        task["result"] = res
        logging.info("增加预测数据-{}".format(task))
        to_kafka.send_kafka(task, logging)
    except userFile_Exception:
        res = {"successCode": "0", "errorLog": "用户数据为空!", "results": {}}
        results = {}
        results['authorId'] = ""
        results['nickName'] = ""
        results['accountName'] = ""
        results['recognitionResult'] = '用户数据为空'
        results["isLast"] = 1
        res['results'] = json.dumps(results)
        res["status"] = 2
        res["message"] = "用户数据为空"
        task["result"] = res
        logging.info("该条请求用户数据为空-{}".format(task))
        to_kafka.send_kafka(task, logging)
    except postFile_Exception:
        res = {"successCode": "0", "errorLog": "帖子数据为空!", "results": {}}
        results = {}
        results['authorId'] = accountId
        results['nickName'] = nickName
        results['accountName'] = accountName
        results['recognitionResult'] = '帖子数据为空'
        results["isLast"] = 1
        res['results'] = json.dumps(results)
        res["status"] = 2
        res["message"] = "帖子数据为空"
        task["result"] = res
        logging.info("该条请求帖子数据为空-{}".format(task))
        to_kafka.send_kafka(task, logging)
    except replyFile_Exception:
        res = {"successCode": "0", "errorLog": "发帖和评论关系数据为空!", "results": {}}
        results = {}
        results['authorId'] = accountId
        results['nickName'] = nickName
        results['accountName'] = accountName
        results['recognitionResult'] = '发帖和评论关系数据为空'
        results["isLast"] = 1
        res['results'] = json.dumps(results)
        res["status"] = 2
        res["message"] = "发帖和评论关系数据为空"
        task["result"] = res
        logging.info("该条请求发帖和评论关系数据为空-{}".format(task))
        to_kafka.send_kafka(task, logging)
    except:
        res = {"successCode": "0", "errorLog": "", "results": {}}
        results = {}
        results['authorId'] = accountId
        results['nickName'] = nickName
        results['accountName'] = accountName
        results['recognitionResult'] = ""
        results["isLast"] = 1
        res['results'] = json.dumps(results)
        res["status"] = 2
        res["message"] = "异常"
        task["result"] = res
        task["result"]["errorLog"] = traceback.format_exc()
        logging.info(traceback.format_exc())
        to_kafka.send_kafka(task, logging)
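
# Shape of the message sent to Kafka on success (a sketch based on the assignments above;
# the field values shown are illustrative): task["result"] carries the envelope and its
# "results" field is itself a JSON string.
#
#   task["result"] == {
#       "successCode": "1", "errorLog": "", "status": 1, "message": "成功", "isLast": True,
#       "results": '{"authorId": "...", "nickName": "...", "accountName": "...",
#                    "pageType": "userAuthenPage", "recognitionResult": "机器人",
#                    "recognitionCode": "1", "isLast": 1}'
#   }
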
def data_structure():
    '''
    Background worker: pull tasks off task_queue, persist incoming account / post / reply
    records to MySQL, and trigger bot identification once the last record of a stream arrives.
    '''
    dbConfig = dict(config.items('database'))
    # Database connection pool.
    sqlhelper = get_conn_pool(dbConfig['host'], dbConfig['port'], dbConfig['username'], dbConfig['password'], dbConfig['db'])
    # Cache of the originating task per accountId, used to route results back.
    user_tasks = {}
    while True:
        try:
            if task_queue.qsize() > 0:
                p, t, task = task_queue.get(timeout=1)
                task_id = task["scenes_id"]
                task_version = task["version"]
                logging.info("当前version信息为:{}".format(stop_dict))
                # Drop data belonging to a paused task (its version no longer matches).
                if task_id in stop_dict.keys() and task_version != stop_dict[task_id]["version"]:
                    logging.info("已暂停任务,数据过滤掉")
                    continue
                input = task['input']
                account = input['account']
                post = input['post']
                reply = input['reply']
                # Work out the page type of this record.
                data = task['data']
                page_type = None
                taskId = None
                for data_str in data:
                    try:
                        app_data = json.loads(data[data_str])
                        taskId = app_data['taskId']
                        if "pageType" in app_data:
                            page_type = app_data['pageType']
                            break
                    except:
                        logging.error("正常判断,异常请忽略")
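                # Illustrative shape of task['data'] (an assumption consistent with the keys read
                # above and below): each value is a JSON string carrying taskId / pageType, and a
                # top-level 'isLast' entry marks the final record of the stream, e.g.
                #   {"9_获取用户信息": '{"taskId": "123", "pageType": "userInfoPage", ...}', "isLast": 1}
                # pageType routes the record to one of the INSERT branches below.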
                if page_type == 'userInfoPage':
                    # Cache the originating task for this user so its result can be sent back later.
                    accountId = parse_data(task, account['accountId'])
                    user_tasks[accountId] = task
                    logging.info('成功添加用户缓存:{}'.format(accountId))
                    # Persist the user record.
                    sql = "INSERT INTO `user_account`(`taskId`, `accountId`, `accountName`, `nickName`, `fansCount`, `likeCount`, `postCount`, `otherInfo`, `authentication`) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)"
                    values = (
                        parse_data(task, account['taskId']),
                        parse_data(task, account['accountId']),
                        parse_data(task, account['accountName']),
                        parse_data(task, account['nickName']),
                        parse_data(task, account['fansCount']),
                        parse_data(task, account['likeCount']),
                        parse_data(task, account['postCount']),
                        parse_data(task, account['otherInfo']),
                        parse_data(task, account['authentication'])
                    )
                    sqlhelper.insert(sql, values)
                elif page_type == 'storyDetailPage':
                    # Persist the post record.
                    sql = "INSERT INTO `user_post`(`taskId`, `postId`, `accountId`, `accountName`, `likeCount`, `emotionCount`, `commentsCount`, `shareCount`, `content`, `pubTime`, `crawlTime`) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
                    values = (
                        parse_data(task, post['taskId']),
                        parse_data(task, post['postId']),
                        parse_data(task, post['accountId']),
                        parse_data(task, post['accountName']),
                        parse_data(task, post['likeCount']),
                        parse_data(task, post['emotionCount']),
                        parse_data(task, post['commentsCount']),
                        parse_data(task, post['shareCount']),
                        parse_data(task, post['content']),
                        parse_data(task, post['pubTime']),
                        parse_data(task, post['crawlTime'])
                    )
                    sqlhelper.insert(sql, values)
                elif page_type == 'socialComment':
                    # Persist the comment / reply record.
                    sql = "INSERT INTO `reply`(`taskId`, `ReviewerAccountId`, `ReviewerAccountName`, `postId`, `ShareCount`, `LikeCount`, `CommentCount`, `CommentTime`) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)"
                    values = (
                        parse_data(task, reply['taskId']),
                        parse_data(task, reply['reviewerAccountId']),
                        parse_data(task, reply['reviewerAccountName']),
                        parse_data(task, reply['postId']),
                        parse_data(task, reply['shareCount']),
                        parse_data(task, reply['likeCount']),
                        parse_data(task, reply['commentsCount']),
                        parse_data(task, reply['commentTime'])
                    )
                    sqlhelper.insert(sql, values)
                # If this is the last record of the stream, run bot identification for every user of the task.
                if 'isLast' in data and data['isLast']:
                    # Fetch the user-level data for this task.
                    sql = "select accountId,accountName,nickName,fansCount,likeCount,postCount,otherInfo,authentication from user_account where taskId ='{}'".format(taskId)
                    user_file_result = sqlhelper.queryAll(sql)
                    if user_file_result:
                        for user in user_file_result:
                            try:
                                # Aggregate this user's post statistics.
                                sql = "SELECT CONVERT(COUNT(postId), CHAR(255)) AS count, CONVERT(AVG(likeCount), CHAR(255)) AS LikeCount, CONVERT(AVG(commentsCount), CHAR(255)) AS CommentsCount, CONVERT(AVG(shareCount), CHAR(255)) AS ShareCount, CONVERT(AVG(LENGTH(content)), CHAR(255)) AS length, CONVERT(AVG((LENGTH(content) - LENGTH(REPLACE(content, '#', ''))) / LENGTH('#')), CHAR(255)) AS tags, CONVERT(AVG((LENGTH(content) - LENGTH(REPLACE(content, 'https', ''))) / LENGTH('https')), CHAR(255)) AS https, CONVERT(AVG((LENGTH(content) - LENGTH(REPLACE(content, '@', ''))) / LENGTH('@')), CHAR(255)) AS at, CONVERT(MIN(TIMESTAMPDIFF(SECOND, pubTime, GREATEST(pubTime, crawlTime))), CHAR(255)) AS diffdate FROM user_post WHERE taskId = '{taskId}' and accountId = '{accountId}'".format(taskId=taskId, accountId=user['accountId'])
                                post_file_result = sqlhelper.queryOne(sql)
                                send_task = user_tasks[user['accountId']]
                                predictTopic(user, post_file_result, send_task, dbConfig, taskId)
                            except Exception as e:
                                traceback.print_exc()
                                logging.error("用户id:{}".format(user['accountId']))
                                logging.error("用户缓存加载失败:{}".format(send_task))
                    else:
                        # No user records for this task: drop the cached tasks.
                        user_tasks.clear()
            else:
                # No pending tasks; sleep before polling again.
                time.sleep(10)
        except Exception as e:
            traceback.print_exc()

def replyGraphThread():
    '''
    Housekeeping thread: periodically evict cached topic reply graphs that have not been used recently.
    '''
    while True:
        try:
            if replyGraph != {}:
                # Current time.
                current_time = datetime.now()
                for topicID in list(replyGraph.keys()):
                    # Time elapsed since the graph was last used.
                    time_difference = current_time - replyGraph[topicID]['last_operation_time']
                    # Evict the topic graph after 120 minutes of inactivity.
                    if time_difference >= timedelta(minutes=120):
                        del replyGraph[topicID]
        except:
            logging.info(traceback.format_exc())
        finally:
            time.sleep(1800)

def zk_monitoring():
    '''
    Watch the /analyze node in ZooKeeper for pause/update notifications and record the
    latest version per scenes_id in stop_dict.
    '''
    try:
        # Production environment.
        zk = KazooClient(hosts=config['zookeeper']['zkhost'])
        # Test environment:
        # zk = KazooClient(hosts='172.16.12.55:2181,172.16.12.56:2181,172.16.12.57:2181')
        zk.start()
        # Register the watcher.
        @zk.DataWatch("/analyze")
        def watch_node(data, stat, event):
            if event is not None and event.type == EventType.CHANGED:
                data, stat = zk.get("/analyze")
                logging.info("执行删除操作:{}".format(data))
                try:
                    d = json.loads(data)
                    id = d["scenes_id"]
                    stop_dict[id] = {}
                    stop_dict[id]["version"] = d["version"]
                    stop_dict[id]["operation"] = d["operation"]
                except:
                    pass
        # Keep the thread alive so the watcher keeps firing.
        try:
            while True:
                time.sleep(1)
        except:
            logging.info("Stopping...")
            # Close the connection.
            zk.stop()
            zk.close()
    except:
        logging.error(traceback.format_exc())
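
# How these pieces are wired together is not shown in this file. The sketch below is a
# hypothetical arrangement only: the thread targets exist above, but the URL route and the
# startup hook are assumptions, not part of the original project.
#
#   import threading
#   from django.urls import path
#
#   # urls.py (hypothetical): expose the HTTP entry point.
#   urlpatterns = [path('robotIdentificationTopic', robotIdentificationTopic)]
#
#   # At startup (hypothetical): run the queue consumer, the graph-cache janitor and the
#   # ZooKeeper watcher as daemon threads alongside the Django process.
#   for target in (data_structure, replyGraphThread, zk_monitoring):
#       threading.Thread(target=target, daemon=True).start()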