|
|
#coding:utf8 import os, sys import io sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf8') cur_dir = os.path.dirname(os.path.abspath(__file__)) or os.getcwd() par_dir = os.path.abspath(os.path.join(cur_dir, os.path.pardir)) sys.path.append(cur_dir) sys.path.append(par_dir) import json from django.http import HttpResponse from text_analysis.tools import to_kafka,tool from django.views.decorators.csrf import csrf_exempt from log_util.set_logger import set_logger logging=set_logger('logs/results.log') import traceback import queue import requests import time from datetime import datetime, timedelta from text_analysis.cusException import userFile_Exception,postFile_Exception,replyFile_Exception from text_analysis.tools.tool import parse_data import os import joblib from text_analysis.tools.db_pool import get_conn_pool #任务队列 global task_queue task_queue = queue.Queue() global replyGraph replyGraph={}
@csrf_exempt
def robotIdentificationTopic(request):
    """POST endpoint: enqueue one JSON task for bot-account identification.

    Returns JSON: code 1 on success, code 0 for malformed JSON or non-POST.
    """
    if request.method == 'POST':
        try:
            raw_data = json.loads(request.body)
            task_queue.put(raw_data)
            return HttpResponse(json.dumps({"code": 1, "msg": "请求正常!"}, ensure_ascii=False))
        except Exception:
            logging.error(traceback.format_exc())
            return HttpResponse(json.dumps({"code": 0, "msg": "请求json格式不正确!"}, ensure_ascii=False))
    else:
        return HttpResponse(json.dumps({"code": 0, "msg": "请求方式错误,改为post请求"}, ensure_ascii=False))


def _flag_feature(record, key):
    """1 if record[key] is a non-blank string, else 0 (missing/None/blank)."""
    try:
        return 0 if record[key].strip() == "" else 1
    except Exception:
        return 0


def _int_feature(record, key):
    """int(record[key]), or 0 when the key is missing or unparseable."""
    try:
        return int(record[key])
    except Exception:
        return 0


def predictTopic(user_file_result, post_file_result, task, dbConfig, taskId):
    """Build the feature vector for one user, run the bot classifier and push
    the result (or a typed error result) to Kafka.

    :param user_file_result: user_account row (dict) or falsy if absent
    :param post_file_result: aggregated user_post stats (dict) or falsy
    :param task: original task dict; result is attached under task["result"]
    :param dbConfig: MySQL connection info, used to (re)build the reply graph
    :param taskId: topic id; also the replyGraph cache key
    """
    # Initialised before the try so the generic handler can always reference them.
    accountId = ""
    nickName = ""
    accountName = ""
    try:
        recognition_code = "0"
        res = {"successCode": "1", "errorLog": "", "results": {}}
        data = {}
        if user_file_result:
            data['user_file'] = user_file_result
            logging.info('用户数据:{}'.format(data['user_file']))
            accountId = data["user_file"]["accountId"]
            nickName = data["user_file"]["nickName"]
            accountName = data["user_file"]["accountName"]
        else:
            data['user_file'] = {}
            raise userFile_Exception
        if post_file_result:
            data['post_file'] = post_file_result
            logging.info('帖子数据:{}'.format(data['post_file']))
        else:
            data['post_file'] = {}
            raise postFile_Exception

        # --- user features (5) ---
        user_file = data["user_file"]
        user_data = [
            _flag_feature(user_file, "otherInfo"),
            _flag_feature(user_file, "nickName"),
            _int_feature(user_file, "likeCount"),
            _int_feature(user_file, "postCount"),
            _int_feature(user_file, "authentication"),
        ]
        logging.info("用户数据处理完毕!-{}".format(user_data))

        # --- post features (8) ---
        post_file = data["post_file"]
        if post_file == {}:
            post_data = [0, 0, 0, 0, 0, 0, 0, 0]
        else:
            post_data = [
                _int_feature(post_file, "LikeCount"),
                _int_feature(post_file, "ShareCount"),
                _int_feature(post_file, "emotionCount"),
                _int_feature(post_file, "CommentsCount"),
                _int_feature(post_file, "length"),
                _int_feature(post_file, "tags"),
                _int_feature(post_file, "https"),
                _int_feature(post_file, "diffdate"),
            ]
        logging.info("帖子数据处理完毕!-{}".format(post_data))

        # --- reply-graph features (5 centrality + 2 flags) ---
        reply_data_1 = [0, 0, 0, 0, 0]
        reply_data_2 = [0, 0]
        try:
            topicID = taskId
            if topicID not in replyGraph:
                # Graph not cached yet: build it from this topic's MySQL reply data.
                reply_file = tool.mysqlData(dbConfig, topicID, logging)
                if not reply_file:
                    raise replyFile_Exception
                graph = tool.get_replyData(reply_file)
                replyGraph[topicID] = graph
            else:
                graph = replyGraph[topicID]
            # BUG FIX: refresh the eviction timestamp on every access, not only on
            # a user-node hit — otherwise replyGraphThread could evict a graph that
            # is actively used (or KeyError on a graph whose key was never set).
            replyGraph[topicID]["last_operation_time"] = datetime.now()
            userId = data["user_file"]["accountId"]
            if userId in graph:
                # BUG FIX: the original indexed graph["userId"] (a string literal),
                # so the per-user node was never found; index by the actual id.
                node = graph[userId]
                reply_data_1 = [
                    node["closeness_centrality"],
                    node["pagerank"],
                    node["clustering"],
                    node["in_degree"],
                    node["out_degree"],
                ]
                reply_data_2 = [node["user_flag_infl"], node["user_flag_act"]]
        except replyFile_Exception:
            # BUG FIX: the bare inner except used to swallow replyFile_Exception,
            # making the dedicated handler below unreachable; re-raise instead.
            raise
        except Exception:
            logging.info("专题关系数据mysql获取失败!")
            logging.info(traceback.format_exc())
        logging.info("关系数据处理完毕!{}-{}".format(reply_data_1, reply_data_2))

        # --- classification ---
        features = [user_data + reply_data_1 + post_data + reply_data_2]
        # NOTE(review): the model is re-loaded per call; hoisting it to module
        # level would be faster but changes module import side effects.
        bot_user = joblib.load(cur_dir + "/model/bot_topic.pkl")
        result = bot_user.predict(features)
        recognition_code = str(result[0])

        results = {}
        # 用户id
        results['authorId'] = accountId
        # 用户昵称
        results['nickName'] = nickName
        # 用户账号
        results['accountName'] = accountName
        # 结束标识
        res['isLast'] = True
        # 数据类型 --目前只提供给图谱使用
        results['pageType'] = 'userAuthenPage'
        if recognition_code == '0':
            results['recognitionResult'] = '非机器人'
        elif recognition_code == '1':
            results['recognitionResult'] = '机器人'
        else:
            results['recognitionResult'] = '未知识别结果'
        results['recognitionCode'] = recognition_code
        res['results'] = json.dumps(results)
        task["result"] = res
        logging.info("增加预测数据-{}".format(task))
        to_kafka.send_kafka(task, logging)
    except userFile_Exception:
        res = {"successCode": "0", "errorLog": "用户数据为空!", "results": {}}
        results = {}
        results['authorId'] = ""
        results['nickName'] = ""
        results['accountName'] = ""
        results['recognitionResult'] = '用户数据为空'
        res['results'] = json.dumps(results)
        task["result"] = res
        logging.info("该条请求用户数据为空-{}".format(task))
        to_kafka.send_kafka(task, logging)
    except postFile_Exception:
        res = {"successCode": "0", "errorLog": "帖子数据为空!", "results": {}}
        results = {}
        results['authorId'] = accountId
        results['nickName'] = nickName
        results['accountName'] = accountName
        results['recognitionResult'] = '帖子数据为空'
        res['results'] = json.dumps(results)
        task["result"] = res
        logging.info("该条请求帖子数据为空-{}".format(task))
        to_kafka.send_kafka(task, logging)
    except replyFile_Exception:
        res = {"successCode": "0", "errorLog": "发帖和评论关系数据为空!", "results": {}}
        results = {}
        results['authorId'] = accountId
        results['nickName'] = nickName
        results['accountName'] = accountName
        results['recognitionResult'] = '发帖和评论关系数据为空'
        res['results'] = json.dumps(results)
        task["result"] = res
        logging.info("该条请求发帖和评论关系数据为空-{}".format(task))
        to_kafka.send_kafka(task, logging)
    except Exception:
        # Unexpected failure: still emit a result so downstream is not left waiting.
        res = {"successCode": "0", "errorLog": "", "results": {}}
        results = {}
        results['authorId'] = accountId
        results['nickName'] = nickName
        results['accountName'] = accountName
        results['recognitionResult'] = ""
        res['results'] = json.dumps(results)
        task["result"] = res
        task["result"]["error"] = traceback.format_exc()
        logging.info(traceback.format_exc())
        to_kafka.send_kafka(task, logging)


def data_structure(dbConfig):
    '''
    所需计算数据入库
    Consume tasks from task_queue, persist account/post/reply rows to MySQL,
    and — when a stream's last record arrives — run predictTopic() for every
    user collected for that task.
    :param dbConfig: 数据库连接信息 (dict: host/port/username/password/db)
    :return: never returns (worker loop)
    '''
    # Pooled MySQL helper.
    sqlhelper = get_conn_pool(dbConfig['host'], dbConfig['port'], dbConfig['username'],
                              dbConfig['password'], dbConfig['db'])
    # Cache of the original task per accountId, needed when sending results.
    user_tasks = {}
    while True:
        if task_queue.qsize() > 0:
            try:
                task = task_queue.get()
                input = task['input']
                account = input['account']
                post = input['post']
                reply = input['reply']
                # Determine page type / taskId from the first parseable data entry.
                data = task['data']
                page_type = None
                taskId = None
                for data_str in data:
                    try:
                        app_data = json.loads(data[data_str])
                        taskId = app_data['taskId']
                        if "pageType" in app_data:
                            page_type = app_data['pageType']
                            break
                    except Exception:
                        logging.error("正常判断,异常请忽略")
                if page_type == 'userInfoPage':
                    # Cache the task keyed by user, then insert the account row.
                    accountId = parse_data(task, account['accountId'])
                    user_tasks[accountId] = task
                    logging.info('成功添加用户缓存:{}'.format(accountId))
                    sql = "INSERT INTO `user_account`(`taskId`, `accountId`, `accountName`, `nickName`, `fansCount`, `likeCount`, `postCount`, `otherInfo`, `authentication`) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)"
                    values = (
                        parse_data(task, account['taskId']),
                        parse_data(task, account['accountId']),
                        parse_data(task, account['accountName']),
                        parse_data(task, account['nickName']),
                        parse_data(task, account['fansCount']),
                        parse_data(task, account['likeCount']),
                        parse_data(task, account['postCount']),
                        parse_data(task, account['otherInfo']),
                        parse_data(task, account['authentication'])
                    )
                    sqlhelper.insert(sql, values)
                elif page_type == 'storyDetailPage':
                    # Post row.
                    sql = "INSERT INTO `user_post`(`taskId`, `postId`, `accountId`, `accountName`, `likeCount`, `emotionCount`, `commentsCount`, `shareCount`, `content`, `pubTime`, `crawlTime`) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
                    values = (
                        parse_data(task, post['taskId']),
                        parse_data(task, post['postId']),
                        parse_data(task, post['accountId']),
                        parse_data(task, post['accountName']),
                        parse_data(task, post['likeCount']),
                        parse_data(task, post['emotionCount']),
                        parse_data(task, post['commentsCount']),
                        parse_data(task, post['shareCount']),
                        parse_data(task, post['content']),
                        parse_data(task, post['pubTime']),
                        parse_data(task, post['crawlTime'])
                    )
                    sqlhelper.insert(sql, values)
                elif page_type == 'socialComment':
                    # Comment/reply row.
                    sql = "INSERT INTO `reply`(`taskId`, `ReviewerAccountId`, `ReviewerAccountName`, `postId`, `ShareCount`, `LikeCount`, `CommentCount`, `CommentTime`) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)"
                    values = (
                        parse_data(task, reply['taskId']),
                        parse_data(task, reply['reviewerAccountId']),
                        parse_data(task, reply['reviewerAccountName']),
                        parse_data(task, reply['postId']),
                        parse_data(task, reply['shareCount']),
                        parse_data(task, reply['likeCount']),
                        parse_data(task, reply['commentsCount']),
                        parse_data(task, reply['commentTime'])
                    )
                    sqlhelper.insert(sql, values)
                # Last record of the stream triggers identification for every cached user.
                if 'isLast' in data and data['isLast']:
                    # NOTE(review): these SELECTs interpolate taskId/accountId by
                    # string formatting; values originate from internal task data,
                    # but parameterized queries would be safer if the pool supports them.
                    sql = "select accountId,accountName,nickName,fansCount,likeCount,postCount,otherInfo,authentication from user_account where taskId ='{}'".format(taskId)
                    user_file_result = sqlhelper.queryAll(sql)
                    if user_file_result:
                        for user in user_file_result:
                            # BUG FIX: pre-initialise so the except logger cannot
                            # NameError when the cache lookup itself fails.
                            send_task = None
                            try:
                                # Aggregated per-user post statistics.
                                sql = "SELECT CONVERT(COUNT(postId), CHAR(255)) AS count, CONVERT(AVG(likeCount), CHAR(255)) AS LikeCount, CONVERT(AVG(commentsCount), CHAR(255)) AS CommentsCount, CONVERT(AVG(shareCount), CHAR(255)) AS ShareCount, CONVERT(AVG(LENGTH(content)), CHAR(255)) AS length, CONVERT(AVG((LENGTH(content) - LENGTH(REPLACE(content, '#', ''))) / LENGTH('#')), CHAR(255)) AS tags, CONVERT(AVG((LENGTH(content) - LENGTH(REPLACE(content, 'https', ''))) / LENGTH('https')), CHAR(255)) AS https, CONVERT(AVG((LENGTH(content) - LENGTH(REPLACE(content, '@', ''))) / LENGTH('@')), CHAR(255)) AS at, CONVERT(MIN(TIMESTAMPDIFF(SECOND, pubTime, GREATEST(pubTime, crawlTime))), CHAR(255)) AS diffdate FROM user_post WHERE taskId = '{taskId}' and accountId = '{accountId}'".format(taskId=taskId, accountId=user['accountId'])
                                post_file_result = sqlhelper.queryOne(sql)
                                send_task = user_tasks[user['accountId']]
                                predictTopic(user, post_file_result, send_task, dbConfig, taskId)
                            except Exception:
                                traceback.print_exc()
                                logging.error("用户id:{}".format(user['accountId']))
                                logging.error("用户缓存加载失败:{}".format(send_task))
                    else:
                        # 清空用户任务缓存
                        user_tasks.clear()
            except Exception:
                traceback.print_exc()
        else:
            # No pending task; sleep before polling again.
            time.sleep(10)


def replyGraphThread():
    '''
    判断话题是否结束,如果2个小时未访问话题,则删除该话题的图信息。
    Janitor loop: every 30 minutes, evict cached reply graphs idle >= 120 min.
    :return: never returns (worker loop)
    '''
    while True:
        try:
            if replyGraph != {}:
                current_time = datetime.now()
                # Iterate over a snapshot of keys so deletion is safe mid-loop.
                for topicID in list(replyGraph.keys()):
                    idle = current_time - replyGraph[topicID]['last_operation_time']
                    if idle >= timedelta(minutes=120):
                        del replyGraph[topicID]
        except Exception:
            logging.info(traceback.format_exc())
        finally:
            time.sleep(1800)
|