#coding:utf8
import os, sys
import io

sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf8')

cur_dir = os.path.dirname(os.path.abspath(__file__)) or os.getcwd()
par_dir = os.path.abspath(os.path.join(cur_dir, os.path.pardir))
sys.path.append(cur_dir)
sys.path.append(par_dir)

import json
import queue
import time
import traceback
import requests
import joblib
from datetime import datetime, timedelta

from django.http import HttpResponse
from django.views.decorators.csrf import csrf_exempt
from kazoo.client import KazooClient
from kazoo.protocol.states import EventType

from text_analysis.tools import to_kafka, tool
from text_analysis.tools.tool import parse_data
from text_analysis.tools.db_pool import get_conn_pool
from text_analysis.cusException import userFile_Exception, postFile_Exception, replyFile_Exception
from text_analysis.read_config import load_config
from log_util.set_logger import set_logger

logging = set_logger('logs/results.log')

config = load_config()

# Task queue
task_queue = queue.PriorityQueue()
stop_dict = {}

# Per-topic reply-graph cache
replyGraph = {}
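# PriorityQueue entries are (priority, enqueue_time, payload): trace requests
# enter with priority -1 and are served before normal requests (priority 1),
# with enqueue_time breaking ties FIFO. replyGraph entries carry a
# "last_operation_time" timestamp and are evicted by replyGraphThread() after
# 120 minutes of inactivity.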


@csrf_exempt
def robotIdentificationTopic(request):
    if request.method == 'POST':
        try:
            raw_data = json.loads(request.body)
            if raw_data.get("trace") == True:
                task_queue.put((-1, time.time(), raw_data))
            else:
                task_queue.put((1, time.time(), raw_data))
            return HttpResponse(json.dumps({"code": 1, "msg": "Request accepted."}, ensure_ascii=False))
        except:
            logging.error(traceback.format_exc())
            return HttpResponse(json.dumps({"code": 0, "msg": "Request body is not valid JSON."}, ensure_ascii=False))
    else:
        return HttpResponse(json.dumps({"code": 0, "msg": "Wrong request method; use POST."}, ensure_ascii=False))


def safe_int(d, key):
    """Coerce d[key] to int; missing or malformed values fall back to 0."""
    try:
        return int(d[key])
    except Exception:
        return 0


def flag_nonempty(d, key):
    """1 if d[key] is a non-empty string, else 0."""
    try:
        return 0 if d[key].strip() == "" else 1
    except Exception:
        return 0


def predictTopic(user_file_result, post_file_result, task, dbConfig, taskId):
    try:
        # Recognition result code returned to the caller
        recognition_code = "0"
        res = {"successCode": "1", "errorLog": "", "results": {}}
        # Historical data source
        all_result = task['data']
        user_data = []
        data = {}
        # Three fields echoed back in the response
        accountId = ""
        nickName = ""
        accountName = ""
        # {"user_file": "9_user profile", "post_file": "10_user posts"}
        if user_file_result:
            data['user_file'] = user_file_result
            logging.info('User data: {}'.format(data['user_file']))
            accountId = data["user_file"]["accountId"]
            nickName = data["user_file"]["nickName"]
            accountName = data["user_file"]["accountName"]
        else:
            data['user_file'] = {}
            raise userFile_Exception
        if post_file_result:
            data['post_file'] = post_file_result
            logging.info('Post data: {}'.format(data['post_file']))
        else:
            data['post_file'] = {}
            raise postFile_Exception
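        # A missing user or post payload aborts into the matching handler
        # below, which reports status 2 ("data empty") back through Kafka.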
        # User features; missing or malformed fields default to 0.
        user_data.extend([
            flag_nonempty(data["user_file"], "otherInfo"),
            flag_nonempty(data["user_file"], "nickName"),
            safe_int(data["user_file"], "likeCount"),
            safe_int(data["user_file"], "postCount"),
            safe_int(data["user_file"], "authentication"),
        ])
        logging.info("User data processed: {}".format(user_data))
        # Post features
        post_data = []
        if data["post_file"] == {}:
            post_data = [0, 0, 0, 0, 0, 0, 0, 0]
        else:
            post_data.extend([
                safe_int(data["post_file"], "LikeCount"),
                safe_int(data["post_file"], "ShareCount"),
                safe_int(data["post_file"], "emotionCount"),
                safe_int(data["post_file"], "CommentsCount"),
                safe_int(data["post_file"], "length"),
                safe_int(data["post_file"], "tags"),
                safe_int(data["post_file"], "https"),
                safe_int(data["post_file"], "diffdate"),
            ])
        logging.info("Post data processed: {}".format(post_data))
        # Relationship (reply-graph) features
        reply_data_1 = [0, 0, 0, 0, 0]
        reply_data_2 = [0, 0]
        try:
            # Use the in-memory graph for this topic if it is already cached.
            topicID = taskId
            if topicID not in list(replyGraph.keys()):
                reply_file = tool.mysqlData(dbConfig, topicID, logging)
                if reply_file:
                    graph = tool.get_replyData(reply_file)
                    replyGraph[topicID] = graph
                else:
                    raise replyFile_Exception
            else:
                graph = replyGraph[topicID]
            userId = data["user_file"]["accountId"]
            if userId in list(graph.keys()):
                closeness_centrality = graph[userId]["closeness_centrality"]
                pagerank = graph[userId]["pagerank"]
                clustering = graph[userId]["clustering"]
                in_degree = graph[userId]["in_degree"]
                out_degree = graph[userId]["out_degree"]
                reply_data_1 = [closeness_centrality, pagerank, clustering, in_degree, out_degree]
                user_flag_infl = graph[userId]["user_flag_infl"]
                user_flag_act = graph[userId]["user_flag_act"]
                reply_data_2 = [user_flag_infl, user_flag_act]
            replyGraph[topicID]["last_operation_time"] = datetime.now()
        except replyFile_Exception:
            # Re-raise so the dedicated handler below reports it.
            raise
        except:
            logging.info("Failed to fetch topic relationship data from MySQL!")
            logging.info(traceback.format_exc())
        logging.info("Relationship data processed: {}-{}".format(reply_data_1, reply_data_2))
        features = [user_data + reply_data_1 + post_data + reply_data_2]
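        # Feature layout (20 values): 5 user-profile features, 5 graph
        # centrality features, 8 post aggregates, 2 influence/activity flags.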
        # Load the trained model and classify.
        bot_user = joblib.load(cur_dir + "/model/bot_topic.pkl")
        result = bot_user.predict(features)
        recognition_code = str(result[0])
        results = {}
        # User id
        results['authorId'] = accountId
        # User nickname
        results['nickName'] = nickName
        # User account name
        results['accountName'] = accountName
        # End-of-stream flag
        res['isLast'] = True
        # Page type -- currently only consumed by the graph service
        results['pageType'] = 'userAuthenPage'
        if recognition_code == '0':
            results['recognitionResult'] = 'not a bot'
            results['recognitionCode'] = recognition_code
        elif recognition_code == '1':
            results['recognitionResult'] = 'bot'
            results['recognitionCode'] = recognition_code
        else:
            results['recognitionResult'] = 'unknown recognition result'
            results['recognitionCode'] = recognition_code
        results["isLast"] = 1
        res['results'] = json.dumps(results)
        res["status"] = 1
        res["message"] = "success"
        task["result"] = res
        logging.info("Prediction appended: {}".format(task))
        to_kafka.send_kafka(task, logging)
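    # Each failure path below mirrors the success path: it fills the same
    # result envelope with status 2 and a reason, then publishes it to Kafka.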
    except userFile_Exception:
        res = {"successCode": "0", "errorLog": "User data is empty!", "results": {}}
        results = {}
        results['authorId'] = ""
        results['nickName'] = ""
        results['accountName'] = ""
        results['recognitionResult'] = 'user data is empty'
        results["isLast"] = 1
        res['results'] = json.dumps(results)
        res["status"] = 2
        res["message"] = "user data is empty"
        task["result"] = res
        logging.info("User data empty for this request: {}".format(task))
        to_kafka.send_kafka(task, logging)
    except postFile_Exception:
        res = {"successCode": "0", "errorLog": "Post data is empty!", "results": {}}
        results = {}
        results['authorId'] = accountId
        results['nickName'] = nickName
        results['accountName'] = accountName
        results['recognitionResult'] = 'post data is empty'
        results["isLast"] = 1
        res['results'] = json.dumps(results)
        res["status"] = 2
        res["message"] = "post data is empty"
        task["result"] = res
        logging.info("Post data empty for this request: {}".format(task))
        to_kafka.send_kafka(task, logging)
    except replyFile_Exception:
        res = {"successCode": "0", "errorLog": "Post/comment relationship data is empty!", "results": {}}
        results = {}
        results['authorId'] = accountId
        results['nickName'] = nickName
        results['accountName'] = accountName
        results['recognitionResult'] = 'post/comment relationship data is empty'
        results["isLast"] = 1
        res['results'] = json.dumps(results)
        res["status"] = 2
        res["message"] = "post/comment relationship data is empty"
        task["result"] = res
        logging.info("Relationship data empty for this request: {}".format(task))
        to_kafka.send_kafka(task, logging)
    except:
        res = {"successCode": "0", "errorLog": "", "results": {}}
        results = {}
        results['authorId'] = accountId
        results['nickName'] = nickName
        results['accountName'] = accountName
        results['recognitionResult'] = ""
        results["isLast"] = 1
        res['results'] = json.dumps(results)
        res["status"] = 2
        res["message"] = "exception"
        task["result"] = res
        task["result"]["errorLog"] = traceback.format_exc()
        logging.info(traceback.format_exc())
        to_kafka.send_kafka(task, logging)


def data_structure():
    '''
    Persist incoming records to MySQL and, on the last record of a stream,
    trigger bot recognition for every user cached for the task.
    Database connection settings are read from the [database] config section.
    :return:
    '''
    dbConfig = dict(config.items('database'))

    # Get a pooled database connection
    sqlhelper = get_conn_pool(dbConfig['host'], dbConfig['port'], dbConfig['username'], dbConfig['password'], dbConfig['db'])
    # Cache of user task payloads
    user_tasks = {}
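    # user_tasks maps accountId -> the original task payload, so the
    # recognition result can later be sent back on the task that delivered
    # that user (see the isLast branch below).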
    while True:
        try:
            if task_queue.qsize() > 0:
                p, t, task = task_queue.get(timeout=1)
                task_id = task["scenes_id"]
                task_version = task["version"]
                logging.info("Current version info: {}".format(stop_dict))
                if task_id in stop_dict.keys() and task_version != stop_dict[task_id]["version"]:
                    logging.info("Task has been paused; dropping its data")
                    continue

                task_input = task['input']
                account = task_input['account']
                post = task_input['post']
                reply = task_input['reply']
                # Determine the record's page type
                data = task['data']
                page_type = None
                taskId = None
                for data_str in data:
                    try:
                        app_data = json.loads(data[data_str])
                        taskId = app_data['taskId']
                        if "pageType" in app_data:
                            page_type = app_data['pageType']
                            break
                    except:
                        logging.error("Expected while probing entries; ignore this exception")
                if page_type == 'userInfoPage':
                    # Add the user to the cache
                    accountId = parse_data(task, account['accountId'])
                    user_tasks[accountId] = task
                    logging.info('User cached: {}'.format(accountId))
                    # Insert user-type data
                    sql = "INSERT INTO `user_account`(`taskId`, `accountId`, `accountName`, `nickName`, `fansCount`, `likeCount`, `postCount`, `otherInfo`, `authentication`) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)"
                    # Build the parameter tuple
                    values = (
                        parse_data(task, account['taskId']),
                        parse_data(task, account['accountId']),
                        parse_data(task, account['accountName']),
                        parse_data(task, account['nickName']),
                        parse_data(task, account['fansCount']),
                        parse_data(task, account['likeCount']),
                        parse_data(task, account['postCount']),
                        parse_data(task, account['otherInfo']),
                        parse_data(task, account['authentication'])
                    )
                    sqlhelper.insert(sql, values)
                elif page_type == 'storyDetailPage':
                    # Insert post-type data
                    sql = "INSERT INTO `user_post`(`taskId`, `postId`, `accountId`, `accountName`, `likeCount`, `emotionCount`, `commentsCount`, `shareCount`, `content`, `pubTime`, `crawlTime`) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
                    # Build the parameter tuple
                    values = (
                        parse_data(task, post['taskId']),
                        parse_data(task, post['postId']),
                        parse_data(task, post['accountId']),
                        parse_data(task, post['accountName']),
                        parse_data(task, post['likeCount']),
                        parse_data(task, post['emotionCount']),
                        parse_data(task, post['commentsCount']),
                        parse_data(task, post['shareCount']),
                        parse_data(task, post['content']),
                        parse_data(task, post['pubTime']),
                        parse_data(task, post['crawlTime'])
                    )
                    sqlhelper.insert(sql, values)
                elif page_type == 'socialComment':
                    # Insert comment-type data
                    sql = "INSERT INTO `reply`(`taskId`, `ReviewerAccountId`, `ReviewerAccountName`, `postId`, `ShareCount`, `LikeCount`, `CommentCount`, `CommentTime`) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)"
                    # Build the parameter tuple
                    values = (
                        parse_data(task, reply['taskId']),
                        parse_data(task, reply['reviewerAccountId']),
                        parse_data(task, reply['reviewerAccountName']),
                        parse_data(task, reply['postId']),
                        parse_data(task, reply['shareCount']),
                        parse_data(task, reply['likeCount']),
                        parse_data(task, reply['commentsCount']),
                        parse_data(task, reply['commentTime'])
                    )
                    sqlhelper.insert(sql, values)
                # If this is the stream's last record, run bot recognition
                # for every user collected under this task.
                if 'isLast' in data and data['isLast']:
                    # Fetch the user-level data
                    sql = "select accountId,accountName,nickName,fansCount,likeCount,postCount,otherInfo,authentication from user_account where taskId ='{}'".format(taskId)
                    user_file_result = sqlhelper.queryAll(sql)
                    if user_file_result:
                        for user in user_file_result:
                            send_task = None
                            try:
                                # Fetch the post-level aggregates for this user
                                sql = "SELECT CONVERT(COUNT(postId), CHAR(255)) AS count, CONVERT(AVG(likeCount), CHAR(255)) AS LikeCount, CONVERT(AVG(commentsCount), CHAR(255)) AS CommentsCount, CONVERT(AVG(shareCount), CHAR(255)) AS ShareCount, CONVERT(AVG(LENGTH(content)), CHAR(255)) AS length, CONVERT(AVG((LENGTH(content) - LENGTH(REPLACE(content, '#', ''))) / LENGTH('#')), CHAR(255)) AS tags, CONVERT(AVG((LENGTH(content) - LENGTH(REPLACE(content, 'https', ''))) / LENGTH('https')), CHAR(255)) AS https, CONVERT(AVG((LENGTH(content) - LENGTH(REPLACE(content, '@', ''))) / LENGTH('@')), CHAR(255)) AS at, CONVERT(MIN(TIMESTAMPDIFF(SECOND, pubTime, GREATEST(pubTime, crawlTime))), CHAR(255)) AS diffdate FROM user_post WHERE taskId = '{taskId}' and accountId = '{accountId}'".format(taskId=taskId, accountId=user['accountId'])
                                post_file_result = sqlhelper.queryOne(sql)
                                send_task = user_tasks[user['accountId']]
                                predictTopic(user, post_file_result, send_task, dbConfig, taskId)
                            except Exception as e:
                                traceback.print_exc()
                                logging.error("User id: {}".format(user['accountId']))
                                # send_task stays None if the cache lookup failed
                                logging.error("Failed to load user task from cache: {}".format(send_task))
                    else:
                        # Clear the user task cache
                        user_tasks.clear()
            else:
                # No pending tasks; sleep
                time.sleep(10)
        except Exception as e:
            traceback.print_exc()


def replyGraphThread():
    '''
    Decide whether a topic has ended: if a topic's graph has not been
    accessed for 2 hours, delete it from the cache.
    :return:
    '''
    while True:
        try:
            if replyGraph != {}:
                # Current time
                current_time = datetime.now()
                for topicID in list(replyGraph.keys()):
                    # Time elapsed since the last operation on this topic
                    time_difference = current_time - replyGraph[topicID]['last_operation_time']
                    # Drop the topic's graph after 120 minutes or more of inactivity
                    if time_difference >= timedelta(minutes=120):
                        del replyGraph[topicID]
        except:
            logging.info(traceback.format_exc())
        finally:
            time.sleep(1800)
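
# How these workers would typically be started (a sketch; the actual startup
# code lives outside this file, so the threading choices here are assumptions):
#
#   import threading
#   for worker in (data_structure, replyGraphThread, zk_monitoring):
#       threading.Thread(target=worker, daemon=True).start()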


def zk_monitoring():
    try:
        # Production environment
        zk = KazooClient(hosts=config['zookeeper']['zkhost'])
        # Test environment
        # zk = KazooClient(hosts='172.16.12.55:2181,172.16.12.56:2181,172.16.12.57:2181')
        zk.start()

        # Register the watcher
        @zk.DataWatch("/analyze")
        def watch_node(data, stat, event):
            if event is not None and event.type == EventType.CHANGED:
                data, stat = zk.get("/analyze")
                logging.info("Processing pause/delete notification: {}".format(data))
                try:
                    d = json.loads(data)
                    scene_id = d["scenes_id"]
                    stop_dict[scene_id] = {}
                    stop_dict[scene_id]["version"] = d["version"]
                    stop_dict[scene_id]["operation"] = d["operation"]
                except:
                    pass

        # Keep the process alive so the watcher keeps firing
        try:
            while True:
                time.sleep(1)
        except:
            logging.info("Stopping...")
            # Close the connection
            zk.stop()
            zk.close()
    except:
        logging.error(traceback.format_exc())
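
# Expected /analyze node payload (inferred from the parser above):
#   {"scenes_id": "<task id>", "version": "<new version>", "operation": "<operation>"}
# data_structure() drops any queued task whose version no longer matches
# stop_dict[scenes_id]["version"].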