|
|
# coding:utf8 import os, sys import io
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf8') cur_dir = os.path.dirname(os.path.abspath(__file__)) or os.getcwd() par_dir = os.path.abspath(os.path.join(cur_dir, os.path.pardir)) sys.path.append(cur_dir) sys.path.append(par_dir) import json from django.http import HttpResponse from text_analysis.tools import to_kafka from django.views.decorators.csrf import csrf_exempt from log_util.set_logger import set_logger from text_analysis.tools.tool import parse_data logging = set_logger('logs/results.log') import traceback import time import joblib from text_analysis.cusException import userFile_Exception, postFile_Exception
from kazoo.client import KazooClient from kazoo.protocol.states import EventType # 任务队列 import queue task_queue = queue.PriorityQueue() stop_dict={} from text_analysis.read_config import load_config config=load_config()
#mysql连接池 from text_analysis.tools.db_pool import get_conn_pool
@csrf_exempt
def robotIdentification(request):
    """Accept a robot-identification task over HTTP POST and enqueue it.

    The JSON request body is put on the module-level ``task_queue`` as a
    ``(priority, enqueue_time, payload)`` tuple. Payloads whose ``trace``
    field equals ``True`` are enqueued with priority ``-1``; all other
    payloads use priority ``1``.

    Args:
        request: The incoming Django request.

    Returns:
        An ``HttpResponse`` whose body is a JSON object with ``code``
        (1 when the task was enqueued, 0 otherwise) and ``msg``.
    """
    if request.method != 'POST':
        return HttpResponse(json.dumps({"code": 0, "msg": "请求方式错误,改为post请求"}, ensure_ascii=False))

    try:
        raw_data = json.loads(request.body)
        # Only a JSON object can be processed downstream; reject lists,
        # strings and numbers explicitly instead of relying on an
        # AttributeError raised further down.
        if not isinstance(raw_data, dict):
            raise ValueError("request body must be a JSON object")
        # ``== True`` (not ``is True``) is kept on purpose so that a JSON
        # ``1`` is still treated as a trace request.
        priority = -1 if raw_data.get("trace") == True else 1  # noqa: E712
        # NOTE(review): if two entries share the same priority and the same
        # timestamp, the queue falls back to comparing the payload dicts,
        # which raises TypeError. Consider adding a unique sequence number
        # once every consumer of the tuple can be updated.
        task_queue.put((priority, time.time(), raw_data))
    except Exception:
        logging.error(traceback.format_exc())
        return HttpResponse(json.dumps({"code": 0, "msg": "请求json格式不正确!"}, ensure_ascii=False))

    return HttpResponse(json.dumps({"code": 1, "msg": "请求正常!"}, ensure_ascii=False))
# Loaded models keyed by file path, so the pickle is read once per process
# instead of once per prediction.
_BOT_USER_MODEL_CACHE = {}


def _text_flag(record, key):
    """Return 1 when ``record[key]`` is a non-blank string, otherwise 0."""
    try:
        return 0 if record[key].strip() == "" else 1
    except (LookupError, TypeError, AttributeError):
        # Missing key, None, or a non-string value all count as "no text".
        return 0


def _int_feature(record, key):
    """Return ``record[key]`` as an int, or 0 when it is missing or unparsable.

    Decimal strings such as ``"241.8000"`` are truncated to their integer
    part rather than being turned into 0.
    """
    try:
        raw = record[key]
    except (LookupError, TypeError):
        return 0
    try:
        return int(raw)
    except (TypeError, ValueError, OverflowError):
        pass
    try:
        # Aggregated columns arrive as decimal text, which int() rejects.
        return int(float(raw))
    except (TypeError, ValueError, OverflowError):
        return 0


def _load_bot_user_model():
    """Return the trained bot-user model, loading it from disk on first use."""
    model_path = cur_dir + "/model/bot_user.pkl"
    if model_path not in _BOT_USER_MODEL_CACHE:
        _BOT_USER_MODEL_CACHE[model_path] = joblib.load(model_path)
    return _BOT_USER_MODEL_CACHE[model_path]


def _failure_result(error_log, message, recognition_result, account_id, nick_name, account_name):
    """Build the ``task["result"]`` payload used by every failure path."""
    results = {
        'authorId': account_id,
        'nickName': nick_name,
        'accountName': account_name,
        'recognitionResult': recognition_result,
        "isLast": 1,
    }
    return {
        "successCode": "0",
        "errorLog": error_log,
        "results": json.dumps(results),
        "status": 2,
        "message": message,
    }


def predict(user_file_result, post_file_result, task):
    """Classify one account as robot / non-robot and publish the outcome.

    A 15-value feature row is built from the account record (``otherInfo``
    and ``nickName`` presence flags, ``fansCount``, ``likeCount``,
    ``postCount``, ``authentication``) followed by the post statistics
    (``count``, ``LikeCount``, ``CommentsCount``, ``ShareCount``, ``length``,
    ``tags``, ``https``, ``at``, ``diffdate``). The row is scored by the model
    stored at ``model/bot_user.pkl``; the outcome is written to
    ``task["result"]`` and the task is sent through ``to_kafka.send_kafka``.

    Args:
        user_file_result: Mapping with the account fields listed above plus
            ``accountId`` and ``accountName``. A falsy value is reported as
            "user data empty".
        post_file_result: Mapping with the post statistics listed above. A
            falsy value is reported as "post data empty".
        task: Task dict; mutated in place (``task["result"]`` is set).

    Returns:
        None. Success and failure are both reported through Kafka; failures
        carry ``status`` 2 and, for unexpected errors, the traceback under
        ``task["result"]["error"]``.
    """
    # The three identity fields echoed back in every response.
    account_id = ""
    nick_name = ""
    account_name = ""
    try:
        if not user_file_result:
            raise userFile_Exception
        logging.info('用户数据:{}'.format(user_file_result))
        account_id = user_file_result["accountId"]
        nick_name = user_file_result["nickName"]
        account_name = user_file_result["accountName"]

        if not post_file_result:
            raise postFile_Exception
        logging.info('帖子数据:{}'.format(post_file_result))

        # Account features.
        user_features = [
            _text_flag(user_file_result, "otherInfo"),
            _text_flag(user_file_result, "nickName"),
            _int_feature(user_file_result, "fansCount"),
            _int_feature(user_file_result, "likeCount"),
            _int_feature(user_file_result, "postCount"),
            _int_feature(user_file_result, "authentication"),
        ]
        # Post features.
        post_features = [
            _int_feature(post_file_result, "count"),
            _int_feature(post_file_result, "LikeCount"),
            _int_feature(post_file_result, "CommentsCount"),
            _int_feature(post_file_result, "ShareCount"),
            _int_feature(post_file_result, "length"),
            _int_feature(post_file_result, "tags"),
            _int_feature(post_file_result, "https"),
            _int_feature(post_file_result, "at"),
            _int_feature(post_file_result, "diffdate"),
        ]
        features = [user_features + post_features]

        result = _load_bot_user_model().predict(features)
        recognition_code = str(result[0])

        if recognition_code == '0':
            recognition_result = '非机器人'
        elif recognition_code == '1':
            recognition_result = '机器人'
        else:
            recognition_result = '未知识别结果'

        results = {
            'authorId': account_id,
            'nickName': nick_name,
            'accountName': account_name,
            # Data type -- per the original note, currently only consumed
            # by the graph service.
            'pageType': 'userAuthenPage',
            'recognitionResult': recognition_result,
            'recognitionCode': recognition_code,
            "isLast": 1,
        }
        res = {
            "successCode": "1",
            "errorLog": "",
            "results": json.dumps(results),
            # End-of-stream marker.
            "isLast": True,
            "status": 1,
            "message": "成功",
        }
        task["result"] = res
        logging.info("增加预测数据-{}".format(task))
        to_kafka.send_kafka(task, logging)
    except userFile_Exception:
        task["result"] = _failure_result("用户数据为空!", "用户数据为空", '用户数据为空', "", "", "")
        logging.info("该条请求用户数据为空-{}".format(task))
        to_kafka.send_kafka(task, logging)
    except postFile_Exception:
        task["result"] = _failure_result(
            "帖子数据为空!", "帖子数据为空", '帖子数据为空', account_id, nick_name, account_name)
        logging.info("该条请求帖子数据为空-{}".format(task))
        to_kafka.send_kafka(task, logging)
    except Exception:
        error_text = traceback.format_exc()
        task["result"] = _failure_result("", "异常", "", account_id, nick_name, account_name)
        task["result"]["error"] = error_text
        logging.error(error_text)
        to_kafka.send_kafka(task, logging)
def _probe_page_type(data):
    """Scan the task's ``data`` mapping for the crawler page type.

    Each value is tried as a JSON document. ``taskId`` is taken from every
    document that has one; scanning stops at the first document that also
    carries ``pageType``. Values that are not JSON, or that lack ``taskId``,
    are expected and only logged.

    Returns:
        A ``(page_type, task_id)`` tuple; either element is ``None`` when it
        was not found.
    """
    page_type = None
    task_id = None
    for key in data:
        try:
            app_data = json.loads(data[key])
            task_id = app_data['taskId']
            if "pageType" in app_data:
                page_type = app_data['pageType']
                break
        except Exception:
            logging.error("正常判断,异常请忽略")
    return page_type, task_id


def data_structure(dbConfig):
    """Consume queued tasks, persist them, and trigger bot identification.

    Runs forever. For each task taken from ``task_queue``:

    * tasks whose ``scenes_id`` is listed in ``stop_dict`` with a different
      ``version`` are dropped;
    * ``userInfoPage`` tasks are inserted into ``user_account``;
    * ``storyDetailPage`` tasks are inserted into ``user_post``;
    * when the task's ``data`` carries a truthy ``isLast``, the stored account
      row and aggregated post statistics for that ``taskId`` are read back and
      passed to ``predict``.

    When the queue is empty the loop sleeps for 10 seconds. Any exception
    raised while handling a task is logged and the loop continues.

    Args:
        dbConfig: Mapping with ``host``, ``port``, ``username``, ``password``
            and ``db`` used to obtain the connection pool.
    """
    # Obtain the database connection pool.
    sqlhelper = get_conn_pool(
        dbConfig['host'], dbConfig['port'], dbConfig['username'], dbConfig['password'], dbConfig['db'])
    # The user task is reused as the response body for the prediction.
    send_task = None
    while True:
        try:
            if task_queue.qsize() > 0:
                _priority, _enqueued_at, task = task_queue.get(timeout=1)
                logging.info("当前任务队列长度{}".format(task_queue.qsize()+1))
                task_id = task["scenes_id"]
                task_version = task["version"]
                logging.info("当前version信息为:{}".format(stop_dict))
                if task_id in stop_dict and task_version != stop_dict[task_id]["version"]:
                    logging.info("已暂停任务,数据过滤掉")
                    continue

                task_input = task['input']
                account = task_input['account']
                post = task_input['post']
                # Work out which kind of page this task carries.
                data = task['data']
                page_type, flow_task_id = _probe_page_type(data)
                logging.info("数据类型:{}".format(page_type))

                if page_type == 'userInfoPage':
                    # The user task is reused as the response body.
                    send_task = task
                    # Persist the account record.
                    sql = "INSERT INTO `user_account`(`taskId`, `accountId`, `accountName`, `nickName`, `fansCount`, `likeCount`, `postCount`, `otherInfo`, `authentication`) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)"
                    values = (
                        parse_data(task, account['taskId']),
                        parse_data(task, account['accountId']),
                        parse_data(task, account['accountName']),
                        parse_data(task, account['nickName']),
                        parse_data(task, account['fansCount']),
                        parse_data(task, account['likeCount']),
                        parse_data(task, account['postCount']),
                        parse_data(task, account['otherInfo']),
                        parse_data(task, account['authentication']),
                    )
                    sqlhelper.insert(sql, values)
                elif page_type == 'storyDetailPage':
                    # Persist the post record.
                    sql = "INSERT INTO `user_post`(`taskId`, `postId`, `accountId`, `accountName`, `likeCount`, `emotionCount`, `commentsCount`, `shareCount`, `content`, `pubTime`, `crawlTime`) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
                    values = (
                        parse_data(task, post['taskId']),
                        parse_data(task, post['postId']),
                        parse_data(task, post['accountId']),
                        parse_data(task, post['accountName']),
                        parse_data(task, post['likeCount']),
                        parse_data(task, post['emotionCount']),
                        parse_data(task, post['commentsCount']),
                        parse_data(task, post['shareCount']),
                        parse_data(task, post['content']),
                        parse_data(task, post['pubTime']),
                        parse_data(task, post['crawlTime']),
                    )
                    sqlhelper.insert(sql, values)

                # The last record of a data flow triggers the identification.
                # NOTE(review): this check runs for every page type, not only
                # for posts -- confirm this matches the deployed behaviour.
                if 'isLast' in data and data['isLast']:
                    # NOTE(review): the task id is interpolated into the SQL
                    # text below; switch to bound parameters if
                    # sqlhelper.queryOne supports them.
                    # Account data for this flow.
                    sql = "select accountId,accountName,nickName,fansCount,likeCount,postCount,otherInfo,authentication from user_account where taskId ='{}'".format(flow_task_id)
                    user_file_result = sqlhelper.queryOne(sql)
                    # Aggregated post statistics for this flow.
                    sql = "SELECT CONVERT(COUNT(postId), CHAR(255)) AS count, CONVERT(AVG(likeCount), CHAR(255)) AS LikeCount, CONVERT(AVG(commentsCount), CHAR(255)) AS CommentsCount, CONVERT(AVG(shareCount), CHAR(255)) AS ShareCount, CONVERT(AVG(LENGTH(content)), CHAR(255)) AS length, CONVERT(AVG((LENGTH(content) - LENGTH(REPLACE(content, '#', ''))) / LENGTH('#')), CHAR(255)) AS tags, CONVERT(AVG((LENGTH(content) - LENGTH(REPLACE(content, 'https', ''))) / LENGTH('https')), CHAR(255)) AS https, CONVERT(AVG((LENGTH(content) - LENGTH(REPLACE(content, '@', ''))) / LENGTH('@')), CHAR(255)) AS at, CONVERT(MIN(TIMESTAMPDIFF(SECOND, pubTime, GREATEST(pubTime, crawlTime))), CHAR(255)) AS diffdate FROM user_post where taskId ='{}'".format(flow_task_id)
                    post_file_result = sqlhelper.queryOne(sql)
                    if send_task is None:
                        send_task = task
                    predict(user_file_result, post_file_result, send_task)
                    # Reset once the flow has been answered.
                    send_task = None
            else:
                # Nothing queued; back off before polling again.
                time.sleep(10)
        except Exception:
            # Report through the module logger, like the rest of this file,
            # so worker failures end up in the log file.
            logging.error(traceback.format_exc())
def zk_monitoring():
    """Watch the ZooKeeper node ``/analyze`` and record stop requests.

    Connects to the hosts configured under ``config['zookeeper']['zkhost']``
    and registers a data watch. Whenever the node changes, its JSON content
    is read and ``stop_dict[scenes_id]`` is set to the ``version`` and
    ``operation`` it carries. The function then blocks, sleeping in
    one-second steps, until it is interrupted; the ZooKeeper client is
    stopped and closed on the way out. Unexpected errors are logged, not
    raised.
    """
    try:
        # Production environment.
        zk = KazooClient(hosts=config['zookeeper']['zkhost'])
        # Test environment alternative:
        # zk = KazooClient(hosts='172.16.12.55:2181,172.16.12.56:2181,172.16.12.57:2181')
        zk.start()
        try:
            # Register the watcher.
            @zk.DataWatch("/analyze")
            def watch_node(data, stat, event):
                if event is not None and event.type == EventType.CHANGED:
                    data, stat = zk.get("/analyze")
                    logging.info("执行删除操作:{}".format(data))
                    payload = json.loads(data)
                    scene_id = payload["scenes_id"]
                    # Build the entry completely before publishing it, so a
                    # payload with a missing field never leaves a
                    # half-populated entry behind in stop_dict.
                    entry = {
                        "version": payload["version"],
                        "operation": payload["operation"],
                    }
                    stop_dict[scene_id] = entry

            # Keep the caller alive so node changes keep being delivered.
            try:
                while True:
                    time.sleep(1)
            except (KeyboardInterrupt, SystemExit):
                logging.info("Stopping...")
        finally:
            # Always release the connection, including when registering the
            # watch fails.
            zk.stop()
            zk.close()
    except Exception:
        logging.error(traceback.format_exc())
def demo_parse_results():
    """Parse the bundled sample query results into the predictor's input shape.

    Returns:
        A dict with ``user_file`` and ``post_file``; each is the first entry
        of the corresponding ``resultList``, or ``{}`` when that list is
        empty.
    """
    all_result = {
        "9_获取用户发帖信息": "{\"resultList\": [{\"count\": \"10\", \"LikeCount\": \"1\", \"CommentsCount\": \"0.1\", \"ShareCount\": \"0.4\", \"length\": \"241.8000\", \"tags\": \"5.80000000\", \"https\": \"1.20000000\", \"at\": \"0.40000000\", \"diffdate\": \"170269\"}]}",
        "8_获取用户信息": "{\"resultList\": [{\"accountId\": \"1368232444323799043\", \"accountName\": \"Ujjal best Tech@UjjalKumarGho19\", \"nickName\": \"UjjalKumarGho19\", \"fansCount\": \"660\", \"likeCount\": \"2096\", \"postCount\": \"579\", \"otherInfo\": \"\", \"authentication\": 1}]}"}
    # Maps each input slot to the key of the sample result that feeds it.
    # These must name keys that exist in ``all_result``.
    data = {"user_file": "8_获取用户信息", "post_file": "9_获取用户发帖信息"}
    user_file_result = json.loads(all_result[data['user_file']])
    post_file_result = json.loads(all_result[data['post_file']])

    user_rows = user_file_result['resultList']
    data['user_file'] = user_rows[0] if user_rows else {}

    post_rows = post_file_result['resultList']
    data['post_file'] = post_rows[0] if post_rows else {}
    return data


if __name__ == '__main__':
    print(demo_parse_results())
|