用户水军识别应用
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

383 lines
17 KiB

  1. # coding:utf8
  2. import os, sys
  3. import io
  4. sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf8')
  5. cur_dir = os.path.dirname(os.path.abspath(__file__)) or os.getcwd()
  6. par_dir = os.path.abspath(os.path.join(cur_dir, os.path.pardir))
  7. sys.path.append(cur_dir)
  8. sys.path.append(par_dir)
  9. import json
  10. from django.http import HttpResponse
  11. from text_analysis.tools import to_kafka
  12. from django.views.decorators.csrf import csrf_exempt
  13. from log_util.set_logger import set_logger
  14. from text_analysis.tools.tool import parse_data
  15. logging = set_logger('logs/results.log')
  16. import traceback
  17. import time
  18. import joblib
  19. from text_analysis.cusException import userFile_Exception, postFile_Exception
  20. from kazoo.client import KazooClient
  21. from kazoo.protocol.states import EventType
  22. # 任务队列
  23. import queue
  24. task_queue = queue.PriorityQueue()
  25. stop_dict={}
  26. from text_analysis.read_config import load_config
  27. config=load_config()
  28. #mysql连接池
  29. from text_analysis.tools.db_pool import get_conn_pool
  30. @csrf_exempt
  31. def robotIdentification(request):
  32. if request.method == 'POST':
  33. try:
  34. raw_data = json.loads(request.body)
  35. if "trace" in raw_data.keys() and raw_data["trace"]==True:
  36. task_queue.put((-1, time.time(),raw_data))
  37. else:
  38. task_queue.put((1,time.time(), raw_data))
  39. return HttpResponse(json.dumps({"code": 1, "msg": "请求正常!"}, ensure_ascii=False))
  40. except:
  41. logging.error(traceback.format_exc())
  42. return HttpResponse(json.dumps({"code": 0, "msg": "请求json格式不正确!"}, ensure_ascii=False))
  43. else:
  44. return HttpResponse(json.dumps({"code": 0, "msg": "请求方式错误,改为post请求"}, ensure_ascii=False))
  45. def predict(user_file_result,post_file_result,task):
  46. try:
  47. # raw_data = {"user_file": {"accountId": "39234393", "accountName": "hello", "nickName": "Johnson Leung",
  48. # "fansCount": 308, "likeCount": 92707, "postCount": 14237,
  49. # "otherInfo": "{\"\"otherInfo\"\":\"\"{\"\"bio\"\": \"\"Huge}",
  50. # "authentication": 0},
  51. # "post_file": {"count": 1, "LikeCount": 12, "CommentsCount": 1, "ShareCount": 1,
  52. # "length": 150, "tags": 0, "https": 0, "at": 0, "diffdate": 1}}
  53. # 用户数据
  54. res = {"successCode": "1", "errorLog": "", "results": {}}
  55. user_data = []
  56. # 返回值需要的三个字段
  57. accountId = ""
  58. nickName = ""
  59. accountName = ""
  60. data = {}
  61. if user_file_result:
  62. data['user_file'] = user_file_result
  63. logging.info('用户数据:{}'.format(data['user_file']))
  64. accountId = data["user_file"]["accountId"]
  65. nickName = data["user_file"]["nickName"]
  66. accountName = data["user_file"]["accountName"]
  67. else:
  68. data['user_file'] = {}
  69. raise userFile_Exception
  70. if post_file_result:
  71. data['post_file'] = post_file_result
  72. logging.info('帖子数据:{}'.format(data['post_file']))
  73. else:
  74. data['post_file'] = {}
  75. raise postFile_Exception
  76. # 识别结果返回值
  77. recognition_code = "0"
  78. try:
  79. user_data_otherInfo_1 = 0 if data["user_file"]["otherInfo"].strip() == "" else 1
  80. except:
  81. user_data_otherInfo_1 = 0
  82. try:
  83. user_data_nickName_2 = 0 if data["user_file"]["nickName"].strip() == "" else 1
  84. except:
  85. user_data_nickName_2 = 0
  86. try:
  87. user_data_fansCount_3 = int(data["user_file"]["fansCount"])
  88. except:
  89. user_data_fansCount_3 = 0
  90. try:
  91. user_data_likeCount_4 = int(data["user_file"]["likeCount"])
  92. except:
  93. user_data_likeCount_4 = 0
  94. try:
  95. user_data_postCount_5 = int(data["user_file"]["postCount"])
  96. except:
  97. user_data_postCount_5 = 0
  98. try:
  99. user_data_authentication_6 = int(data["user_file"]["authentication"])
  100. except:
  101. user_data_authentication_6 = 0
  102. user_data.extend(
  103. [user_data_otherInfo_1, user_data_nickName_2, user_data_fansCount_3, user_data_likeCount_4,
  104. user_data_postCount_5, user_data_authentication_6])
  105. # 帖子数据
  106. if data["post_file"] == {}:
  107. recognition_code = "-1"
  108. else:
  109. post_data = []
  110. try:
  111. post_data_count_1 = int(data["post_file"]["count"])
  112. except:
  113. post_data_count_1 = 0
  114. try:
  115. post_data_LikeCount_2 = int(data["post_file"]["LikeCount"])
  116. except:
  117. post_data_LikeCount_2 = 0
  118. try:
  119. post_data_CommentsCount_3 = int(data["post_file"]["CommentsCount"])
  120. except:
  121. post_data_CommentsCount_3 = 0
  122. try:
  123. post_data_ShareCount_4 = int(data["post_file"]["ShareCount"])
  124. except:
  125. post_data_ShareCount_4 = 0
  126. try:
  127. post_data_length_5 = int(data["post_file"]["length"])
  128. except:
  129. post_data_length_5 = 0
  130. try:
  131. post_data_tags_6 = int(data["post_file"]["tags"])
  132. except:
  133. post_data_tags_6 = 0
  134. try:
  135. post_data_https_7 = int(data["post_file"]["https"])
  136. except:
  137. post_data_https_7 = 0
  138. try:
  139. post_data_at_8 = int(data["post_file"]["at"])
  140. except:
  141. post_data_at_8 = 0
  142. try:
  143. post_data_diffdate_9 = int(data["post_file"]["diffdate"])
  144. except:
  145. post_data_diffdate_9 = 0
  146. post_data.extend(
  147. [post_data_count_1, post_data_LikeCount_2, post_data_CommentsCount_3, post_data_ShareCount_4,
  148. post_data_length_5, post_data_tags_6, post_data_https_7, post_data_at_8, post_data_diffdate_9])
  149. features = [user_data + post_data]
  150. bot_user = joblib.load(cur_dir + "/model/bot_user.pkl") # 加载训练好的模型
  151. result = bot_user.predict(features)
  152. recognition_code = str(result[0])
  153. # logging.info("预测模型结果为{}".format(result))
  154. results = {}
  155. # 用户id
  156. results['authorId'] = accountId
  157. # 用户昵称
  158. results['nickName'] = nickName
  159. # 用户账号
  160. results['accountName'] = accountName
  161. #结束标识
  162. res['isLast'] = True
  163. #数据类型 --目前只提供给图谱使用
  164. results['pageType'] = 'userAuthenPage'
  165. if recognition_code == '0':
  166. results['recognitionResult'] = '非机器人'
  167. results['recognitionCode'] = recognition_code
  168. elif recognition_code == '1':
  169. results['recognitionResult'] = '机器人'
  170. results['recognitionCode'] = recognition_code
  171. else:
  172. results['recognitionResult'] = '未知识别结果'
  173. results['recognitionCode'] = recognition_code
  174. results["isLast"]=1
  175. res['results'] = json.dumps(results)
  176. res["status"]=1
  177. res["message"]="成功"
  178. task["result"] = res
  179. logging.info("增加预测数据-{}".format(task))
  180. to_kafka.send_kafka(task, logging)
  181. except userFile_Exception:
  182. res = {"successCode": "0", "errorLog": "用户数据为空!", "results": {}}
  183. results = {}
  184. results['authorId'] = ""
  185. results['nickName'] = ""
  186. results['accountName'] = ""
  187. results['recognitionResult'] = '用户数据为空'
  188. results["isLast"]=1
  189. res['results'] = json.dumps(results)
  190. res["status"]=2
  191. res["message"]="用户数据为空"
  192. task["result"] = res
  193. logging.info("该条请求用户数据为空-{}".format(task))
  194. to_kafka.send_kafka(task, logging)
  195. except postFile_Exception:
  196. res = {"successCode": "0", "errorLog": "帖子数据为空!", "results": {}}
  197. results = {}
  198. results['authorId'] = accountId
  199. results['nickName'] = nickName
  200. results['accountName'] = accountName
  201. results['recognitionResult'] = '帖子数据为空'
  202. results["isLast"]=1
  203. res['results'] = json.dumps(results)
  204. res["status"]=2
  205. res["message"]="帖子数据为空"
  206. task["result"] = res
  207. logging.info("该条请求帖子数据为空-{}".format(task))
  208. to_kafka.send_kafka(task, logging)
  209. except:
  210. res = {"successCode": "0", "errorLog": "", "results": {}}
  211. results = {}
  212. results['authorId'] = accountId
  213. results['nickName'] = nickName
  214. results['accountName'] = accountName
  215. results['recognitionResult'] = ""
  216. results["isLast"]=1
  217. res["results"] = json.dumps(results)
  218. res["status"]=2
  219. res["message"]="异常"
  220. task["result"] = res
  221. task["result"]["error"] = traceback.format_exc()
  222. logging.info(traceback.format_exc())
  223. to_kafka.send_kafka(task, logging)
  224. def data_structure(dbConfig):
  225. '''
  226. '''
  227. #获取数据库连接
  228. sqlhelper = get_conn_pool(dbConfig['host'],dbConfig['port'],dbConfig['username'],dbConfig['password'],dbConfig['db'])
  229. #用户任务作为响应体
  230. send_task = None
  231. while True:
  232. try:
  233. if task_queue.qsize()>0:
  234. p,t,task = task_queue.get(timeout=1)
  235. logging.info("当前任务队列长度{}".format(task_queue.qsize()+1))
  236. task_id=task["scenes_id"]
  237. task_version=task["version"]
  238. logging.info("当前version信息为:{}".format(stop_dict))
  239. if task_id in stop_dict.keys() and task_version!=stop_dict[task_id]["version"]:
  240. logging.info("已暂停任务,数据过滤掉")
  241. continue
  242. input = task['input']
  243. account = input['account']
  244. post = input['post']
  245. #判断数据类型
  246. data = task['data']
  247. page_type = None
  248. taskId = None
  249. for data_str in data:
  250. try:
  251. app_data = json.loads(data[data_str])
  252. taskId = app_data['taskId']
  253. if "pageType" in app_data:
  254. page_type = app_data['pageType']
  255. break
  256. except:
  257. logging.error("正常判断,异常请忽略")
  258. logging.info("数据类型:{}".format(page_type))
  259. if page_type == 'userInfoPage':
  260. # 用户任务作为响应体
  261. send_task = task
  262. #用户类型数据写入
  263. sql = "INSERT INTO `user_account`(`taskId`, `accountId`, `accountName`, `nickName`, `fansCount`, `likeCount`, `postCount`, `otherInfo`, `authentication`) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)"
  264. # 构造参数元组
  265. values = (
  266. parse_data(task, account['taskId']),
  267. parse_data(task, account['accountId']),
  268. parse_data(task, account['accountName']),
  269. parse_data(task, account['nickName']),
  270. parse_data(task, account['fansCount']),
  271. parse_data(task, account['likeCount']),
  272. parse_data(task, account['postCount']),
  273. parse_data(task, account['otherInfo']),
  274. parse_data(task, account['authentication'])
  275. )
  276. sqlhelper.insert(sql,values)
  277. elif page_type == 'storyDetailPage':
  278. #帖子类型数据写入
  279. # 定义 SQL 语句
  280. sql = "INSERT INTO `user_post`(`taskId`, `postId`, `accountId`, `accountName`, `likeCount`, `emotionCount`, `commentsCount`, `shareCount`, `content`, `pubTime`, `crawlTime`) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
  281. # 构造参数元组
  282. values = (
  283. parse_data(task, post['taskId']),
  284. parse_data(task, post['postId']),
  285. parse_data(task, post['accountId']),
  286. parse_data(task, post['accountName']),
  287. parse_data(task, post['likeCount']),
  288. parse_data(task, post['emotionCount']),
  289. parse_data(task, post['commentsCount']),
  290. parse_data(task, post['shareCount']),
  291. parse_data(task, post['content']),
  292. parse_data(task, post['pubTime']),
  293. parse_data(task, post['crawlTime'])
  294. )
  295. sqlhelper.insert(sql,values)
  296. #判断是否是此次数据流的最后一条,最后一条直接触发用户的水军识别算法
  297. if 'isLast'in data and data['isLast']:
  298. #获取用户相关的数据
  299. sql = "select accountId,accountName,nickName,fansCount,likeCount,postCount,otherInfo,authentication from user_account where taskId ='{}'".format(taskId)
  300. user_file_result = sqlhelper.queryOne(sql)
  301. # 获取帖子相关的数据
  302. sql = "SELECT CONVERT(COUNT(postId), CHAR(255)) AS count, CONVERT(AVG(likeCount), CHAR(255)) AS LikeCount, CONVERT(AVG(commentsCount), CHAR(255)) AS CommentsCount, CONVERT(AVG(shareCount), CHAR(255)) AS ShareCount, CONVERT(AVG(LENGTH(content)), CHAR(255)) AS length, CONVERT(AVG((LENGTH(content) - LENGTH(REPLACE(content, '#', ''))) / LENGTH('#')), CHAR(255)) AS tags, CONVERT(AVG((LENGTH(content) - LENGTH(REPLACE(content, 'https', ''))) / LENGTH('https')), CHAR(255)) AS https, CONVERT(AVG((LENGTH(content) - LENGTH(REPLACE(content, '@', ''))) / LENGTH('@')), CHAR(255)) AS at, CONVERT(MIN(TIMESTAMPDIFF(SECOND, pubTime, GREATEST(pubTime, crawlTime))), CHAR(255)) AS diffdate FROM user_post where taskId ='{}'".format(taskId)
  303. post_file_result = sqlhelper.queryOne(sql)
  304. if send_task == None:
  305. send_task = task
  306. predict(user_file_result,post_file_result,send_task)
  307. #结束置空
  308. send_task = None
  309. else:
  310. # 暂无任务,进入休眠
  311. time.sleep(10)
  312. except Exception as e:
  313. traceback.print_exc()
  314. def zk_monitoring():
  315. try:
  316. #线上环境
  317. zk = KazooClient(hosts=config['zookeeper']['zkhost'])
  318. #测试环境
  319. # zk = KazooClient(hosts='172.16.12.55:2181,172.16.12.56:2181,172.16.12.57:2181')
  320. zk.start()
  321. # 设置监听器
  322. @zk.DataWatch("/analyze")
  323. def watch_node(data, stat, event):
  324. if event is not None and event.type == EventType.CHANGED:
  325. data, stat = zk.get("/analyze")
  326. logging.info("执行删除操作:{}".format(data))
  327. d = json.loads(data)
  328. id = d["scenes_id"]
  329. stop_dict[id] = {}
  330. stop_dict[id]["version"] = d["version"]
  331. stop_dict[id]["operation"] = d["operation"]
  332. # 保持程序运行以监听节点变化
  333. try:
  334. while True:
  335. time.sleep(1)
  336. except:
  337. logging.info("Stopping...")
  338. # 关闭连接
  339. zk.stop()
  340. zk.close()
  341. except:
  342. logging.error(traceback.format_exc())
  343. if __name__ == '__main__':
  344. all_result = {
  345. "9_获取用户发帖信息": "{\"resultList\": [{\"count\": \"10\", \"LikeCount\": \"1\", \"CommentsCount\": \"0.1\", \"ShareCount\": \"0.4\", \"length\": \"241.8000\", \"tags\": \"5.80000000\", \"https\": \"1.20000000\", \"at\": \"0.40000000\", \"diffdate\": \"170269\"}]}",
  346. "8_获取用户信息": "{\"resultList\": [{\"accountId\": \"1368232444323799043\", \"accountName\": \"Ujjal best Tech@UjjalKumarGho19\", \"nickName\": \"UjjalKumarGho19\", \"fansCount\": \"660\", \"likeCount\": \"2096\", \"postCount\": \"579\", \"otherInfo\": \"\", \"authentication\": 1}]}"}
  347. data = {}
  348. # {"user_file": "9_获取用户信息", "post_file": "10_获取用户发帖信息"}
  349. user_file_result = json.loads(all_result[data['user_file']])
  350. post_file_result = json.loads(all_result[data['post_file']])
  351. if user_file_result['resultList']:
  352. resultList = user_file_result['resultList']
  353. data['user_file'] = resultList[0]
  354. else:
  355. data['user_file'] = {}
  356. if post_file_result['resultList']:
  357. data['post_file'] = post_file_result['resultList'][0]
  358. else:
  359. data['post_file'] = {}
  360. print(data)