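# 用户水军识别应用 (bot / paid-poster account identification application)
# NOTE: the following text is residue from the repository web page this file was
# copied from; it is not part of the program:
#   You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
#   383 lines
#   17 KiB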

# coding:utf8
import os, sys
import io
# Re-wrap stdout so that everything printed by this process is encoded as UTF-8.
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf8')
# Directory containing this file (falls back to the working directory when
# dirname() is empty) and its parent directory.
cur_dir = os.path.dirname(os.path.abspath(__file__)) or os.getcwd()
par_dir = os.path.abspath(os.path.join(cur_dir, os.path.pardir))
# Both directories are appended to sys.path BEFORE the project imports below;
# presumably this is what lets `text_analysis` / `log_util` resolve -- confirm
# against the deployment layout before reordering these lines.
sys.path.append(cur_dir)
sys.path.append(par_dir)
import json
from django.http import HttpResponse
from text_analysis.tools import to_kafka
from django.views.decorators.csrf import csrf_exempt
from log_util.set_logger import set_logger
from text_analysis.tools.tool import parse_data
# NOTE: `logging` is the object returned by the project's set_logger(), not the
# standard-library `logging` module.
logging = set_logger('logs/results.log')
import traceback
import time
import joblib
from text_analysis.cusException import userFile_Exception, postFile_Exception
from kazoo.client import KazooClient
from kazoo.protocol.states import EventType
# Task queue. robotIdentification() enqueues (priority, enqueue time, payload)
# tuples: priority -1 for trace requests, 1 otherwise. data_structure() consumes them.
import queue
task_queue = queue.PriorityQueue()
# scenes_id -> {"version": ..., "operation": ...}; written by the ZooKeeper watch
# in zk_monitoring() and read by data_structure() to filter out stopped tasks.
stop_dict={}
from text_analysis.read_config import load_config
config=load_config()
# MySQL connection pool
from text_analysis.tools.db_pool import get_conn_pool
@csrf_exempt
def robotIdentification(request):
    """Accept a bot-identification request and enqueue it for background processing.

    The POST body must be a JSON object. A request whose ``trace`` field equals
    ``True`` is enqueued with priority -1, every other request with priority 1,
    so trace requests are taken from the priority queue first.

    Args:
        request: The incoming Django request.

    Returns:
        HttpResponse: JSON body with ``code`` (1 = accepted, 0 = rejected) and a
        human-readable ``msg``.
    """
    if request.method != 'POST':
        return HttpResponse(json.dumps({"code": 0, "msg": "请求方式错误,改为post请求"}, ensure_ascii=False))

    try:
        raw_data = json.loads(request.body)
        # `== True` (not `is True`) is deliberate: it keeps accepting a JSON `1`
        # as a trace flag. A body that is valid JSON but not an object has no
        # `.get`, raises AttributeError and is rejected below.
        priority = -1 if raw_data.get("trace") == True else 1
        # NOTE(review): if two entries ever share priority AND timestamp, the
        # queue falls back to comparing the payload dicts, which raises
        # TypeError. Adding a sequence-number tie-breaker would also require
        # updating the consumer's tuple unpacking.
        task_queue.put((priority, time.time(), raw_data))
        return HttpResponse(json.dumps({"code": 1, "msg": "请求正常!"}, ensure_ascii=False))
    except Exception:
        # Narrowed from a bare `except:` so SystemExit / KeyboardInterrupt are
        # no longer swallowed and reported as a malformed request.
        logging.error(traceback.format_exc())
        return HttpResponse(json.dumps({"code": 0, "msg": "请求json格式不正确!"}, ensure_ascii=False))
# Post-statistics features, in the order they are appended to the feature vector.
_POST_FEATURE_KEYS = (
    "count", "LikeCount", "CommentsCount", "ShareCount",
    "length", "tags", "https", "at", "diffdate",
)

# Human-readable label for each recognition code produced by the model.
_RECOGNITION_LABELS = {'0': '非机器人', '1': '机器人'}


def _has_text(record, key):
    """Return 1 when ``record[key]`` is a non-blank string, otherwise 0."""
    try:
        return 0 if record[key].strip() == "" else 1
    except Exception:
        # Missing key, None, or a non-string value all count as "no text".
        return 0


def _int_feature(record, key):
    """Return ``record[key]`` as an int, or 0 when it is missing or unparseable.

    Decimal strings such as ``"241.8000"`` are truncated to their integer part,
    the same way ``int()`` already truncates a real float.
    """
    try:
        raw = record[key]
    except Exception:
        return 0
    try:
        return int(raw)
    except Exception:
        pass
    try:
        # int("241.8000") raises ValueError; go through float() first.
        return int(float(raw))
    except Exception:
        # None, empty string, NaN, infinity, non-numeric text ...
        return 0


def _failure_result(error_log, recognition_result, message,
                    account_id="", nick_name="", account_name=""):
    """Build the ``task["result"]`` payload used for every failure outcome."""
    results = {
        'authorId': account_id,
        'nickName': nick_name,
        'accountName': account_name,
        'recognitionResult': recognition_result,
        'isLast': 1,
    }
    return {
        "successCode": "0",
        "errorLog": error_log,
        "results": json.dumps(results),
        "status": 2,
        "message": message,
    }


def predict(user_file_result, post_file_result, task):
    """Classify one account as bot / not bot and publish the outcome to Kafka.

    The feature vector is six user features (otherInfo present, nickName
    present, fansCount, likeCount, postCount, authentication) followed by the
    nine post statistics in ``_POST_FEATURE_KEYS``. It is fed to the model
    stored at ``<cur_dir>/model/bot_user.pkl``.

    Args:
        user_file_result: Mapping holding the account row (``accountId``,
            ``accountName``, ``nickName``, ``fansCount``, ``likeCount``,
            ``postCount``, ``otherInfo``, ``authentication``). A falsy value
            produces a "user data empty" failure message.
        post_file_result: Mapping holding the aggregated post statistics. A
            falsy value produces a "post data empty" failure message.
        task: The task dict. It is mutated in place (``task["result"]`` is
            set) and then handed to ``to_kafka.send_kafka``.

    Returns:
        None. The outcome, success or failure, is delivered through Kafka.
    """
    # Bound before the try so every handler can echo whatever was read so far.
    accountId = ""
    nickName = ""
    accountName = ""
    try:
        if not user_file_result:
            raise userFile_Exception
        logging.info('用户数据:{}'.format(user_file_result))
        accountId = user_file_result["accountId"]
        nickName = user_file_result["nickName"]
        accountName = user_file_result["accountName"]

        if not post_file_result:
            raise postFile_Exception
        logging.info('帖子数据:{}'.format(post_file_result))

        user_features = [
            _has_text(user_file_result, "otherInfo"),
            _has_text(user_file_result, "nickName"),
            _int_feature(user_file_result, "fansCount"),
            _int_feature(user_file_result, "likeCount"),
            _int_feature(user_file_result, "postCount"),
            _int_feature(user_file_result, "authentication"),
        ]
        # The aggregate query in this file returns averages as decimal strings;
        # _int_feature keeps their integer part instead of zeroing them out.
        post_features = [_int_feature(post_file_result, key) for key in _POST_FEATURE_KEYS]

        # NOTE(review): the model is re-read from disk on every call. Caching it
        # would be cheaper but would stop picking up a replaced model file
        # without a restart -- confirm which behaviour is wanted.
        bot_user = joblib.load(cur_dir + "/model/bot_user.pkl")
        result = bot_user.predict([user_features + post_features])
        recognition_code = str(result[0])

        results = {
            'authorId': accountId,
            'nickName': nickName,
            'accountName': accountName,
            # Data type -- currently only consumed by the graph component.
            'pageType': 'userAuthenPage',
            'recognitionResult': _RECOGNITION_LABELS.get(recognition_code, '未知识别结果'),
            'recognitionCode': recognition_code,
            'isLast': 1,
        }
        task["result"] = {
            "successCode": "1",
            "errorLog": "",
            "results": json.dumps(results),
            # End-of-stream marker.
            "isLast": True,
            "status": 1,
            "message": "成功",
        }
        logging.info("增加预测数据-{}".format(task))
        to_kafka.send_kafka(task, logging)
    except userFile_Exception:
        task["result"] = _failure_result("用户数据为空!", '用户数据为空', "用户数据为空")
        logging.info("该条请求用户数据为空-{}".format(task))
        to_kafka.send_kafka(task, logging)
    except postFile_Exception:
        task["result"] = _failure_result(
            "帖子数据为空!", '帖子数据为空', "帖子数据为空",
            accountId, nickName, accountName)
        logging.info("该条请求帖子数据为空-{}".format(task))
        to_kafka.send_kafka(task, logging)
    except Exception:
        # Anything else (bad row shape, model failure, Kafka send failure in the
        # success path) is reported downstream together with the traceback.
        error_text = traceback.format_exc()
        failure = _failure_result("", "", "异常", accountId, nickName, accountName)
        failure["error"] = error_text
        task["result"] = failure
        logging.info(error_text)
        to_kafka.send_kafka(task, logging)
# Column order of the `user_account` INSERT; values are read from task['input']['account'].
_USER_ACCOUNT_FIELDS = (
    'taskId', 'accountId', 'accountName', 'nickName', 'fansCount',
    'likeCount', 'postCount', 'otherInfo', 'authentication',
)

# Column order of the `user_post` INSERT; values are read from task['input']['post'].
_USER_POST_FIELDS = (
    'taskId', 'postId', 'accountId', 'accountName', 'likeCount', 'emotionCount',
    'commentsCount', 'shareCount', 'content', 'pubTime', 'crawlTime',
)


def _extract_page_info(data):
    """Find the page type and crawler task id inside a task's ``data`` mapping.

    Every value is tried as a JSON document. ``taskId`` is taken from each
    document that parses; scanning stops at the first document that also
    carries ``pageType``.

    Returns:
        tuple: ``(page_type, task_id)``; either may be ``None`` when not found.
    """
    page_type = None
    task_id = None
    for key in data:
        try:
            app_data = json.loads(data[key])
            task_id = app_data['taskId']
            if "pageType" in app_data:
                page_type = app_data['pageType']
                break
        except Exception:
            # Entries that are not JSON documents are expected here.
            logging.error("正常判断,异常请忽略")
    return page_type, task_id


def data_structure(dbConfig):
    """Worker loop: persist incoming user / post records and trigger prediction.

    Tasks are taken from ``task_queue``. A ``userInfoPage`` task is written to
    ``user_account``, a ``storyDetailPage`` task to ``user_post``. When a task's
    data carries a truthy ``isLast`` flag, the stored account row and the
    aggregated post statistics for that crawler task id are read back and
    passed to ``predict``.

    Args:
        dbConfig: Mapping with ``host``, ``port``, ``username``, ``password``
            and ``db`` used to obtain the connection pool.

    This function never returns; it is meant to run in its own thread.
    """
    sqlhelper = get_conn_pool(dbConfig['host'], dbConfig['port'], dbConfig['username'],
                              dbConfig['password'], dbConfig['db'])
    # The user task is remembered so it can serve as the response body once the
    # final record of the stream arrives.
    send_task = None
    while True:
        try:
            if task_queue.qsize() <= 0:
                # Nothing queued: sleep before polling again.
                time.sleep(10)
                continue

            _priority, _enqueued_at, task = task_queue.get(timeout=1)
            logging.info("当前任务队列长度{}".format(task_queue.qsize()+1))
            task_id = task["scenes_id"]
            task_version = task["version"]
            logging.info("当前version信息为:{}".format(stop_dict))
            if task_id in stop_dict and task_version != stop_dict[task_id]["version"]:
                logging.info("已暂停任务,数据过滤掉")
                continue

            task_input = task['input']
            account = task_input['account']
            post = task_input['post']
            data = task['data']
            page_type, taskId = _extract_page_info(data)
            logging.info("数据类型:{}".format(page_type))

            if page_type == 'userInfoPage':
                send_task = task
                sql = "INSERT INTO `user_account`(`taskId`, `accountId`, `accountName`, `nickName`, `fansCount`, `likeCount`, `postCount`, `otherInfo`, `authentication`) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)"
                values = tuple(parse_data(task, account[field]) for field in _USER_ACCOUNT_FIELDS)
                sqlhelper.insert(sql, values)
            elif page_type == 'storyDetailPage':
                sql = "INSERT INTO `user_post`(`taskId`, `postId`, `accountId`, `accountName`, `likeCount`, `emotionCount`, `commentsCount`, `shareCount`, `content`, `pubTime`, `crawlTime`) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
                values = tuple(parse_data(task, post[field]) for field in _USER_POST_FIELDS)
                sqlhelper.insert(sql, values)

            # The last record of a data stream triggers the identification model.
            if 'isLast' in data and data['isLast']:
                # NOTE(review): taskId originates from the request payload and is
                # interpolated straight into the SQL text below. Switch to bound
                # parameters once it is confirmed that queryOne() accepts them.
                sql = "select accountId,accountName,nickName,fansCount,likeCount,postCount,otherInfo,authentication from user_account where taskId ='{}'".format(taskId)
                user_file_result = sqlhelper.queryOne(sql)
                sql = "SELECT CONVERT(COUNT(postId), CHAR(255)) AS count, CONVERT(AVG(likeCount), CHAR(255)) AS LikeCount, CONVERT(AVG(commentsCount), CHAR(255)) AS CommentsCount, CONVERT(AVG(shareCount), CHAR(255)) AS ShareCount, CONVERT(AVG(LENGTH(content)), CHAR(255)) AS length, CONVERT(AVG((LENGTH(content) - LENGTH(REPLACE(content, '#', ''))) / LENGTH('#')), CHAR(255)) AS tags, CONVERT(AVG((LENGTH(content) - LENGTH(REPLACE(content, 'https', ''))) / LENGTH('https')), CHAR(255)) AS https, CONVERT(AVG((LENGTH(content) - LENGTH(REPLACE(content, '@', ''))) / LENGTH('@')), CHAR(255)) AS at, CONVERT(MIN(TIMESTAMPDIFF(SECOND, pubTime, GREATEST(pubTime, crawlTime))), CHAR(255)) AS diffdate FROM user_post where taskId ='{}'".format(taskId)
                post_file_result = sqlhelper.queryOne(sql)
                if send_task is None:
                    send_task = task
                predict(user_file_result, post_file_result, send_task)
                # Reset for the next data stream.
                send_task = None
        except Exception:
            # Route failures to the log file like the other handlers in this
            # module; a plain stdout traceback is easy to lose in a worker.
            logging.error(traceback.format_exc())
def zk_monitoring():
    """Watch the ZooKeeper node ``/analyze`` and record task stop / version info.

    On every CHANGED event the node content is re-read, parsed as JSON and
    stored in ``stop_dict`` under its ``scenes_id`` as
    ``{"version": ..., "operation": ...}``.

    The function blocks until interrupted, then closes the ZooKeeper
    connection. Any failure is written to the log; nothing is raised to the
    caller.
    """
    try:
        # Production hosts come from the configuration file.
        zk = KazooClient(hosts=config['zookeeper']['zkhost'])
        zk.start()
        try:
            @zk.DataWatch("/analyze")
            def watch_node(data, stat, event):
                if event is None or event.type != EventType.CHANGED:
                    return
                data, stat = zk.get("/analyze")
                logging.info("执行删除操作:{}".format(data))
                payload = json.loads(data)
                # Publish the complete entry in a single assignment so a reader
                # in another thread never observes a half-filled dict.
                stop_dict[payload["scenes_id"]] = {
                    "version": payload["version"],
                    "operation": payload["operation"],
                }

            # Keep the process alive so the watch keeps firing.
            while True:
                time.sleep(1)
        except (KeyboardInterrupt, SystemExit):
            logging.info("Stopping...")
        finally:
            # Always release the connection, whatever ended the loop.
            zk.stop()
            zk.close()
    except Exception:
        logging.error(traceback.format_exc())
if __name__ == '__main__':
    # Offline demo: rebuild the {"user_file": ..., "post_file": ...} structure
    # from two canned upstream results and print it.
    all_result = {
        "9_获取用户发帖信息": "{\"resultList\": [{\"count\": \"10\", \"LikeCount\": \"1\", \"CommentsCount\": \"0.1\", \"ShareCount\": \"0.4\", \"length\": \"241.8000\", \"tags\": \"5.80000000\", \"https\": \"1.20000000\", \"at\": \"0.40000000\", \"diffdate\": \"170269\"}]}",
        "8_获取用户信息": "{\"resultList\": [{\"accountId\": \"1368232444323799043\", \"accountName\": \"Ujjal best Tech@UjjalKumarGho19\", \"nickName\": \"UjjalKumarGho19\", \"fansCount\": \"660\", \"likeCount\": \"2096\", \"postCount\": \"579\", \"otherInfo\": \"\", \"authentication\": 1}]}"}
    # Maps each logical input to the key that holds it in `all_result`.
    # These must name keys that actually exist above; starting from an empty
    # dict made the lookups below fail with KeyError.
    data = {"user_file": "8_获取用户信息", "post_file": "9_获取用户发帖信息"}
    user_file_result = json.loads(all_result[data['user_file']])
    post_file_result = json.loads(all_result[data['post_file']])
    # Replace each key reference with the first result row, or {} when empty.
    user_rows = user_file_result['resultList']
    data['user_file'] = user_rows[0] if user_rows else {}
    post_rows = post_file_result['resultList']
    data['post_file'] = post_rows[0] if post_rows else {}
    print(data)