commit 661fd3edb55a3a0f41c15f292aacaa5e276304dd
Author: maojian <550076202@qq.com>
Date:   Wed Jan 8 15:02:57 2025 +0800

    User shill ("water army") account identification application

diff --git a/.idea/asr.iml b/.idea/asr.iml
new file mode 100644
index 0000000..73af2a0
--- /dev/null
+++ b/.idea/asr.iml
@@ -0,0 +1,11 @@
[IDE module file; the XML markup was lost in extraction and is not recoverable]
\ No newline at end of file
diff --git a/.idea/misc.xml b/.idea/misc.xml
new file mode 100644
index 0000000..62002bf
--- /dev/null
+++ b/.idea/misc.xml
@@ -0,0 +1,4 @@
[IDE settings file; XML content not recoverable]
\ No newline at end of file
diff --git a/.idea/modules.xml b/.idea/modules.xml
new file mode 100644
index 0000000..edc8d49
--- /dev/null
+++ b/.idea/modules.xml
@@ -0,0 +1,8 @@
[IDE modules file; XML content not recoverable]
\ No newline at end of file
diff --git a/.idea/workspace.xml b/.idea/workspace.xml
new file mode 100644
index 0000000..c1022cb
--- /dev/null
+++ b/.idea/workspace.xml
@@ -0,0 +1,824 @@
[IDE workspace file, 824 lines; the XML markup was lost in extraction — only stray text nodes such as "KafkaClient", "open" and the timestamp "1692600024256" survive]
\ No newline at end of file
diff --git a/config.ini b/config.ini
new file mode 100644
index 0000000..591a836
--- /dev/null
+++ b/config.ini
@@ -0,0 +1,23 @@
+[database]
+;database host
+host=node-01
+;port
+port=3306
+;username
+username=root
+;password
+password=bw@2025
+;database name
+db=analyze
+
+[zookeeper]
+;zk address
+zkhost=node-01:12181,node-02:12181,node-03:12181
+;node
+node=/analyze
+
+[kafka]
+;broker addresses
+bootstrap_servers=node-01:19092,node-02:19092,node-03:19092
+;topic
+topic=produce_analyze
diff --git a/environment.txt b/environment.txt
new file mode 100644
index 0000000..7a1f644
--- /dev/null
+++ b/environment.txt
@@ -0,0 +1,3 @@
+1.python>3.7
+2.pandas=1.4.4
+3.sklearn=0.24.2
\ No newline at end of file
diff --git a/log_util/__pycache__/set_logger.cpython-36.pyc b/log_util/__pycache__/set_logger.cpython-36.pyc
new file mode 100644
index 0000000..2961010
Binary files /dev/null and b/log_util/__pycache__/set_logger.cpython-36.pyc differ
diff --git a/log_util/__pycache__/set_logger.cpython-38.pyc b/log_util/__pycache__/set_logger.cpython-38.pyc
new file mode 100644
index 0000000..8a628ab
Binary files /dev/null and b/log_util/__pycache__/set_logger.cpython-38.pyc differ
diff --git a/log_util/set_logger.py b/log_util/set_logger.py
new file mode 100644
index 0000000..e735461
--- /dev/null
+++ b/log_util/set_logger.py
@@ -0,0 +1,33 @@
+#coding:utf8
+import logging
+import os
+import sys
+from logging.handlers import TimedRotatingFileHandler
+import re
+# cur_dir = os.path.dirname(os.path.abspath(__file__)) or os.getcwd()
+# sys.path.append(cur_dir + '/log_util')
+def set_logger(filename):
+    # Create a logger object keyed by the given name
+    logger = logging.getLogger(filename)
+    # log_path = os.path.join(cur_dir, filename)
+    # Set the logging level
+    logger.setLevel(logging.INFO)
+    # interval: rotation period
+    # when="MIDNIGHT", interval=1 rotates at midnight, one file per day
+    # backupCount: number of rotated files to keep
+    file_handler = TimedRotatingFileHandler(
+        filename=filename, when="MIDNIGHT", encoding="utf-8", interval=1, backupCount=3
+    )
+    # With filename="mylog", this suffix yields files like mylog.2020-02-25.log
+    file_handler.suffix = "%Y-%m-%d.log"
+    # extMatch is a compiled regex used to recognise rotated-file suffixes.
+    # suffix and extMatch must agree, otherwise expired logs are never deleted.
+    # (Dot escaped here; the original r"^\d{4}-\d{2}-\d{2}.log$" matched any character before "log".)
+    file_handler.extMatch = re.compile(r"^\d{4}-\d{2}-\d{2}\.log$")
+    # Log line format
+    file_handler.setFormatter(
+        logging.Formatter(
+            "[%(asctime)s] [%(process)d] [%(levelname)s] - %(module)s.%(funcName)s (%(filename)s:%(lineno)d) - %(message)s"
+        )
+    )
+    logger.addHandler(file_handler)
+    return logger
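[For context, a minimal sketch of how this helper is used elsewhere in the commit — the filename doubles as both logger name and log path:]

    logging = set_logger('logs/results.log')   # as in text_analysis/views.py
    logging.info("service started")            # written to logs/results.log, rotated nightly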
diff --git a/logs/results.log b/logs/results.log
new file mode 100644
index 0000000..e69de29
diff --git a/manage.py b/manage.py
new file mode 100644
index 0000000..ea99051
--- /dev/null
+++ b/manage.py
@@ -0,0 +1,18 @@
+#!/usr/bin/env python
+import os
+import sys
+import threading
+from text_analysis.views import predict
+import django
+
+if __name__ == "__main__":
+    # Run the queue-consuming predict() loop in a daemon thread alongside Django
+    t = threading.Thread(target=predict, name='predict')
+    t.daemon = True
+    t.start()
+
+    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "text_analysis.settings")
+    django.setup()
+    from django.core.management import execute_from_command_line
+    execute_from_command_line(sys.argv)
diff --git a/src.py b/src.py
new file mode 100644
index 0000000..f799490
--- /dev/null
+++ b/src.py
@@ -0,0 +1,35 @@
+#coding:utf8
+import requests
+
+def upload():
+    url = "https://realtime.pdeepmatrix.com/apis/media/analysis/upload"
+    # form-data parameters
+    data = {
+        'fromLanguage': 'zh'
+    }
+    # file parameter
+    files = {
+        'file': open('inputdata/lKTZNen6aak.mp4', 'rb')
+    }
+    response = requests.post(url, data=data, files=files)
+    print(response.text)
+    # Response: {"code":200,"message":"SUCCESS","data":"3a42ea9594b641c39e40d1497ca29be9"}
+
+def getResults():
+    url = "https://realtime.pdeepmatrix.com/apis/media/analysis/getResult"
+    # parameters
+    # 'taskId': '3a42ea9594b641c39e40d1497ca29be9'
+    params = {
+        'taskId': '5ee948446ab64d5d8a1d92ecfa6c2c93'
+    }
+    response = requests.get(url, params=params)
+    # print the response
+    print(response.text)
+    # {"code":200,"message":"SUCCESS","data":{"sentences":[{"silence_duration":0,"end_time":5108,"speech_rate":150,"begin_time":1130,"channel_id":0,"emotion_value":"5.0","text":"视频解析、语音识别。"}]...
+
+# upload()
+getResults()
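[The two calls above form an asynchronous protocol: upload() returns a taskId, and getResult is polled until the transcription is ready. A minimal sketch tying them together — the poll interval and the "empty data means still processing" check are assumptions, not documented API behaviour:]

    import time
    import requests

    def transcribe(path, base="https://realtime.pdeepmatrix.com/apis/media/analysis"):
        with open(path, 'rb') as f:
            task_id = requests.post(base + "/upload", data={'fromLanguage': 'zh'},
                                    files={'file': f}).json()["data"]
        while True:
            body = requests.get(base + "/getResult", params={'taskId': task_id}).json()
            if body.get("data"):          # assumption: empty data while still processing
                return body["data"]["sentences"]
            time.sleep(5)                 # assumed poll interval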
diff --git a/start.sh b/start.sh
new file mode 100644
index 0000000..b2da032
--- /dev/null
+++ b/start.sh
@@ -0,0 +1 @@
+../../environment/python3.8/bin/uwsgi --ini uwsgi.ini --file wsgi.py --daemonize wsgi.log
\ No newline at end of file
diff --git a/stop_uwsgi.sh b/stop_uwsgi.sh
new file mode 100644
index 0000000..ea46313
--- /dev/null
+++ b/stop_uwsgi.sh
@@ -0,0 +1 @@
+lsof -i:9020 |grep -v 'PID' | awk '{print $2}'| xargs kill -9
diff --git a/test.py b/test.py
new file mode 100644
index 0000000..4398de2
--- /dev/null
+++ b/test.py
@@ -0,0 +1,103 @@
+#coding=utf8
+import sys
+import requests
+import json
+import time
+
+# #url = 'http://0.0.0.0:5033'
+# """
+# url = 'http://20.0.2.6:5055/classify_event'
+# url = 'http://20.0.2.6:5055/is_about_china'
+# url = 'http://20.0.2.6:5055/associated_words'
+# """
+# url = 'http://127.0.0.1:9008/paper'
+#
+# # url_file ="http://172.18.1.130:9985/group33/default/20230415/09/15/1/“GF-1”影像质量评价及矿区土地利用分类潜力研究_陈明.docx"
+# url_file="/opt/Project_kongtianyuan/inputfile/"
+# filename = "“GF-1”影像质量评价及矿区土地利用分类潜力研究"
+#
+# data = {"url":url_file,"filename":filename}
+# data_str = json.dumps(data)
+#
+# r = requests.post(url,data=str(data_str))
+# print(r.text)
+# # res =json.loads(r.text)
+# # print(res)
+raw_data = {
+    "metadata": {
+        "address": "http://172.24.12.126:9013/ASR/",
+        "index": 0,
+        "admin": {
+            "datasource": "2_任务提取"
+        },
+        "output": {
+            "output_type": "table",
+            "label_col": [
+                "ASR识别内容"
+            ]
+        },
+        "input": {
+            "input_type": "text",
+            "label": [
+                "2_任务提取"
+            ]
+        },
+        "user": {
+            "tag": ""
+        }
+    },
+    "data": {
+        "1_文件上传": "{\"fileId\":\"53aa330b4e484c9bdeb7ff35e335a6f6\",\"fileName\":\"lKTZNen6aak.mp4\",\"filePath\":\"/group33/default/20230828/15/48/1/lKTZNen6aak.mp4\",\"fileType\":\"mp4\",\"fileUrl\":\"http://172.18.1.130:9985/group33/default/20230828/15/48/1/lKTZNen6aak.mp4\",\"ossPath\":\"/group33/default/20230828/15/48/1/lKTZNen6aak.mp4\"}",
+        "businessKey": "19615b029da477fb",
+        "2_任务提取": "[{\"fileId\":\"53aa330b4e484c9bdeb7ff35e335a6f6\",\"fileName\":\"lKTZNen6aak.mp4\",\"filePath\":\"/group33/default/20230828/15/48/1/lKTZNen6aak.mp4\",\"fileType\":\"mp4\",\"fileUrl\":\"http://172.18.1.130:9985/group33/default/20230828/15/48/1/lKTZNen6aak.mp4\",\"ossPath\":\"/group33/default/20230828/15/48/1/lKTZNen6aak.mp4\"}]"
+    },
+    "created": 1691004265000,
+    "module": "ASR",
+    "start_tag": "false",
+    "multi_branch": 0,
+    "last_edit": 1693417201000,
+    "next_app_id": [
+        {
+            "start_id": 154,
+            "edge_id": 75,
+            "end_id": 155
+        }
+    ],
+    "transfer_id": 3,
+    "version": 1,
+    "blueprint_id": 4,
+    "scenes_id": 5,
+    "scenario": {
+        "dataloss": 1,
+        "autoCommitTriggerLast": 1,
+        "maxErrors": 3,
+        "autoCommit": 1,
+        "freshVariables": 1
+    },
+    "wait_condition": [],
+    "scheduling": {
+        "interval": -1,
+        "type": "single"
+    },
+    "name": "ASR",
+    "businessKey": "19615b029da477fb",
+    "id": 154,
+    "position": [100, 200],
+    "describe": "ASR识别"
+}
+allFile = raw_data["data"]["2_任务提取"]
+# parse the JSON string safely (the original used eval(), which is unsafe and
+# breaks on JSON literals such as true/false/null)
+currentFile = json.loads(allFile)
+print(currentFile)
+print(type(currentFile))
+# file = currentFile[0]["fileUrl"]
+# fileName = currentFile[0]["fileName"]
+# print(file)
diff --git a/text_analysis/__init__.py b/text_analysis/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/text_analysis/__pycache__/__init__.cpython-36.pyc b/text_analysis/__pycache__/__init__.cpython-36.pyc
new file mode 100644
index 0000000..92820a9
Binary files /dev/null and b/text_analysis/__pycache__/__init__.cpython-36.pyc differ
diff --git a/text_analysis/__pycache__/__init__.cpython-38.pyc b/text_analysis/__pycache__/__init__.cpython-38.pyc
new file mode 100644
index 0000000..c93f47b
Binary files /dev/null and b/text_analysis/__pycache__/__init__.cpython-38.pyc differ
diff --git a/text_analysis/__pycache__/cusException.cpython-38.pyc b/text_analysis/__pycache__/cusException.cpython-38.pyc
new file mode 100644
index 0000000..1cf0f8a
Binary files /dev/null and b/text_analysis/__pycache__/cusException.cpython-38.pyc differ
diff --git a/text_analysis/__pycache__/read_config.cpython-38.pyc b/text_analysis/__pycache__/read_config.cpython-38.pyc
new file mode 100644
index 0000000..9ce9ad4
Binary files /dev/null and b/text_analysis/__pycache__/read_config.cpython-38.pyc differ
diff --git a/text_analysis/__pycache__/settings.cpython-36.pyc b/text_analysis/__pycache__/settings.cpython-36.pyc
new file mode 100644
index 0000000..ed23213
Binary files /dev/null and b/text_analysis/__pycache__/settings.cpython-36.pyc differ
diff --git a/text_analysis/__pycache__/settings.cpython-38.pyc b/text_analysis/__pycache__/settings.cpython-38.pyc
new file mode 100644
index 0000000..530aa8c
Binary files /dev/null and b/text_analysis/__pycache__/settings.cpython-38.pyc differ
diff --git a/text_analysis/__pycache__/urls.cpython-36.pyc b/text_analysis/__pycache__/urls.cpython-36.pyc
new file mode 100644
index 0000000..586db08
Binary files /dev/null and b/text_analysis/__pycache__/urls.cpython-36.pyc differ
diff --git a/text_analysis/__pycache__/urls.cpython-38.pyc b/text_analysis/__pycache__/urls.cpython-38.pyc
new file mode 100644
index 0000000..b8ccae2
Binary files /dev/null and b/text_analysis/__pycache__/urls.cpython-38.pyc differ
diff --git a/text_analysis/__pycache__/views.cpython-36.pyc b/text_analysis/__pycache__/views.cpython-36.pyc
new file mode 100644
index 0000000..616804e
Binary files /dev/null and b/text_analysis/__pycache__/views.cpython-36.pyc differ
diff --git a/text_analysis/__pycache__/views.cpython-38.pyc b/text_analysis/__pycache__/views.cpython-38.pyc
new file mode 100644
index 0000000..036b3d5
Binary files /dev/null and b/text_analysis/__pycache__/views.cpython-38.pyc differ
diff --git a/text_analysis/__pycache__/wsgi.cpython-36.pyc b/text_analysis/__pycache__/wsgi.cpython-36.pyc
new file mode 100644
index 0000000..715c571
Binary files /dev/null and b/text_analysis/__pycache__/wsgi.cpython-36.pyc differ
diff --git a/text_analysis/__pycache__/wsgi.cpython-38.pyc b/text_analysis/__pycache__/wsgi.cpython-38.pyc
new file mode 100644
index 0000000..8b2c132
Binary files /dev/null and b/text_analysis/__pycache__/wsgi.cpython-38.pyc differ
diff --git a/text_analysis/bak/views.py0928 b/text_analysis/bak/views.py0928
new file mode 100644
index 0000000..1363c0b
--- /dev/null
+++ b/text_analysis/bak/views.py0928
@@ -0,0 +1,207 @@
+#coding:utf8
+import os, sys
+import io
+sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf8')
+cur_dir = os.path.dirname(os.path.abspath(__file__)) or os.getcwd()
+par_dir = os.path.abspath(os.path.join(cur_dir, os.path.pardir))
+sys.path.append(cur_dir)
+sys.path.append(par_dir)
+import json
+from django.http import HttpResponse
+from text_analysis.tools import to_kafka
+from django.views.decorators.csrf import csrf_exempt
+from log_util.set_logger import set_logger
+logging = set_logger('logs/results.log')
+import traceback
+import queue
+import requests
+import time
+from datetime import datetime
+import os
+import joblib
+# task queue
+global task_queue
+task_queue = queue.Queue()
+
+
+@csrf_exempt
+def robotIdentification(request):
+    if request.method == 'POST':
+        try:
+            raw_data = json.loads(request.body)
+            task_queue.put(raw_data)
+            return HttpResponse(json.dumps({"code": 1, "msg": "request OK"}, ensure_ascii=False))
+        except:
+            logging.error(traceback.format_exc())
+            return HttpResponse(json.dumps({"code": 0, "msg": "request body is not valid JSON"}, ensure_ascii=False))
+    else:
+        return HttpResponse(json.dumps({"code": 0, "msg": "wrong request method, use POST"}, ensure_ascii=False))
+
+def predict():
+    while True:
+        if task_queue.qsize() > 0:
+            try:
+                logging.info("Queue length at fetch: {}".format(task_queue.qsize()))
+                raw_data = task_queue.get()
+                logging.info("Raw data - {}".format(raw_data))
+                # raw_data = {"user_file": {"accountId": "39234393", "accountName": "hello", "nickName": "Johnson Leung",
+                #             "fansCount": 308, "likeCount": 92707, "postCount": 14237,
+                #             "otherInfo": "{\"\"otherInfo\"\":\"\"{\"\"bio\"\": \"\"Huge}",
+                #             "authentication": 0},
+                #             "post_file": {"count": 1, "LikeCount": 12, "CommentsCount": 1, "ShareCount": 1,
+                #             "length": 150, "tags": 0, "https": 0, "at": 0, "diffdate": 1}}
+                # user data
+                res = {"successCode": "1", "errorLog": "", "results": {}}
+                # upstream app results
+                all_result = raw_data['data']
+                user_data = []
+                data = raw_data["metadata"]["admin"]
+                # {"user_file": "9_获取用户信息", "post_file": "10_获取用户发帖信息"}
+                user_file_result = json.loads(all_result[data['user_file']])
+                post_file_result = json.loads(all_result[data['post_file']])
+                if user_file_result['resultList']:
+                    data['user_file'] = user_file_result['resultList'][0]
+                    logging.info('User data: {}'.format(data['user_file']))
+                else:
+                    data['user_file'] = {}
+                if post_file_result['resultList']:
+                    data['post_file'] = post_file_result['resultList'][0]
+                    logging.info('Post data: {}'.format(data['post_file']))
+                else:
+                    data['post_file'] = {}
+                # recognition result code
+                recognition_code = "0"
+                try:
+                    user_data_otherInfo_1 = 0 if data["user_file"]["otherInfo"].strip() == "" else 1
+                except:
+                    user_data_otherInfo_1 = 0
+                try:
+                    user_data_nickName_2 = 0 if data["user_file"]["nickName"].strip() == "" else 1
+                except:
+                    user_data_nickName_2 = 0
+                try:
+                    user_data_fansCount_3 = int(data["user_file"]["fansCount"])
+                except:
+                    user_data_fansCount_3 = 0
+                try:
+                    user_data_likeCount_4 = int(data["user_file"]["likeCount"])
+                except:
+                    user_data_likeCount_4 = 0
+                try:
+                    user_data_postCount_5 = int(data["user_file"]["postCount"])
+                except:
+                    user_data_postCount_5 = 0
+                try:
+                    user_data_authentication_6 = int(data["user_file"]["authentication"])
+                except:
+                    user_data_authentication_6 = 0
+                user_data.extend(
+                    [user_data_otherInfo_1, user_data_nickName_2, user_data_fansCount_3, user_data_likeCount_4,
+                     user_data_postCount_5, user_data_authentication_6])
+                # post data
+                if data["post_file"] == {}:
+                    recognition_code = "-1"
+                else:
+                    post_data = []
+                    try:
+                        post_data_count_1 = int(data["post_file"]["count"])
+                    except:
+                        post_data_count_1 = 0
+                    try:
+                        post_data_LikeCount_2 = int(data["post_file"]["LikeCount"])
+                    except:
+                        post_data_LikeCount_2 = 0
+                    try:
+                        post_data_CommentsCount_3 = int(data["post_file"]["CommentsCount"])
+                    except:
+                        post_data_CommentsCount_3 = 0
+                    try:
+                        post_data_ShareCount_4 = int(data["post_file"]["ShareCount"])
+                    except:
+                        post_data_ShareCount_4 = 0
+                    try:
+                        post_data_length_5 = int(data["post_file"]["length"])
+                    except:
+                        post_data_length_5 = 0
+                    try:
+                        post_data_tags_6 = int(data["post_file"]["tags"])
+                    except:
+                        post_data_tags_6 = 0
+                    try:
+                        post_data_https_7 = int(data["post_file"]["https"])
+                    except:
+                        post_data_https_7 = 0
+                    try:
+                        post_data_at_8 = int(data["post_file"]["at"])
+                    except:
+                        post_data_at_8 = 0
+                    try:
+                        post_data_diffdate_9 = int(data["post_file"]["diffdate"])
+                    except:
+                        post_data_diffdate_9 = 0
+                    post_data.extend(
+                        [post_data_count_1, post_data_LikeCount_2, post_data_CommentsCount_3, post_data_ShareCount_4,
+                         post_data_length_5, post_data_tags_6, post_data_https_7, post_data_at_8, post_data_diffdate_9])
+                    features = [user_data + post_data]
+                    bot_user = joblib.load(cur_dir + "/model/bot_user.pkl")  # load the trained model
+                    result = bot_user.predict(features)
+                    recognition_code = str(result[0])
+                    # logging.info("Model prediction: {}".format(result))
+                results = {}
+                # account id
+                results['accountId'] = data["user_file"]["accountId"]
+                # nickname
+                results['nickName'] = data["user_file"]["nickName"]
+                # account name
+                results['accountName'] = data["user_file"]["accountName"]
+                if recognition_code == '0':
+                    results['recognitionResult'] = 'not a bot'
+                    results['recognitionCode'] = recognition_code
+                elif recognition_code == '1':
+                    results['recognitionResult'] = 'bot'
+                    results['recognitionCode'] = recognition_code
+                else:
+                    results['recognitionResult'] = 'unknown recognition result'
+                    results['recognitionCode'] = recognition_code
+                res['results'] = json.dumps(results)
+                raw_data["result"] = res
+                # raw_data_json = json.dumps(raw_data)
+                logging.info("Prediction appended - {}".format(raw_data))
+                to_kafka.send_kafka(raw_data, logging)
+            except:
+                res = {"successCode": "0", "errorLog": "", "results": {}}
+                raw_data["result"] = res
+                raw_data["result"]["error"] = traceback.format_exc()
+                # raw_data_json = json.dumps(raw_data)
+                logging.info(traceback.format_exc())
+                to_kafka.send_kafka(raw_data, logging)
+        else:
+            # no pending task, sleep
+            time.sleep(10)
+
+
+if __name__ == '__main__':
+    all_result = {"9_获取用户发帖信息":"{\"resultList\": [{\"count\": \"10\", \"LikeCount\": \"1\", \"CommentsCount\": \"0.1\", \"ShareCount\": \"0.4\", \"length\": \"241.8000\", \"tags\": \"5.80000000\", \"https\": \"1.20000000\", \"at\": \"0.40000000\", \"diffdate\": \"170269\"}]}","8_获取用户信息":"{\"resultList\": [{\"accountId\": \"1368232444323799043\", \"accountName\": \"Ujjal best Tech@UjjalKumarGho19\", \"nickName\": \"UjjalKumarGho19\", \"fansCount\": \"660\", \"likeCount\": \"2096\", \"postCount\": \"579\", \"otherInfo\": \"\", \"authentication\": 1}]}"}
+    # map the role names onto the keys actually present in all_result
+    data = {"user_file": "8_获取用户信息", "post_file": "9_获取用户发帖信息"}
+    user_file_result = json.loads(all_result[data['user_file']])
+    post_file_result = json.loads(all_result[data['post_file']])
+    if user_file_result['resultList']:
+        resultList = user_file_result['resultList']
+        data['user_file'] = resultList[0]
+    else:
+        data['user_file'] = {}
+    if post_file_result['resultList']:
+        data['post_file'] = post_file_result['resultList'][0]
+    else:
+        data['post_file'] = {}
+
+    print(data)
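[The predict() loop above coerces each of the fifteen fields with its own try/except. A compact equivalent — a sketch, not part of the commit — that keeps the same bare-except/default-to-0 behaviour:]

    def to_int(d, key, default=0):
        # Coerce d[key] to int, returning default when missing or invalid
        try:
            return int(d[key])
        except:
            return default

    post_keys = ["count", "LikeCount", "CommentsCount", "ShareCount",
                 "length", "tags", "https", "at", "diffdate"]
    post_data = [to_int(data["post_file"], k) for k in post_keys]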
diff --git a/text_analysis/bak/views.py_0226 b/text_analysis/bak/views.py_0226
new file mode 100644
index 0000000..f006e7c
--- /dev/null
+++ b/text_analysis/bak/views.py_0226
@@ -0,0 +1,327 @@
+# coding:utf8
+import os, sys
+import io
+
+sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf8')
+cur_dir = os.path.dirname(os.path.abspath(__file__)) or os.getcwd()
+par_dir = os.path.abspath(os.path.join(cur_dir, os.path.pardir))
+sys.path.append(cur_dir)
+sys.path.append(par_dir)
+import json
+from django.http import HttpResponse
+from text_analysis.tools import to_kafka
+from django.views.decorators.csrf import csrf_exempt
+from log_util.set_logger import set_logger
+from text_analysis.tools.tool import parse_data
+logging = set_logger('logs/results.log')
+import traceback
+import queue
+import requests
+import time
+from datetime import datetime
+import os
+import joblib
+from text_analysis.cusException import userFile_Exception, postFile_Exception
+
+# task queue
+global task_queue
+task_queue = queue.Queue()
+
+# mysql connection pool
+from text_analysis.tools.db_pool import get_conn_pool
+
+@csrf_exempt
+def robotIdentification(request):
+    if request.method == 'POST':
+        try:
+            raw_data = json.loads(request.body)
+            task_queue.put(raw_data)
+            return HttpResponse(json.dumps({"code": 1, "msg": "request OK"}, ensure_ascii=False))
+        except:
+            logging.error(traceback.format_exc())
+            return HttpResponse(json.dumps({"code": 0, "msg": "request body is not valid JSON"}, ensure_ascii=False))
+    else:
+        return HttpResponse(json.dumps({"code": 0, "msg": "wrong request method, use POST"}, ensure_ascii=False))
+
+
+def predict(user_file_result, post_file_result, task):
+    try:
+        # user_file_result / post_file_result follow the shapes shown in the
+        # sample payload in views.py0928 above
+        # user data
+        res = {"successCode": "1", "errorLog": "", "results": {}}
+        user_data = []
+        # the three fields required in the response
+        accountId = ""
+        nickName = ""
+        accountName = ""
+        data = {}
+        if user_file_result:
+            data['user_file'] = user_file_result
+            logging.info('User data: {}'.format(data['user_file']))
+            accountId = data["user_file"]["accountId"]
+            nickName = data["user_file"]["nickName"]
+            accountName = data["user_file"]["accountName"]
+        else:
+            data['user_file'] = {}
+            raise userFile_Exception
+        if post_file_result:
+            data['post_file'] = post_file_result
+            logging.info('Post data: {}'.format(data['post_file']))
+        else:
+            data['post_file'] = {}
+            raise postFile_Exception
+        # recognition result code
+        recognition_code = "0"
+        try:
+            user_data_otherInfo_1 = 0 if data["user_file"]["otherInfo"].strip() == "" else 1
+        except:
+            user_data_otherInfo_1 = 0
+        try:
+            user_data_nickName_2 = 0 if data["user_file"]["nickName"].strip() == "" else 1
+        except:
+            user_data_nickName_2 = 0
+        try:
+            user_data_fansCount_3 = int(data["user_file"]["fansCount"])
+        except:
+            user_data_fansCount_3 = 0
+        try:
+            user_data_likeCount_4 = int(data["user_file"]["likeCount"])
+        except:
+            user_data_likeCount_4 = 0
+        try:
+            user_data_postCount_5 = int(data["user_file"]["postCount"])
+        except:
+            user_data_postCount_5 = 0
+        try:
+            user_data_authentication_6 = int(data["user_file"]["authentication"])
+        except:
+            user_data_authentication_6 = 0
+        user_data.extend(
+            [user_data_otherInfo_1, user_data_nickName_2, user_data_fansCount_3, user_data_likeCount_4,
+             user_data_postCount_5, user_data_authentication_6])
+        # post data
+        if data["post_file"] == {}:
+            recognition_code = "-1"
+        else:
+            post_data = []
+            try:
+                post_data_count_1 = int(data["post_file"]["count"])
+            except:
+                post_data_count_1 = 0
+            try:
+                post_data_LikeCount_2 = int(data["post_file"]["LikeCount"])
+            except:
+                post_data_LikeCount_2 = 0
+            try:
+                post_data_CommentsCount_3 = int(data["post_file"]["CommentsCount"])
+            except:
+                post_data_CommentsCount_3 = 0
+            try:
+                post_data_ShareCount_4 = int(data["post_file"]["ShareCount"])
+            except:
+                post_data_ShareCount_4 = 0
+            try:
+                post_data_length_5 = int(data["post_file"]["length"])
+            except:
+                post_data_length_5 = 0
+            try:
+                post_data_tags_6 = int(data["post_file"]["tags"])
+            except:
+                post_data_tags_6 = 0
+            try:
+                post_data_https_7 = int(data["post_file"]["https"])
+            except:
+                post_data_https_7 = 0
+            try:
+                post_data_at_8 = int(data["post_file"]["at"])
+            except:
+                post_data_at_8 = 0
+            try:
+                post_data_diffdate_9 = int(data["post_file"]["diffdate"])
+            except:
+                post_data_diffdate_9 = 0
+            post_data.extend(
+                [post_data_count_1, post_data_LikeCount_2, post_data_CommentsCount_3, post_data_ShareCount_4,
+                 post_data_length_5, post_data_tags_6, post_data_https_7, post_data_at_8, post_data_diffdate_9])
+            features = [user_data + post_data]
+            bot_user = joblib.load(cur_dir + "/model/bot_user.pkl")  # load the trained model
+            result = bot_user.predict(features)
+            recognition_code = str(result[0])
+            # logging.info("Model prediction: {}".format(result))
+        results = {}
+        # account id
+        results['authorId'] = accountId
+        # nickname
+        results['nickName'] = nickName
+        # account name
+        results['accountName'] = accountName
+        # end-of-stream flag
+        res['isLast'] = True
+        # data type -- currently only consumed by the graph component
+        results['pageType'] = 'userAuthenPage'
+        if recognition_code == '0':
+            results['recognitionResult'] = 'not a bot'
+            results['recognitionCode'] = recognition_code
+        elif recognition_code == '1':
+            results['recognitionResult'] = 'bot'
+            results['recognitionCode'] = recognition_code
+        else:
+            results['recognitionResult'] = 'unknown recognition result'
+            results['recognitionCode'] = recognition_code
+        res['results'] = json.dumps(results)
+        task["result"] = res
+        logging.info("Prediction appended - {}".format(task))
+        to_kafka.send_kafka(task, logging)
+    except userFile_Exception:
+        res = {"successCode": "0", "errorLog": "user data is empty!", "results": {}}
+        results = {}
+        results['authorId'] = ""
+        results['nickName'] = ""
+        results['accountName'] = ""
+        results['recognitionResult'] = 'user data is empty'
+        res['results'] = json.dumps(results)
+        task["result"] = res
+        logging.info("User data empty for this request - {}".format(task))
+        to_kafka.send_kafka(task, logging)
+    except postFile_Exception:
+        res = {"successCode": "0", "errorLog": "post data is empty!", "results": {}}
+        results = {}
+        results['authorId'] = accountId
+        results['nickName'] = nickName
+        results['accountName'] = accountName
+        results['recognitionResult'] = 'post data is empty'
+        res['results'] = json.dumps(results)
+        task["result"] = res
+        logging.info("Post data empty for this request - {}".format(task))
+        to_kafka.send_kafka(task, logging)
+    except:
+        res = {"successCode": "0", "errorLog": "", "results": {}}
+        results = {}
+        results['authorId'] = accountId
+        results['nickName'] = nickName
+        results['accountName'] = accountName
+        results['recognitionResult'] = ""
+        res["results"] = json.dumps(results)
+        task["result"] = res
+        task["result"]["error"] = traceback.format_exc()
+        logging.info(traceback.format_exc())
+        to_kafka.send_kafka(task, logging)
+
+def data_structure(dbConfig):
+    '''
+    Build the data for shill-account recognition:
+    mainly writes the user table and the main-post table.
+    '''
+    # get a database connection
+    sqlhelper = get_conn_pool(dbConfig['host'], dbConfig['port'], dbConfig['username'], dbConfig['password'], dbConfig['db'])
+    # the user task serves as the response body
+    send_task = None
+    while True:
+        if task_queue.qsize() > 0:
+            try:
+                logging.info("Queue length: {}".format(task_queue.qsize()))
+                task = task_queue.get()
+                input = task['input']
+                account = input['account']
+                post = input['post']
+                # determine the data type
+                data = task['data']
+                page_type = None
+                taskId = None
+                for data_str in data:
+                    try:
+                        app_data = json.loads(data[data_str])
+                        taskId = app_data['taskId']
+                        if "pageType" in app_data:
+                            page_type = app_data['pageType']
+                            break
+                    except:
+                        logging.error("Expected probe failure while detecting the type, ignore this error")
+                logging.info("Data type: {}".format(page_type))
+                if page_type == 'userInfoPage':
+                    # the user task serves as the response body
+                    send_task = task
+                    # write user-type data
+                    sql = "INSERT INTO `user_account`(`taskId`, `accountId`, `accountName`, `nickName`, `fansCount`, `likeCount`, `postCount`, `otherInfo`, `authentication`) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)"
+                    # build the parameter tuple
+                    values = (
+                        parse_data(task, account['taskId']),
+                        parse_data(task, account['accountId']),
+                        parse_data(task, account['accountName']),
+                        parse_data(task, account['nickName']),
+                        parse_data(task, account['fansCount']),
+                        parse_data(task, account['likeCount']),
+                        parse_data(task, account['postCount']),
+                        parse_data(task, account['otherInfo']),
+                        parse_data(task, account['authentication'])
+                    )
+                    sqlhelper.insert(sql, values)
+
+                elif page_type == 'storyDetailPage':
+                    # write post-type data
+                    # define the SQL statement
+                    sql = "INSERT INTO `user_post`(`taskId`, `postId`, `accountId`, `accountName`, `likeCount`, `emotionCount`, `commentsCount`, `shareCount`, `content`, `pubTime`, `crawlTime`) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
+                    # build the parameter tuple
+                    values = (
+                        parse_data(task, post['taskId']),
+                        parse_data(task, post['postId']),
+                        parse_data(task, post['accountId']),
+                        parse_data(task, post['accountName']),
+                        parse_data(task, post['likeCount']),
+                        parse_data(task, post['emotionCount']),
+                        parse_data(task, post['commentsCount']),
+                        parse_data(task, post['shareCount']),
+                        parse_data(task, post['content']),
+                        parse_data(task, post['pubTime']),
+                        parse_data(task, post['crawlTime'])
+                    )
+                    sqlhelper.insert(sql, values)
+                # if this is the last record of the stream, trigger the recognition algorithm
+                if 'isLast' in data and data['isLast']:
+                    # fetch the user-level data
+                    sql = "select accountId,accountName,nickName,fansCount,likeCount,postCount,otherInfo,authentication from user_account where taskId ='{}'".format(taskId)
+                    user_file_result = sqlhelper.queryOne(sql)
+                    # fetch the post-level data
+                    sql = "SELECT CONVERT(COUNT(postId), CHAR(255)) AS count, CONVERT(AVG(likeCount), CHAR(255)) AS LikeCount, CONVERT(AVG(commentsCount), CHAR(255)) AS CommentsCount, CONVERT(AVG(shareCount), CHAR(255)) AS ShareCount, CONVERT(AVG(LENGTH(content)), CHAR(255)) AS length, CONVERT(AVG((LENGTH(content) - LENGTH(REPLACE(content, '#', ''))) / LENGTH('#')), CHAR(255)) AS tags, CONVERT(AVG((LENGTH(content) - LENGTH(REPLACE(content, 'https', ''))) / LENGTH('https')), CHAR(255)) AS https, CONVERT(AVG((LENGTH(content) - LENGTH(REPLACE(content, '@', ''))) / LENGTH('@')), CHAR(255)) AS at, CONVERT(MIN(TIMESTAMPDIFF(SECOND, pubTime, GREATEST(pubTime, crawlTime))), CHAR(255)) AS diffdate FROM user_post where taskId ='{}'".format(taskId)
+                    post_file_result = sqlhelper.queryOne(sql)
+                    if send_task == None:
+                        send_task = task
+                    predict(user_file_result, post_file_result, send_task)
+                    # reset when done
+                    send_task = None
+            except Exception as e:
+                traceback.print_exc()
+        else:
+            # no pending task, sleep
+            time.sleep(10)
+if __name__ == '__main__':
+    all_result = {
+        "9_获取用户发帖信息": "{\"resultList\": [{\"count\": \"10\", \"LikeCount\": \"1\", \"CommentsCount\": \"0.1\", \"ShareCount\": \"0.4\", \"length\": \"241.8000\", \"tags\": \"5.80000000\", \"https\": \"1.20000000\", \"at\": \"0.40000000\", \"diffdate\": \"170269\"}]}",
+        "8_获取用户信息": "{\"resultList\": [{\"accountId\": \"1368232444323799043\", \"accountName\": \"Ujjal best Tech@UjjalKumarGho19\", \"nickName\": \"UjjalKumarGho19\", \"fansCount\": \"660\", \"likeCount\": \"2096\", \"postCount\": \"579\", \"otherInfo\": \"\", \"authentication\": 1}]}"}
+    # map the role names onto the keys actually present in all_result
+    data = {"user_file": "8_获取用户信息", "post_file": "9_获取用户发帖信息"}
+    user_file_result = json.loads(all_result[data['user_file']])
+    post_file_result = json.loads(all_result[data['post_file']])
+    if user_file_result['resultList']:
+        resultList = user_file_result['resultList']
+        data['user_file'] = resultList[0]
+    else:
+        data['user_file'] = {}
+    if post_file_result['resultList']:
+        data['post_file'] = post_file_result['resultList'][0]
+    else:
+        data['post_file'] = {}
+
+    print(data)
diff --git a/text_analysis/bak/views.py_0607 b/text_analysis/bak/views.py_0607
new file mode 100644
index 0000000..b6b0e96
--- /dev/null
+++ b/text_analysis/bak/views.py_0607
@@ -0,0 +1,335 @@
+# coding:utf8
+import os, sys
+import io
+
+sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf8')
+cur_dir = os.path.dirname(os.path.abspath(__file__)) or os.getcwd()
+par_dir = os.path.abspath(os.path.join(cur_dir, os.path.pardir))
+sys.path.append(cur_dir)
+sys.path.append(par_dir)
+import json
+from django.http import HttpResponse
+from text_analysis.tools import to_kafka
+from django.views.decorators.csrf import csrf_exempt
+from log_util.set_logger import set_logger
+from text_analysis.tools.tool import parse_data
+logging = set_logger('logs/results.log')
+import traceback
+import queue
+import requests
+import time
+from datetime import datetime
+import os
+import joblib
+from text_analysis.cusException import userFile_Exception, postFile_Exception
+
+# task queue
+global task_queue
+task_queue = queue.Queue()
+
+# mysql connection pool
+from text_analysis.tools.db_pool import get_conn_pool
+
+@csrf_exempt
+def robotIdentification(request):
+    if request.method == 'POST':
+        try:
+            raw_data = json.loads(request.body)
+            task_queue.put(raw_data)
+            return HttpResponse(json.dumps({"code": 1, "msg": "request OK"}, ensure_ascii=False))
+        except:
+            logging.error(traceback.format_exc())
+            return HttpResponse(json.dumps({"code": 0, "msg": "request body is not valid JSON"}, ensure_ascii=False))
+    else:
+        return HttpResponse(json.dumps({"code": 0, "msg": "wrong request method, use POST"}, ensure_ascii=False))
+
+
+def predict(user_file_result, post_file_result, task):
+    try:
+        # user_file_result / post_file_result follow the shapes shown in the
+        # sample payload in views.py0928 above
+        # user data
+        res = {"successCode": "1", "errorLog": "", "results": {}}
+        user_data = []
+        # the three fields required in the response
+        accountId = ""
+        nickName = ""
+        accountName = ""
+        data = {}
+        if user_file_result:
+            data['user_file'] = user_file_result
+            logging.info('User data: {}'.format(data['user_file']))
+            accountId = data["user_file"]["accountId"]
+            nickName = data["user_file"]["nickName"]
+            accountName = data["user_file"]["accountName"]
+        else:
+            data['user_file'] = {}
+            raise userFile_Exception
+        if post_file_result:
+            data['post_file'] = post_file_result
+            logging.info('Post data: {}'.format(data['post_file']))
+        else:
+            data['post_file'] = {}
+            raise postFile_Exception
+        # recognition result code
+        recognition_code = "0"
+        try:
+            user_data_otherInfo_1 = 0 if data["user_file"]["otherInfo"].strip() == "" else 1
+        except:
+            user_data_otherInfo_1 = 0
+        try:
+            user_data_nickName_2 = 0 if data["user_file"]["nickName"].strip() == "" else 1
+        except:
+            user_data_nickName_2 = 0
+        try:
+            user_data_fansCount_3 = int(data["user_file"]["fansCount"])
+        except:
+            user_data_fansCount_3 = 0
+        try:
+            user_data_likeCount_4 = int(data["user_file"]["likeCount"])
+        except:
+            user_data_likeCount_4 = 0
+        try:
+            user_data_postCount_5 = int(data["user_file"]["postCount"])
+        except:
+            user_data_postCount_5 = 0
+        try:
+            user_data_authentication_6 = int(data["user_file"]["authentication"])
+        except:
+            user_data_authentication_6 = 0
+        user_data.extend(
+            [user_data_otherInfo_1, user_data_nickName_2, user_data_fansCount_3, user_data_likeCount_4,
+             user_data_postCount_5, user_data_authentication_6])
+        # post data
+        if data["post_file"] == {}:
+            recognition_code = "-1"
+        else:
+            post_data = []
+            try:
+                post_data_count_1 = int(data["post_file"]["count"])
+            except:
+                post_data_count_1 = 0
+            try:
+                post_data_LikeCount_2 = int(data["post_file"]["LikeCount"])
+            except:
+                post_data_LikeCount_2 = 0
+            try:
+                post_data_CommentsCount_3 = int(data["post_file"]["CommentsCount"])
+            except:
+                post_data_CommentsCount_3 = 0
+            try:
+                post_data_ShareCount_4 = int(data["post_file"]["ShareCount"])
+            except:
+                post_data_ShareCount_4 = 0
+            try:
+                post_data_length_5 = int(data["post_file"]["length"])
+            except:
+                post_data_length_5 = 0
+            try:
+                post_data_tags_6 = int(data["post_file"]["tags"])
+            except:
+                post_data_tags_6 = 0
+            try:
+                post_data_https_7 = int(data["post_file"]["https"])
+            except:
+                post_data_https_7 = 0
+            try:
+                post_data_at_8 = int(data["post_file"]["at"])
+            except:
+                post_data_at_8 = 0
+            try:
+                post_data_diffdate_9 = int(data["post_file"]["diffdate"])
+            except:
+                post_data_diffdate_9 = 0
+            post_data.extend(
+                [post_data_count_1, post_data_LikeCount_2, post_data_CommentsCount_3, post_data_ShareCount_4,
+                 post_data_length_5, post_data_tags_6, post_data_https_7, post_data_at_8, post_data_diffdate_9])
+            features = [user_data + post_data]
+            bot_user = joblib.load(cur_dir + "/model/bot_user.pkl")  # load the trained model
+            result = bot_user.predict(features)
+            recognition_code = str(result[0])
+            # logging.info("Model prediction: {}".format(result))
+        results = {}
+        # account id
+        results['authorId'] = accountId
+        # nickname
+        results['nickName'] = nickName
+        # account name
+        results['accountName'] = accountName
+        # end-of-stream flag
+        res['isLast'] = True
+        # data type -- currently only consumed by the graph component
+        results['pageType'] = 'userAuthenPage'
+        if recognition_code == '0':
+            results['recognitionResult'] = 'not a bot'
+            results['recognitionCode'] = recognition_code
+        elif recognition_code == '1':
+            results['recognitionResult'] = 'bot'
+            results['recognitionCode'] = recognition_code
+        else:
+            results['recognitionResult'] = 'unknown recognition result'
+            results['recognitionCode'] = recognition_code
+        res['results'] = json.dumps(results)
+        res["status"] = 1
+        res["message"] = "success"
+        task["result"] = res
+        logging.info("Prediction appended - {}".format(task))
+        to_kafka.send_kafka(task, logging)
+    except userFile_Exception:
+        res = {"successCode": "0", "errorLog": "user data is empty!", "results": {}}
+        results = {}
+        results['authorId'] = ""
+        results['nickName'] = ""
+        results['accountName'] = ""
+        results['recognitionResult'] = 'user data is empty'
+        res['results'] = json.dumps(results)
+        res["status"] = 2
+        res["message"] = "user data is empty"
+        task["result"] = res
+        logging.info("User data empty for this request - {}".format(task))
+        to_kafka.send_kafka(task, logging)
+    except postFile_Exception:
+        res = {"successCode": "0", "errorLog": "post data is empty!", "results": {}}
+        results = {}
+        results['authorId'] = accountId
+        results['nickName'] = nickName
+        results['accountName'] = accountName
+        results['recognitionResult'] = 'post data is empty'
+        res['results'] = json.dumps(results)
+        res["status"] = 2
+        res["message"] = "post data is empty"
+        task["result"] = res
+        logging.info("Post data empty for this request - {}".format(task))
+        to_kafka.send_kafka(task, logging)
+    except:
+        res = {"successCode": "0", "errorLog": "", "results": {}}
+        results = {}
+        results['authorId'] = accountId
+        results['nickName'] = nickName
+        results['accountName'] = accountName
+        results['recognitionResult'] = ""
+        res["results"] = json.dumps(results)
+        res["status"] = 2
+        res["message"] = "exception"
+        task["result"] = res
task["result"]["error"] = traceback.format_exc() + logging.info(traceback.format_exc()) + to_kafka.send_kafka(task, logging) + +def data_structure(dbConfig): + ''' + 水军识别数据构造 + 主要是写入用户表、主贴表 + ''' + #获取数据库连接 + sqlhelper = get_conn_pool(dbConfig['host'],dbConfig['port'],dbConfig['username'],dbConfig['password'],dbConfig['db']) + #用户任务作为响应体 + send_task = None + while True: + if task_queue.qsize() > 0: + try: + logging.info("队列长度:{}".format(task_queue.qsize())) + task = task_queue.get() + input = task['input'] + account = input['account'] + post = input['post'] + #判断数据类型 + data = task['data'] + page_type = None + taskId = None + for data_str in data: + try: + app_data = json.loads(data[data_str]) + taskId = app_data['taskId'] + if "pageType" in app_data: + page_type = app_data['pageType'] + break + except: + logging.error("正常判断,异常请忽略") + logging.info("数据类型:{}".format(page_type)) + if page_type == 'userInfoPage': + # 用户任务作为响应体 + send_task = task + #用户类型数据写入 + sql = "INSERT INTO `user_account`(`taskId`, `accountId`, `accountName`, `nickName`, `fansCount`, `likeCount`, `postCount`, `otherInfo`, `authentication`) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)" + # 构造参数元组 + values = ( + parse_data(task, account['taskId']), + parse_data(task, account['accountId']), + parse_data(task, account['accountName']), + parse_data(task, account['nickName']), + parse_data(task, account['fansCount']), + parse_data(task, account['likeCount']), + parse_data(task, account['postCount']), + parse_data(task, account['otherInfo']), + parse_data(task, account['authentication']) + ) + sqlhelper.insert(sql,values) + + elif page_type == 'storyDetailPage': + #帖子类型数据写入 + # 定义 SQL 语句 + sql = "INSERT INTO `user_post`(`taskId`, `postId`, `accountId`, `accountName`, `likeCount`, `emotionCount`, `commentsCount`, `shareCount`, `content`, `pubTime`, `crawlTime`) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)" + # 构造参数元组 + values = ( + parse_data(task, post['taskId']), + parse_data(task, post['postId']), + parse_data(task, post['accountId']), + parse_data(task, post['accountName']), + parse_data(task, post['likeCount']), + parse_data(task, post['emotionCount']), + parse_data(task, post['commentsCount']), + parse_data(task, post['shareCount']), + parse_data(task, post['content']), + parse_data(task, post['pubTime']), + parse_data(task, post['crawlTime']) + ) + sqlhelper.insert(sql,values) + #判断是否是此次数据流的最后一条,最后一条直接触发用户的水军识别算法 + if 'isLast'in data and data['isLast']: + #获取用户相关的数据 + sql = "select accountId,accountName,nickName,fansCount,likeCount,postCount,otherInfo,authentication from user_account where taskId ='{}'".format(taskId) + user_file_result = sqlhelper.queryOne(sql) + # 获取帖子相关的数据 + sql = "SELECT CONVERT(COUNT(postId), CHAR(255)) AS count, CONVERT(AVG(likeCount), CHAR(255)) AS LikeCount, CONVERT(AVG(commentsCount), CHAR(255)) AS CommentsCount, CONVERT(AVG(shareCount), CHAR(255)) AS ShareCount, CONVERT(AVG(LENGTH(content)), CHAR(255)) AS length, CONVERT(AVG((LENGTH(content) - LENGTH(REPLACE(content, '#', ''))) / LENGTH('#')), CHAR(255)) AS tags, CONVERT(AVG((LENGTH(content) - LENGTH(REPLACE(content, 'https', ''))) / LENGTH('https')), CHAR(255)) AS https, CONVERT(AVG((LENGTH(content) - LENGTH(REPLACE(content, '@', ''))) / LENGTH('@')), CHAR(255)) AS at, CONVERT(MIN(TIMESTAMPDIFF(SECOND, pubTime, GREATEST(pubTime, crawlTime))), CHAR(255)) AS diffdate FROM user_post where taskId ='{}'".format(taskId) + post_file_result = sqlhelper.queryOne(sql) + if send_task == None: + send_task = task + 
+                    predict(user_file_result, post_file_result, send_task)
+                    # reset when done
+                    send_task = None
+            except Exception as e:
+                traceback.print_exc()
+        else:
+            # no pending task, sleep
+            time.sleep(10)
+if __name__ == '__main__':
+    all_result = {
+        "9_获取用户发帖信息": "{\"resultList\": [{\"count\": \"10\", \"LikeCount\": \"1\", \"CommentsCount\": \"0.1\", \"ShareCount\": \"0.4\", \"length\": \"241.8000\", \"tags\": \"5.80000000\", \"https\": \"1.20000000\", \"at\": \"0.40000000\", \"diffdate\": \"170269\"}]}",
+        "8_获取用户信息": "{\"resultList\": [{\"accountId\": \"1368232444323799043\", \"accountName\": \"Ujjal best Tech@UjjalKumarGho19\", \"nickName\": \"UjjalKumarGho19\", \"fansCount\": \"660\", \"likeCount\": \"2096\", \"postCount\": \"579\", \"otherInfo\": \"\", \"authentication\": 1}]}"}
+    # map the role names onto the keys actually present in all_result
+    data = {"user_file": "8_获取用户信息", "post_file": "9_获取用户发帖信息"}
+    user_file_result = json.loads(all_result[data['user_file']])
+    post_file_result = json.loads(all_result[data['post_file']])
+    if user_file_result['resultList']:
+        resultList = user_file_result['resultList']
+        data['user_file'] = resultList[0]
+    else:
+        data['user_file'] = {}
+    if post_file_result['resultList']:
+        data['post_file'] = post_file_result['resultList'][0]
+    else:
+        data['post_file'] = {}
+
+    print(data)
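[The long SELECT above packs the nine per-user posting features into one query. For readability, the same aggregation expressed in Python — a sketch over an assumed in-memory list of user_post rows, with pubTime/crawlTime as datetime objects:]

    from statistics import mean

    def post_features(posts):
        # posts: list of dicts mirroring the user_post columns (assumption)
        texts = [p["content"] or "" for p in posts]
        return {
            "count": len(posts),
            "LikeCount": mean(p["likeCount"] for p in posts),
            "CommentsCount": mean(p["commentsCount"] for p in posts),
            "ShareCount": mean(p["shareCount"] for p in posts),
            "length": mean(len(t) for t in texts),
            "tags": mean(t.count("#") for t in texts),
            "https": mean(t.count("https") for t in texts),
            "at": mean(t.count("@") for t in texts),
            # seconds between publication and the later of pubTime/crawlTime, minimum over posts
            "diffdate": min((max(p["pubTime"], p["crawlTime"]) - p["pubTime"]).total_seconds()
                            for p in posts),
        }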
diff --git a/text_analysis/cusException.py b/text_analysis/cusException.py
new file mode 100644
index 0000000..f157618
--- /dev/null
+++ b/text_analysis/cusException.py
@@ -0,0 +1,9 @@
+# -*- coding:utf-8 -*-
+
+class userFile_Exception(Exception):
+    def __str__(self):
+        return 'user data is empty'
+
+class postFile_Exception(Exception):
+    def __str__(self):
+        return 'post data is empty'
\ No newline at end of file
diff --git a/text_analysis/linshi.py b/text_analysis/linshi.py
new file mode 100644
index 0000000..d4e4b90
--- /dev/null
+++ b/text_analysis/linshi.py
@@ -0,0 +1,101 @@
+#coding:utf8
+import joblib
+import json
+import os
+cur_dir = os.path.dirname(os.path.abspath(__file__)) or os.getcwd()
+import numpy as np
+class MyEncoder(json.JSONEncoder):
+    # make numpy types JSON-serialisable
+    def default(self, obj):
+        if isinstance(obj, np.integer):
+            return int(obj)
+        elif isinstance(obj, np.floating):
+            return float(obj)
+        elif isinstance(obj, np.ndarray):
+            return obj.tolist()
+
+raw_data = {"user_file": {"accountId": "39234393", "accountName": "hello", "nickName": "Johnson Leung",
+                          "fansCount": 308, "likeCount": 92707, "postCount": 14237,
+                          "otherInfo": "{\"\"otherInfo\"\":\"\"{\"\"bio\"\": \"\"Huge}",
+                          "authentication": 0},
+            "post_file": {"count": 1, "LikeCount": 12, "CommentsCount": 1, "ShareCount": 1,
+                          "length": 150, "tags": 0, "https": 0, "at": 0, "diffdate": 1}}
+# user data
+res = {"successCode": "1", "errorLog": "", "results": {}}
+user_data = []
+try:
+    user_data_otherInfo_1 = 0 if raw_data["user_file"]["otherInfo"].strip() == "" else 1
+except:
+    user_data_otherInfo_1 = 0
+try:
+    user_data_nickName_2 = 0 if raw_data["user_file"]["nickName"].strip() == "" else 1
+except:
+    user_data_nickName_2 = 0
+try:
+    user_data_fansCount_3 = int(raw_data["user_file"]["fansCount"])
+except:
+    user_data_fansCount_3 = 0
+try:
+    user_data_likeCount_4 = int(raw_data["user_file"]["likeCount"])
+except:
+    user_data_likeCount_4 = 0
+try:
+    user_data_postCount_5 = int(raw_data["user_file"]["postCount"])
+except:
+    user_data_postCount_5 = 0
+try:
+    user_data_authentication_6 = int(raw_data["user_file"]["authentication"])
+except:
+    user_data_authentication_6 = 0
+user_data.extend(
+    [user_data_otherInfo_1, user_data_nickName_2, user_data_fansCount_3, user_data_likeCount_4,
+     user_data_postCount_5, user_data_authentication_6])
+# post data
+post_data = []
+try:
+    post_data_count_1 = int(raw_data["post_file"]["count"])
+except:
+    post_data_count_1 = 0
+try:
+    post_data_LikeCount_2 = int(raw_data["post_file"]["LikeCount"])
+except:
+    post_data_LikeCount_2 = 0
+try:
+    post_data_CommentsCount_3 = int(raw_data["post_file"]["CommentsCount"])
+except:
+    post_data_CommentsCount_3 = 0
+try:
+    post_data_ShareCount_4 = int(raw_data["post_file"]["ShareCount"])
+except:
+    post_data_ShareCount_4 = 0
+try:
+    post_data_length_5 = int(raw_data["post_file"]["length"])
+except:
+    post_data_length_5 = 0
+try:
+    post_data_tags_6 = int(raw_data["post_file"]["tags"])
+except:
+    post_data_tags_6 = 0
+try:
+    post_data_https_7 = int(raw_data["post_file"]["https"])
+except:
+    post_data_https_7 = 0
+try:
+    post_data_at_8 = int(raw_data["post_file"]["at"])
+except:
+    post_data_at_8 = 0
+try:
+    post_data_diffdate_9 = int(raw_data["post_file"]["diffdate"])
+except:
+    post_data_diffdate_9 = 0
+post_data.extend(
+    [post_data_count_1, post_data_LikeCount_2, post_data_CommentsCount_3, post_data_ShareCount_4,
+     post_data_length_5, post_data_tags_6, post_data_https_7, post_data_at_8, post_data_diffdate_9])
+features = [user_data + post_data]
+print(cur_dir + "/model/bot_user.pkl")
+bot_user = joblib.load(cur_dir + "/model/bot_user.pkl")  # load the trained model
+result = bot_user.predict(features)
+res["results"] = result[0]
+# logging.info("Model prediction: {}".format(result))
+raw_data["result"] = res
+# print(raw_data)
+print(raw_data)
\ No newline at end of file
diff --git a/text_analysis/model/bot_user.pkl b/text_analysis/model/bot_user.pkl
new file mode 100644
index 0000000..419f983
Binary files /dev/null and b/text_analysis/model/bot_user.pkl differ
diff --git a/text_analysis/read_config.py b/text_analysis/read_config.py
new file mode 100644
index 0000000..2343691
--- /dev/null
+++ b/text_analysis/read_config.py
@@ -0,0 +1,10 @@
+import configparser
+
+# load the configuration file
+def load_config():
+    configFile = './config.ini'
+    # create the parser object
+    con = configparser.ConfigParser()
+    # read the file
+    con.read(configFile, encoding='utf-8')
+    return con
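[A minimal sketch of load_config() together with the config.ini at the top of this commit; the path is relative to the working directory:]

    con = load_config()
    servers = con.get('kafka', 'bootstrap_servers')  # node-01:19092,node-02:19092,node-03:19092
    topic = con.get('kafka', 'topic')                # produce_analyze
    db_host = con.get('database', 'host')            # node-01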
diff --git a/text_analysis/request.py b/text_analysis/request.py
new file mode 100644
index 0000000..322ccc2
--- /dev/null
+++ b/text_analysis/request.py
@@ -0,0 +1,14 @@
+#coding:utf8
+# import leida_ner_bert_crf
+
+import requests
+
+url = "http://172.18.1.166:9000/leidaduikang"
+
+payload = "{\"inputUrl\":\"/home/bfdadmin/leidabert/Project_leidaduikang/AInputdata/content_100.xlsx\"}"
+headers = {'user-agent': "vscode-restclient", 'header name': "header value"}
+
+response = requests.request("POST", url, timeout=1000000, data=payload, headers=headers)
+
+print(response.text)
+
diff --git a/text_analysis/settings.py b/text_analysis/settings.py
new file mode 100644
index 0000000..6677a8e
--- /dev/null
+++ b/text_analysis/settings.py
@@ -0,0 +1,148 @@
+"""
+Django settings for Zhijian_Project_WebService project.
+
+Generated by 'django-admin startproject' using Django 1.8.
+
+For more information on this file, see
+https://docs.djangoproject.com/en/1.8/topics/settings/
+
+For the full list of settings and their values, see
+https://docs.djangoproject.com/en/1.8/ref/settings/
+"""
+
+# Build paths inside the project like this: os.path.join(BASE_DIR, ...)
+import os
+
+BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+
+
+# Quick-start development settings - unsuitable for production
+# See https://docs.djangoproject.com/en/1.8/howto/deployment/checklist/
+
+# SECURITY WARNING: keep the secret key used in production secret!
+SECRET_KEY = '330r)_!^qhd7$!w4)$y@4=p2bd*vlxf%4z(bx-fx-1i3txagvz'
+
+# SECURITY WARNING: don't run with debug turned on in production!
+DEBUG = True
+
+ALLOWED_HOSTS = ['*']
+
+
+# Application definition
+
+INSTALLED_APPS = (
+    'django.contrib.admin',
+    'django.contrib.auth',
+    'django.contrib.contenttypes',
+    'django.contrib.sessions',
+    'django.contrib.messages',
+    'django.contrib.staticfiles',
+)
+
+MIDDLEWARE = [
+    'django.contrib.sessions.middleware.SessionMiddleware',
+    'django.middleware.common.CommonMiddleware',
+    'django.middleware.csrf.CsrfViewMiddleware',
+    'django.contrib.auth.middleware.AuthenticationMiddleware',
+    # 'django.contrib.auth.middleware.SessionAuthenticationMiddleware',
+    'django.contrib.messages.middleware.MessageMiddleware',
+    'django.middleware.clickjacking.XFrameOptionsMiddleware',
+    'django.middleware.security.SecurityMiddleware',
+]
+
+ROOT_URLCONF = 'text_analysis.urls'
+
+TEMPLATES = [
+    {
+        'BACKEND': 'django.template.backends.django.DjangoTemplates',
+        'DIRS': [],
+        'APP_DIRS': True,
+        'OPTIONS': {
+            'context_processors': [
+                'django.template.context_processors.debug',
+                'django.template.context_processors.request',
+                'django.contrib.auth.context_processors.auth',
+                'django.contrib.messages.context_processors.messages',
+            ],
+        },
+    },
+]
+
+WSGI_APPLICATION = 'text_analysis.wsgi.application'
+
+
+# Database
+# https://docs.djangoproject.com/en/1.8/ref/settings/#databases
+
+# DATABASES = {
+#     'default': {
+#         'ENGINE': 'django.db.backends.sqlite3',
+#         'NAME': os.path.join(BASE_DIR, 'db.sqlite3'),
+#     }
+# }
+
+
+# Internationalization
+# https://docs.djangoproject.com/en/1.8/topics/i18n/
+
+LANGUAGE_CODE = 'en-us'
+
+TIME_ZONE = 'Asia/Shanghai'
+
+USE_I18N = True
+
+USE_L10N = True
+
+USE_TZ = True
+
+
+# Static files (CSS, JavaScript, Images)
+# https://docs.djangoproject.com/en/1.8/howto/static-files/
+
+STATIC_URL = '/static/'
+
+# U_LOGFILE_SIZE = 1 * 1024 * 1024  # maximum size of a single log file
+# U_LOGFILE_COUNT = 7  # number of rotated log files to keep
+#
+# LOGGING = {
+#     'version': 1,
+#     'disable_existing_loggers': True,  # disable all pre-existing logger configs
+#     'filters': {
+#         'require_debug_false': {
+#             '()': 'django.utils.log.RequireDebugFalse'
+#         }
+#     },
+#     'formatters': {
+#         'verbose': {
+#             'format': '[%(levelname)s %(asctime)s @ %(process)d] %(module)s %(process)d %(thread)d %(message)s'
+#         },
+#         'simple': {
+#             'format': '%(levelname)s %(asctime)s @ %(process)d %(message)s'
+#         },
+#         'complete': {
+#             'format': '[%(levelname)s %(asctime)s @ %(process)d] (%(pathname)s/%(funcName)s:%(lineno)d) - %(message)s'
+#         },
+#         'online': {
+#             'format': '[%(levelname)s %(asctime)s @ %(process)d] - %(message)s'
+#         }
+#     },
+#     'handlers': {
+#         'text': {
+#             'level': 'DEBUG',
+#             # 'class': 'logging.handlers.RotatingFileHandler',
+#             'class': 'logging.handlers.TimedRotatingFileHandler',
+#             'when': 'H',
+#             'interval': 1,
+#             'backupCount': U_LOGFILE_COUNT,
+#             'formatter': 'complete',
+#             'filename': os.path.join(BASE_DIR, 'logs/resultNew.log').replace('\\', '/'),
+#         }
+#     },
+#     'loggers': {
+#         'text': {
+#             'handlers': ['text'],
+#             'level': 'DEBUG',
+#             'propagate': False,
+#         }
+#     }
+# }
diff --git a/text_analysis/src.py b/text_analysis/src.py
new file mode 100644
index 0000000..9bdd212
--- /dev/null
+++ b/text_analysis/src.py
@@ -0,0 +1,90 @@
+#coding:utf8
+import joblib
+# accountName: johnsonleung
+def predict():
+    raw_data = {"user_file": {"accountId": "39234393", "accountName": "hello", "nickName": "Johnson Leung", "fansCount": 308, "likeCount": 92707, "postCount": 14237, "otherInfo": "{\"\"otherInfo\"\":\"\"{\"\"bio\"\": \"\"Huge}", "authentication": 0},
+                "post_file": {"count": 1, "LikeCount": 12, "CommentsCount": 1, "ShareCount": 1, "length": 150, "tags": 0, "https": 0, "at": 0, "diffdate": 1}}
+    '''
+    Derived input features:
+    1. count: total number of posts
+    2. LikeCount: average number of likes per post
+    3. CommentsCount: average number of comments per post
+    4. ShareCount: average number of shares per post
+    5. length: average text length of the posts
+    6. tags: average number of "#" occurrences in the post text
+    7. https: average number of "https" occurrences in the post text
+    8. at: average number of "@" occurrences in the post text
+    9. diffdate: minimum over all posts of (max(publish time of post A, crawl time) - publish time of A)
+    '''
+    # user data
+    user_data = []
+    try:
+        user_data_otherInfo_1 = 0 if raw_data["user_file"]["otherInfo"].strip() == "" else 1
+    except:
+        user_data_otherInfo_1 = 0
+    try:
+        user_data_nickName_2 = 0 if raw_data["user_file"]["nickName"].strip() == "" else 1
+    except:
+        user_data_nickName_2 = 0
+    try:
+        user_data_fansCount_3 = int(raw_data["user_file"]["fansCount"])
+    except:
+        user_data_fansCount_3 = 0
+    try:
+        user_data_likeCount_4 = int(raw_data["user_file"]["likeCount"])
+    except:
+        user_data_likeCount_4 = 0
+    try:
+        user_data_postCount_5 = int(raw_data["user_file"]["postCount"])
+    except:
+        user_data_postCount_5 = 0
+    try:
+        user_data_authentication_6 = int(raw_data["user_file"]["authentication"])
+    except:
+        user_data_authentication_6 = 0
+    user_data.extend([user_data_otherInfo_1, user_data_nickName_2, user_data_fansCount_3, user_data_likeCount_4, user_data_postCount_5, user_data_authentication_6])
+    # post data
+    post_data = []
+    try:
+        post_data_count_1 = int(raw_data["post_file"]["count"])
+    except:
+        post_data_count_1 = 0
+    try:
+        post_data_LikeCount_2 = int(raw_data["post_file"]["LikeCount"])
+    except:
+        post_data_LikeCount_2 = 0
+    try:
+        post_data_CommentsCount_3 = int(raw_data["post_file"]["CommentsCount"])
+    except:
+        post_data_CommentsCount_3 = 0
+    try:
+        post_data_ShareCount_4 = int(raw_data["post_file"]["ShareCount"])
+    except:
+        post_data_ShareCount_4 = 0
+    try:
+        post_data_length_5 = int(raw_data["post_file"]["length"])
+    except:
+        post_data_length_5 = 0
+    try:
+        post_data_tags_6 = int(raw_data["post_file"]["tags"])
+    except:
+        post_data_tags_6 = 0
+    try:
+        post_data_https_7 = int(raw_data["post_file"]["https"])
+    except:
+        post_data_https_7 = 0
+    try:
+        post_data_at_8 = int(raw_data["post_file"]["at"])
+    except:
+        post_data_at_8 = 0
+    try:
+        post_data_diffdate_9 = int(raw_data["post_file"]["diffdate"])
+    except:
+        post_data_diffdate_9 = 0
+    post_data.extend([post_data_count_1, post_data_LikeCount_2, post_data_CommentsCount_3, post_data_ShareCount_4, post_data_length_5, post_data_tags_6, post_data_https_7, post_data_at_8, post_data_diffdate_9])
+    features = [user_data + post_data]
+    bot_user = joblib.load("model/bot_user.pkl")  # load the trained model
+    result = bot_user.predict(features)
+    print(result)
+    # feature order: [['otherInfo', 'nickName', 'fansCount', 'likeCount', 'postCount', 'authentication', 'count', 'LikeCount', 'CommentsCount', 'ShareCount', 'length', 'tags', 'https', 'at', 'diffdate']]
+predict()
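[The trailing comment fixes the feature order the pickled model expects. A sketch that makes that contract explicit — the feature names come from that comment, the model path from the script:]

    import joblib

    FEATURE_ORDER = ['otherInfo', 'nickName', 'fansCount', 'likeCount', 'postCount',
                     'authentication', 'count', 'LikeCount', 'CommentsCount',
                     'ShareCount', 'length', 'tags', 'https', 'at', 'diffdate']

    def predict_row(feature_dict, model_path="model/bot_user.pkl"):
        # order the 15 features exactly as the classifier was trained
        row = [feature_dict.get(name, 0) for name in FEATURE_ORDER]
        model = joblib.load(model_path)
        return model.predict([row])[0]   # 0 = not a bot, 1 = bot (per views.py)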
b/text_analysis/tools/__pycache__/db_pool.cpython-38.pyc differ
diff --git a/text_analysis/tools/__pycache__/mysql_helper.cpython-36.pyc b/text_analysis/tools/__pycache__/mysql_helper.cpython-36.pyc
new file mode 100644
index 0000000..c9f4217
Binary files /dev/null and b/text_analysis/tools/__pycache__/mysql_helper.cpython-36.pyc differ
diff --git a/text_analysis/tools/__pycache__/process.cpython-36.pyc b/text_analysis/tools/__pycache__/process.cpython-36.pyc
new file mode 100644
index 0000000..543ccd3
Binary files /dev/null and b/text_analysis/tools/__pycache__/process.cpython-36.pyc differ
diff --git a/text_analysis/tools/__pycache__/to_kafka.cpython-36.pyc b/text_analysis/tools/__pycache__/to_kafka.cpython-36.pyc
new file mode 100644
index 0000000..ffee3e3
Binary files /dev/null and b/text_analysis/tools/__pycache__/to_kafka.cpython-36.pyc differ
diff --git a/text_analysis/tools/__pycache__/to_kafka.cpython-38.pyc b/text_analysis/tools/__pycache__/to_kafka.cpython-38.pyc
new file mode 100644
index 0000000..6ec5793
Binary files /dev/null and b/text_analysis/tools/__pycache__/to_kafka.cpython-38.pyc differ
diff --git a/text_analysis/tools/__pycache__/tool.cpython-36.pyc b/text_analysis/tools/__pycache__/tool.cpython-36.pyc
new file mode 100644
index 0000000..72944b7
Binary files /dev/null and b/text_analysis/tools/__pycache__/tool.cpython-36.pyc differ
diff --git a/text_analysis/tools/__pycache__/tool.cpython-38.pyc b/text_analysis/tools/__pycache__/tool.cpython-38.pyc
new file mode 100644
index 0000000..f64dd05
Binary files /dev/null and b/text_analysis/tools/__pycache__/tool.cpython-38.pyc differ
diff --git a/text_analysis/tools/__pycache__/tools.cpython-36.pyc b/text_analysis/tools/__pycache__/tools.cpython-36.pyc
new file mode 100644
index 0000000..d09f422
Binary files /dev/null and b/text_analysis/tools/__pycache__/tools.cpython-36.pyc differ
diff --git a/text_analysis/tools/cusException.py b/text_analysis/tools/cusException.py
new file mode 100644
index 0000000..2a87b59
--- /dev/null
+++ b/text_analysis/tools/cusException.py
@@ -0,0 +1,25 @@
+# -*- coding:utf-8 -*-
+
+class pt_v_Exception(Exception):
+    def __str__(self):
+        return 'pt rule missed the cache'
+
+class dt_v_Exception(Exception):
+    def __str__(self):
+        return 'dt rule missed the cache'
+
+class dt_v_attr_Exception(Exception):
+    def __str__(self):
+        return 'dt_attrcode rule missed the cache'
+
+class dt_v_codeid_Exception(Exception):
+    def __str__(self):
+        return 'dt_codeid rule missed the cache'
+
+class dt_v_senti_Exception(Exception):
+    def __str__(self):
+        return 'dt_senti rule missed the cache'
+
+class dt_v_res_Exception(Exception):
+    def __str__(self):
+        return 'dt_resverse rule missed the cache'
\ No newline at end of file
diff --git a/text_analysis/tools/db_pool.py b/text_analysis/tools/db_pool.py
new file mode 100644
index 0000000..81c5150
--- /dev/null
+++ b/text_analysis/tools/db_pool.py
@@ -0,0 +1,131 @@
+# coding=utf-8
+import time
+import pymysql
+from DBUtils.PooledDB import PooledDB, SharedDBConnection
+import json
+import datetime
+import re
+import traceback
+class MySQLUtils(object):
+    def __init__(self, host, port, dbuser, password, database):
+        self.pool = PooledDB(
+            creator=pymysql,      # module used to create connections
+            maxconnections=100,   # maximum connections in the pool; 0/None means unlimited
+            mincached=10,         # idle connections created at startup; 0 means none
+            maxcached=100,        # maximum idle connections kept; 0/None means unlimited
+            maxshared=0,          # maximum shared connections; 0/None shares all. Effectively moot: pymysql/MySQLdb have threadsafety 1, so whatever is set here the shared count stays 0 and every connection is shared.
+            blocking=True,        # True: block and wait when the pool is exhausted; False: raise instead
+            maxusage=None,        # how many times one connection may be reused; None means unlimited
+            setsession=[],        # commands run before each session, e.g. ["set datestyle to ...", "set time zone ..."]
+            ping=0,               # when to ping the MySQL server: 0 = never, 1 = whenever requested, 2 = on cursor creation, 4 = on query execution, 7 = always
+            host=host,
+            port=int(port),
+            user=dbuser,
+            password=password,
+            database=database,
+            charset='utf8mb4'
+        )
+    # get a connection
+    def connectdb(self):
+        conn = self.pool.connection()
+        # check the connection is alive, reconnecting if it dropped
+        conn.ping(reconnect=True)
+        cursor = conn.cursor(pymysql.cursors.DictCursor)
+        return conn, cursor
+
+    '''
+    Run a query and return all rows.
+    '''
+    def queryAll(self, sql):
+        conn, cursor = self.connectdb()
+        cursor.execute(sql)
+        results = cursor.fetchall()
+        conn.close()
+        return results
+    '''
+    Run a query and return a single row.
+    '''
+    def queryOne(self, sql):
+        conn, cursor = self.connectdb()
+        cursor.execute(sql)
+        results = cursor.fetchone()
+        conn.close()
+        return results
+    '''
+    Insert data.
+    '''
+    def insert(self, sql, values):
+        conn, cursor = self.connectdb()
+        try:
+            # execute the SQL statement
+            cursor.execute(sql, values)
+            # commit the transaction
+            conn.commit()
+        except:
+            print('insert failed')
+            print('offending sql: %s' % sql)
+            traceback.print_exc()
+            conn.rollback()
+        finally:
+            conn.close()
+    '''
+    Update data.
+    '''
+    def update(self, sql):
+        conn, cursor = self.connectdb()
+        try:
+            cursor.execute(sql)
+            conn.commit()
+        except:
+            print('update failed')
+            print('offending sql: %s' % sql)
+            print(traceback.print_exc())
+            conn.rollback()
+        finally:
+            conn.close()
+
+    '''
+    Delete data.
+    '''
+    def delete(self, sql):
+        conn, cursor = self.connectdb()
+        try:
+            cursor.execute(sql)
+            conn.commit()
+        except:
+            print('delete failed')
+            print('offending sql: %s' % sql)
+            conn.rollback()
+        finally:
+            conn.close()
+
+def get_conn_pool(host, port, username, password, db):
+    sqlhelper = MySQLUtils(host, port, username, password, db)
+    return sqlhelper
+
+if __name__ == '__main__':
+    sqlhelper = MySQLUtils("172.26.11.110", 3306, "crawl", "crawl123", "kyyzgpt")
+
+    # conn = sqlhelper.pool.connection()
+    # cursor = conn.cursor(pymysql.cursors.DictCursor)
+    sql = 'select relation_id,start_id,end_id from relations where blueprint_id = 5'
+    print("sql:%s" % sql)
+    # cursor.execute(sql)
+    # results = cursor.fetchall()
+    results = sqlhelper.queryOne(sql)
+    print(json.dumps(results))
+    # if results:
+    #     print('rows: {}'.format(len(results)))
+    #     for item in results:
+    #         if item['sign']=='user':
+    #             p1 = r".*(?=/video)"
+    #             pattern1 = re.compile(p1)
+    #             matcher1 = re.search(pattern1, item['url'])
+    #             # attr = {'brand':item['keyword'],'project_name':'208-A国'}
+    #             attr = {'project_name':'208-A国'}
+    #             sql = "insert into crawl_seed_task (pageTypeID,cid,task_url,attachTag,crawl_mode,crawl_cyclicity_minute,crawl_period_hour,last_crawl_time,next_crawl_time,createTime) values(61,'youtube','{}','{}',1,720,24,'2019-11-28 12:00:00','2019-11-29 00:00:00',NOW())".format(matcher1.group(), json.dumps(attr).encode('utf-8').decode('unicode_escape'))
+    #             sqlhelper.insert(sql)
+    #             print('sql:%s' % sql)
\ No newline at end of file
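[A minimal usage sketch of the pool helper above, with the credentials from config.ini; queryOne/queryAll take a complete SQL string, and the two-column insert assumes the remaining user_account columns are nullable:]

    sqlhelper = get_conn_pool('node-01', 3306, 'root', 'bw@2025', 'analyze')
    row = sqlhelper.queryOne("select count(*) as n from user_account")
    print(row['n'])   # DictCursor returns rows as dicts
    sqlhelper.insert("INSERT INTO user_account(`taskId`, `accountId`) VALUES (%s, %s)",
                     ("t-001", "a-001"))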
# 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."] + ping=0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always + host=host, + port=int(port), + user=dbuser, + password=password, + database=database, + charset='utf8mb4' + ) + #获取连接 + def connectdb(self): + conn = self.pool.connection() + # 验证当前连接是否断开,如果断开重新连接 + conn.ping(reconnect=True) + cursor = conn.cursor(pymysql.cursors.DictCursor) + return conn,cursor + + ''' + 查询语句 + 返回全部内容 + ''' + def queryAll(self,sql): + conn,cursor = self.connectdb() + cursor.execute(sql) + results = cursor.fetchall() + conn.close() + return results + ''' + 查询语句 + 返回单条内容 + ''' + def queryOne(self,sql): + conn,cursor = self.connectdb() + cursor.execute(sql) + results = cursor.fetchone() + conn.close() + return results + ''' + 插入数据 + ''' + def insert(self,sql,values): + conn, cursor = self.connectdb() + try: + # 执行 SQL 语句 + cursor.execute(sql, values) + # 提交事务 + conn.commit() + except: + print('插入失败') + print('错误sql语句:%s' %sql) + traceback.print_exc() + conn.rollback() + finally: + conn.close() + ''' + 修改数据 + ''' + def update(self,sql): + conn, cursor = self.connectdb() + try: + cursor.execute(sql) + conn.commit() + except: + print('修改失败') + print('错误sql语句:%s' %sql) + print(traceback.print_exc()) + conn.rollback() + finally: + conn.close() + + ''' + 删除数据 + ''' + def delete(self,sql): + conn, cursor = self.connectdb() + try: + cursor.execute(sql) + conn.commit() + except: + print('删除失败') + print('错误sql语句:%s' %sql) + conn.rollback() + finally: + conn.close() + +def get_conn_pool(host,port,username,password,db): + sqlhelper = MySQLUtils(host, port, username, password, db) + return sqlhelper + +if __name__ == '__main__': + sqlhelper = MySQLUtils("172.26.11.110", 3306, "crawl", "crawl123", "kyyzgpt") + + # conn = sqlhelper.pool.connection() + # cursor = conn.cursor(pymysql.cursors.DictCursor) + sql = 'select relation_id,start_id,end_id from relations where blueprint_id = 5' + print("sql:%s" %sql) + # cursor.execute(sql) + # results = cursor.fetchall() + results = sqlhelper.queryOne(sql) + print (json.dumps(results)) + # if results: + # print('有数据:{}'.format(len(results))) + # for item in results: + # + # if item['sign']=='user': + # p1 = r".*(?=/video)" + # pattern1 = re.compile(p1) + # matcher1 = re.search(pattern1, item['url']) + # # attr = {'brand':item['keyword'],'project_name':'208-A国'} + # attr = {'project_name':'208-A国'} + # sql = "insert into crawl_seed_task (pageTypeID,cid,task_url,attachTag,crawl_mode,crawl_cyclicity_minute,crawl_period_hour,last_crawl_time,next_crawl_time,createTime) values(61,'youtube','{}','{}',1,720,24,'2019-11-28 12:00:00','2019-11-29 00:00:00',NOW())".format(matcher1.group(),json.dumps(attr).encode('utf-8').decode('unicode_escape')) + # sqlhelper.insert(sql) + # print('sql:%s' %sql) \ No newline at end of file diff --git a/text_analysis/tools/kakfa_util.py b/text_analysis/tools/kakfa_util.py new file mode 100644 index 0000000..eaa6cf3 --- /dev/null +++ b/text_analysis/tools/kakfa_util.py @@ -0,0 +1,67 @@ +# coding=utf-8 +from kafka import KafkaProducer +from kafka import KafkaConsumer +import json +import traceback +import time +import traceback +import datetime +import queue +from logUtil import get_logger + +logger = get_logger("crawlWebsrcCode.log") +""" +写到kafka +""" +def kafkaProduce(topic,resultData,address): + producer = KafkaProducer(bootstrap_servers = '{}'.format(address),request_timeout_ms=120000) + topics = 
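# kafkaProduce treats `topic` as a comma-separated list and publishes the
# same payload to every entry; future.get(timeout=60) blocks until the broker
# acknowledges each send, so delivery failures surface as exceptions here.
# Hedged usage sketch (topic names and address are placeholders):
#   kafkaProduce("topicA,topicB", b'{"id": 1}', "127.0.0.1:9092")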
topic.split(',') + for tc in topics: + future = producer.send(tc,resultData) + result = future.get(timeout=60) + producer.flush() + print (result) + +#写入文件 +def writeTxt(filePath,result): + f = open(filePath,'a',encoding='utf-8') + f.write(result.encode('utf-8').decode('unicode_escape')+'\n') + f.close + +def KafkaConsume(topic,address,group_id,task_queue,logger): + ''' + 监控kafka,读取数据写到任务队列 + :param topic: + :param address: + :param group_id: + :param task_queue: + :return: + ''' + try: + consumer = KafkaConsumer(topic, auto_offset_reset='earliest',fetch_max_bytes=1024768000,fetch_max_wait_ms=5000, bootstrap_servers=address,group_id = group_id) + i = 1 + while True: + for msg in consumer: + print('第{}条数据'.format(i)) + data = str(msg.value, encoding = "utf-8") + print(data) + task_queue.put(data) + i = i+1 + else: + print('暂无任务------') + time.sleep(10) + except Exception as e: + print('kafka未知异常----') + traceback.print_exc() + +def writeTxt(filePath,result): + f = open(filePath,'a') + f.write(result+'\n') + f.close + +if __name__ == '__main__': + # resultData = {'id': '中文', 'url': 'https://zh.wikipedia.org/zh/%E8%94%A1%E8%8B%B1%E6%96%87'} + # kafkaProduce('test', json.dumps(resultData).encode('utf-8').decode('unicode_escape').encode(),'121.4.41.194:8008') + task_queue = queue.Queue() + KafkaConsume('fq-Taobao-eccontent','39.129.129.172:6666,39.129.129.172:6668,39.129.129.172:6669,39.129.129.172:6670,39.129.129.172:6671','news_sche_8',task_queue,logger) + # KafkaConsume('zxbnewstopic','120.133.14.71:9992','group3',task_queue,logger) diff --git a/text_analysis/tools/mysql_helper.py b/text_analysis/tools/mysql_helper.py new file mode 100644 index 0000000..6118907 --- /dev/null +++ b/text_analysis/tools/mysql_helper.py @@ -0,0 +1,338 @@ +# coding:utf8 +import os, sys +cur_dir = os.path.dirname(os.path.abspath(__file__)) or os.getcwd() +par_dir = os.path.abspath(os.path.join(cur_dir, os.path.pardir)) +sys.path.append(cur_dir) +sys.path.append(par_dir) +import json +import re +# from log_util.set_logger import set_logger +# logging = set_logger('logs/error.log') +import pymysql.cursors +import traceback + +def mysqlConn(data,logging): + res={"successCode":"1","errorLog":"","results":""} + p_host=data["Host"] + p_port=int(data["Port"]) + p_db=data["Database"] + p_user=data["User"] + p_password=data["Password"] + try: + db = pymysql.connect(host=p_host, user=p_user, passwd=p_password, db=p_db, port=p_port, + charset='utf8', cursorclass=pymysql.cursors.DictCursor) + db.ping(reconnect=True) + cursor = db.cursor() + sql = "SHOW TABLES" + cursor.execute(sql) + tables = cursor.fetchall() + if tables: + table_names = list(map(lambda x: list(x.values())[0], tables)) + res["results"] = table_names + else: + res["successCode"] = "0" + cursor.close() + db.close() + return res + except: + res["successCode"] = "0" + res["errorLog"]=traceback.format_exc() + logging.error(traceback.format_exc()) + return res + +def getTableColumnNames(data,logging): + res={"successCode":"1","errorLog":"","results":""} + p_host=data["Host"] + p_port=int(data["Port"]) + p_db=data["Database"] + p_user=data["User"] + p_password=data["Password"] + p_table=data["Table"] + try: + db = pymysql.connect(host=p_host, user=p_user, passwd=p_password, db=p_db, port=p_port, + charset='utf8', cursorclass=pymysql.cursors.DictCursor) + db.ping(reconnect=True) + cursor = db.cursor() + sql = "DESCRIBE "+p_table + cursor.execute(sql) + tables = cursor.fetchall() + if tables: + table_names = list(map(lambda x: x['Field'], tables)) + res["results"] = 
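# DESCRIBE <table> under DictCursor returns one dict per column, e.g.
# {'Field': 'name', 'Type': 'varchar(32)', 'Null': 'YES', ...}; the map()
# above keeps only the Field entries, so the result handed back is a plain
# list of column names.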
table_names + else: + res["successCode"] = "0" + cursor.close() + db.close() + return res + except: + res["successCode"] = "0" + res["errorLog"]=traceback.format_exc() + logging.error(traceback.format_exc()) + return res + +def mysqlInsert(input,logging): + res={"successCode":"1","errorLog":"","results":""} + data=input["metadata"]["admin"] + p_host=data["Host"] + p_port=int(data["Port"]) + p_db=data["Database"] + p_user=data["User"] + p_password=data["Password"] + p_table=data["Table"] + p_columnName=data["columnName"] + cN='('+','.join(p_columnName)+') ' + p_values=data["values"] + val=tuple(p_values) + try: + db = pymysql.connect(host=p_host, user=p_user, passwd=p_password, db=p_db, port=p_port, + charset='utf8', cursorclass=pymysql.cursors.DictCursor) + db.ping(reconnect=True) + cursor = db.cursor() + sql = "insert into " + p_table + cN + "values ("+ ','.join(['%s'] * len(val)) + ")" + cursor.execute(sql,val) + db.commit() + cursor.close() + db.close() + return res + except: + res["successCode"] = "0" + res["errorLog"]=traceback.format_exc() + logging.error(traceback.format_exc()) + return res + +def mysqlUpdate(input,logging): + res={"successCode":"1","errorLog":"","results":""} + data=input["metadata"]["admin"] + p_host=data["Host"] + p_port=int(data["Port"]) + p_db=data["Database"] + p_user=data["User"] + p_password=data["Password"] + p_table=data["Table"] + # p_set=data["Set"] + p_set=get_updateSet(input) + # where=process_where(data["Filter"]) + where=get_filter(data["Filter"]) + try: + db = pymysql.connect(host=p_host, user=p_user, passwd=p_password, db=p_db, port=p_port, + charset='utf8', cursorclass=pymysql.cursors.DictCursor) + db.ping(reconnect=True) + cursor = db.cursor() + sql = "UPDATE " + p_table + p_set + where + print(sql) + cursor.execute(sql) + db.commit() + cursor.close() + db.close() + return res + except: + res["successCode"] = "0" + res["errorLog"]=traceback.format_exc() + logging.error(traceback.format_exc()) + return res + +def mysqlExecute(input,logging): + res={"successCode":"1","errorLog":"","results":""} + data=input["metadata"]["admin"] + p_host=data["Host"] + p_port=int(data["Port"]) + p_db=data["Database"] + p_user=data["User"] + p_password=data["Password"] + execute=data["Execute"] + try: + db = pymysql.connect(host=p_host, user=p_user, passwd=p_password, db=p_db, port=p_port, + charset='utf8', cursorclass=pymysql.cursors.DictCursor) + db.ping(reconnect=True) + cursor = db.cursor() + cursor.execute(execute) + if 'select' in execute.lower(): + result = cursor.fetchall() + res["results"]=json.dumps(result,ensure_ascii=False) + else: + db.commit() + cursor.close() + db.close() + return res + except: + res["successCode"] = "0" + res["errorLog"]=traceback.format_exc() + logging.error(traceback.format_exc()) + return res + +# def process_where(data): +# ''' +# 组装where +# :param data: data["Filter"],{"key":"age","value":"20","operator":">"},{"logicalSymbol":"and"},{"key":"weight","value":"50","operator":"<"} +# :return: WHERE age>20 and weight<50 +# ''' +# if data=="" or data==[]: +# return "" +# where = " WHERE " +# for line in data: +# if "key" in line.keys(): +# val = line["value"] +# if isinstance(val, str): +# val = "\'" + val + "\'" +# tmp = str(line["key"]) + " " + line["operator"] + " " + str(val) +# where += tmp +# else: +# where += " " + line["logicalSymbol"] + " " +# return where +# +# def process_filter(data): +# ''' +# 组装key,value,operator +# :param data: data["Filter"],{"key":"age",value:"20","operator":"="} +# :return: age=20 +# ''' +# if 
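# Example of the statement mysqlInsert() builds above: with Table="student",
# columnName=["name","age"] and values=["ff", 20] (hypothetical inputs), the
# SQL becomes
#   insert into student(name,age) values (%s,%s)
# and pymysql binds the tuple itself, so values are escaped by the driver;
# mysqlUpdate/mysqlQuery/mysqlDelete, by contrast, concatenate raw strings.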
data=="" or data==[]: +# return "" +# res=data["key"]+" "+data["operator"]+" "+data["value"] +# return res + +def get_updateSet(input): + metadata=input["metadata"] + user=metadata["user"] + sets=metadata["admin"]["Set"] + res=[] + for line in sets: + part=line.split("=") + tmp = [] + for p in part: + user_match=re.findall('##(.*?)##', p) + if user_match!=[]: + tmp.append(user[user_match[0]]) + res.append(str(tmp[0])+"="+str(tmp[1])) + result=" SET "+",".join(res) + return result + +def get_filter(data): + if "OR" not in data.keys(): + return "" + op_or=data["OR"] + res = "" + if len(op_or) == 1: + tmp = [] + line = op_or[0]["AND"] + for single_line in line: + val = single_line["value"] + if isinstance(val, str): + val = "\'" + val + "\'" + tmp.append(str(single_line["key"]) + single_line["operator"] + str(val)) + if single_line != line[-1]: + tmp.append("and") + res = " WHERE "+" ".join(tmp) + elif len(op_or) > 1: + tmp = [] + for single_and in op_or: + line = single_and["AND"] + for sigle_line in line: + val = sigle_line["value"] + if isinstance(val, str): + val = "\'" + val + "\'" + tmp.append(str(sigle_line["key"]) + sigle_line["operator"] + str(val)) + if sigle_line != line[-1]: + tmp.append("and") + if single_and != op_or[-1]: + tmp.append("or") + res = " WHERE "+" ".join(tmp) + return res + + +def mysqlQuery(input,logging): + res={"successCode":"1","errorLog":"","results":""} + data=input["metadata"]["admin"] + p_host=data["Host"] + p_port=int(data["Port"]) + p_db=data["Database"] + p_user=data["User"] + p_password=data["Password"] + p_table=data["Table"] + p_columnNames=data["columnNames"] + # p_filter=data["Filter"] + column='*' + if len(p_columnNames)==1: + column=p_columnNames[0] + elif len(p_columnNames)>1: + column=','.join(p_columnNames) + where=get_filter(data["Filter"]) + try: + db = pymysql.connect(host=p_host, user=p_user, passwd=p_password, db=p_db, port=p_port, + charset='utf8', cursorclass=pymysql.cursors.DictCursor) + db.ping(reconnect=True) + cursor = db.cursor() + sql = "SELECT " + column +" From "+ p_table + where + # print(sql) + cursor.execute(sql) + result = cursor.fetchall() + res["results"]=json.dumps(result,ensure_ascii=False) + cursor.close() + db.close() + return res + except: + res["successCode"] = "0" + res["errorLog"]=traceback.format_exc() + logging.error(traceback.format_exc()) + return res + +def mysqlDelete(input,logging): + res={"successCode":"1","errorLog":"","results":""} + data=input["metadata"]["admin"] + p_host=data["Host"] + p_port=int(data["Port"]) + p_db=data["Database"] + p_user=data["User"] + p_password=data["Password"] + p_table=data["Table"] + # where=process_where(data["Filter"]) + where=get_filter(data["Filter"]) + try: + db = pymysql.connect(host=p_host, user=p_user, passwd=p_password, db=p_db, port=p_port, + charset='utf8', cursorclass=pymysql.cursors.DictCursor) + db.ping(reconnect=True) + cursor = db.cursor() + sql = "DELETE From "+ p_table + where + cursor.execute(sql) + db.commit() + cursor.close() + db.close() + return res + except: + res["successCode"] = "0" + res["errorLog"]=traceback.format_exc() + logging.error(traceback.format_exc()) + return res + + +if __name__=="__main__": + input={"metadata":{"admin":{ + "type":"query", + "Table":"student", + "columnNames":["name","age"], + "Set":["##tag1##=##value1##","##tag2##=##value2##"], + "Filter":{ + "OR":[ + { + "AND":[{"key":"age","value":20,"operator":">"},{"key":"weight","value":50,"operator":"<"}] + }, + { + "AND":[{"key":"name","value":"ff","operator":"="}] + } + ] + }, + 
"Host":"172.26.28.30", + "Port":"3306", + "Database":"test", + "User":"crawl", + "Password":"crawl123" + }}, + "user": { + "tag1": "age", + "tag2": "weight", + "value1": 2, + "value2": 100 + } + } + res=mysqlUpdate(input,"") + print(res) \ No newline at end of file diff --git a/text_analysis/tools/process.py b/text_analysis/tools/process.py new file mode 100644 index 0000000..0f2ff32 --- /dev/null +++ b/text_analysis/tools/process.py @@ -0,0 +1,51 @@ +#coding:utf8 +import os, sys +cur_dir = os.path.dirname(os.path.abspath(__file__)) or os.getcwd() +par_dir = os.path.abspath(os.path.join(cur_dir, os.path.pardir)) +sys.path.append(cur_dir) +sys.path.append(par_dir) +import json +from text_analysis.tools import to_kafka +from tools.mysql_helper import mysqlConn,mysqlInsert,mysqlQuery,mysqlExecute,mysqlUpdate,mysqlDelete,getTableColumnNames +import traceback +import time +from log_util.set_logger import set_logger +logging=set_logger('results.log') + +from views import task_queue + +def process_data(): + while True: + try: + # print("task_queue:",task_queue) + if task_queue.qsize() >0: + try: + raw_data = task_queue.get() + res = "" + logging.info("启动数据处理线程——") + logging.info(raw_data) + flag = raw_data["metadata"]["admin"]["type"] + # type分为execute、query、insert、update、delete + if flag == 'insert': + res = mysqlInsert(raw_data, logging) + elif flag == 'execute': + res = mysqlExecute(raw_data, logging) + elif flag == 'update': + res = mysqlUpdate(raw_data, logging) + elif flag == 'query': + res = mysqlQuery(raw_data, logging) + elif flag == 'delete': + res = mysqlDelete(raw_data, logging) + raw_data["result"] = res + logging.info("************写入kafka***********") + to_kafka.send_kafka(raw_data) + except: + raw_data["result"] = {"successCode": "0", "errorLog": "", "results": ""} + raw_data["result"]["errorLog"] = traceback.format_exc() + to_kafka.send_kafka(raw_data) + else: + logging.info("暂无任务,进入休眠--") + print("222222222222222222222222") + time.sleep(10) + except: + logging.error(traceback.format_exc()) \ No newline at end of file diff --git a/text_analysis/tools/seleniumTest.py b/text_analysis/tools/seleniumTest.py new file mode 100644 index 0000000..dbc937a --- /dev/null +++ b/text_analysis/tools/seleniumTest.py @@ -0,0 +1,171 @@ +# -*- coding: utf-8 -*- +import time +import threading +from selenium import webdriver +import json +from urllib.parse import urljoin +from kakfa_util import KafkaConsume +from kakfa_util import kafkaProduce +from logUtil import get_logger +from Go_fastDfs import uploadFile +import traceback +import queue +import configparser +import os, sys +import re +logger = get_logger("./logs/crawlWebsrcCode.log") +#加载配置文件 +configFile = './config.ini' +# 创建配置文件对象 +con = configparser.ConfigParser() +# 读取文件 +con.read(configFile, encoding='utf-8') +kafkaConfig = dict(con.items('kafka'))#kafka配置信息 +goFastdfsConfig = dict(con.items('goFastdfs'))#goFastdfs配置信息 +class Spider(object): + def __init__(self,url): + self.chromeOptions = self.get_profile() + self.browser = self.get_browser() + self.url = url + def get_profile(self): + chromeOptions = webdriver.ChromeOptions() + chromeOptions.add_argument('--headless') # 谷歌无头模式 + chromeOptions.add_argument('--disable-gpu') # 禁用显卡 + # chromeOptions.add_argument('window-size=1280,800') # 指定浏览器分辨率 + chromeOptions.add_argument("--no-sandbox") + return chromeOptions + + def get_browser(self): + browser = webdriver.Chrome("D:\\工作使用\\zhaoshang\\chromedriver.exe",chrome_options=self.chromeOptions) + return browser + + def _get_page(self,path): + 
''' + 获取页面原格式,写入文件并返回路径 + :param path: + :return: + ''' + self.browser.get(self.url) + time.sleep(5) + logger.info("休眠结束") + # 向下偏移了10000个像素,到达底部。 + scrollTop = 10000 + for num in range(1,10): + js = "var q=document.documentElement.scrollTop={}".format(scrollTop*num) + logger.info("第{}次滚动".format(num)) + self.browser.execute_script(js) + time.sleep(5) + # 执行 Chome 开发工具命令,得到mhtml内容 + res = self.browser.execute_cdp_cmd('Page.captureSnapshot', {}) + #获取文章标题 + title = '无标题' + try: + title = self.browser.find_element_by_css_selector("title").get_attribute("textContent") + except Exception as e: + logger.error('获取标题异常----') + traceback.print_exc() + pathName = '{}{}.mhtml'.format(path,title) + with open(pathName, 'w',newline='') as f: + f.write(res['data']) + return pathName,title +if __name__ == '__main__': + #初始化任务队列 + task_queue = queue.Queue() + #跟读kafka线程 + logger.info("开启读取kafka线程---") + t = threading.Thread(target=KafkaConsume, name='LoopThread',args=(kafkaConfig['read_topic'], kafkaConfig['address'], kafkaConfig['group_id'], task_queue,logger)) + t.daemon = True + t.start() + #获取任务执行页面原格式保留 + + + + + + while True: + try: + if task_queue.qsize() >0: + taskStr = task_queue.get() + logger.info('当前任务:{}'.format(taskStr)) + task = json.loads(taskStr) + p1 = u'(https?|ftp|file)://[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]' + pattern1 = re.compile(p1) + matcher1 = re.search(p1, task['url']) + if matcher1: + l = Spider(task['url']) + pathName,title = l._get_page(goFastdfsConfig['path']) + l.browser.quit() + #gofast 上传,写入kafka + if '404 Not Found' in title: + logger.error('页面404,无效') + resultData = { + 'code': 500, + 'id': task['id'], + 'message': '页面404' + } + kafkaProduce(kafkaConfig['data_topics'], + json.dumps(resultData).encode('utf-8').decode('unicode_escape').encode(), + kafkaConfig['address']) + time.sleep(2) + continue + try: + uploadStr = uploadFile('{}upload'.format(goFastdfsConfig['uploadaddress']),pathName,logger) + uploadJson = json.loads(uploadStr) + except Exception as e: + logger.error('文件上传异常----') + traceback.print_exc() + resultData = { + 'code': 500, + 'id': task['id'], + 'message': '文件上传失败' + } + kafkaProduce(kafkaConfig['data_topics'], + json.dumps(resultData).encode('utf-8').decode('unicode_escape').encode(), + kafkaConfig['address']) + time.sleep(2) + continue + resultData = { + 'code':200, + 'id':task['id'], + 'url':goFastdfsConfig['downloadaddress']+uploadJson['path'], + 'title':title, + 'delMd5':uploadJson['md5'], + 'uploadTime':uploadJson['mtime'], + 'message':'成功' + } + kafkaProduce(kafkaConfig['data_topics'],json.dumps(resultData).encode('utf-8').decode('unicode_escape').encode(),kafkaConfig['address']) + logger.info('数据写入成功') + #删除文件 + if (os.path.exists(pathName)): + os.remove(pathName) + logger.info('清除文件:{}'.format(pathName)) + else: + logger.info('要删除的文件不存在:{}'.format(pathName)) + else: + logger.error('非正确url:'.format(task['url'])) + resultData = { + 'code': 500, + 'id': task['id'], + 'message': '非正确url' + } + kafkaProduce(kafkaConfig['data_topics'], + json.dumps(resultData).encode('utf-8').decode('unicode_escape').encode(), + kafkaConfig['address']) + time.sleep(2) + continue + else: + logger.info("暂无任务,进入休眠--") + time.sleep(10) + except Exception as e: + logger.error('未知异常----') + traceback.print_exc() + resultData = { + 'code': 500, + 'id': task['id'], + 'message': '未知异常' + } + kafkaProduce(kafkaConfig['data_topics'], + json.dumps(resultData).encode('utf-8').decode('unicode_escape').encode(), + kafkaConfig['address']) + time.sleep(2) + diff --git 
a/text_analysis/tools/to_kafka.py b/text_analysis/tools/to_kafka.py new file mode 100644 index 0000000..a32238a --- /dev/null +++ b/text_analysis/tools/to_kafka.py @@ -0,0 +1,25 @@ +#coding:utf8 +import traceback +import json +from kafka import KafkaProducer +from text_analysis.read_config import load_config +config=load_config() + +def send_kafka(data,logging): + try: + producer = None + topic = config["kafka"]["topic"] + data1=json.dumps(data,ensure_ascii=False) + kafkaProduce(topic,bytes(data1, encoding='utf-8')) + logging.info("数据推入kafka!") + + except Exception as e: + logging.info(traceback.format_exc()) + logging.info('写入kafka失败') + +def kafkaProduce(topic,resultData): + producer = KafkaProducer(bootstrap_servers = '{}'.format(config["kafka"]["bootstrap_servers"]),max_request_size=52428800) + topics = topic.split(',') + for tc in topics: + future = producer.send(tc,resultData) + producer.flush() diff --git a/text_analysis/tools/tool.py b/text_analysis/tools/tool.py new file mode 100644 index 0000000..9b69f81 --- /dev/null +++ b/text_analysis/tools/tool.py @@ -0,0 +1,155 @@ +#coding:utf8 +import re +import json +from jsonpath_ng import jsonpath, parse +import traceback +from log_util.set_logger import set_logger +def parse_data(raw_data,url): + val = None + try: + if "#json#" in url: + parm = url.split("#") + data1 = parse_data(raw_data, parm[0]) + data1_json = json.loads(data1) + expr = parse(parm[2]) + match = [match.value for match in expr.find(data1_json)] + val = match[0] + else: + all_result = raw_data['data'] + param_split = str(url).split(":") + datasourcestr = all_result[param_split[0]] + datasource = json.loads(datasourcestr) + # 创建 JsonPath 表达式对象 + expr = parse(param_split[1]) + # 使用表达式来选择 JSON 元素 + match = [match.value for match in expr.find(datasource)] + val = match[0] + except Exception as e: + traceback.print_exc() + val = '' + return val + + + + +def get_content(inputdata,logging): + """ + 重新组装参数 + :param inputdata:原json数据 + :return: 组装的prompt及其他参数 + """ + res={} + admin=inputdata["metadata"]["admin"] + data=inputdata["data"] + prompt=admin["prompt"] + if_user=re.findall("{{(.*)}}",prompt) + if_data=re.findall("@@(.*)@@",prompt) + if if_user != []: + user_data=inputdata["metadata"]["user"] + if if_user[0] in user_data.keys(): + tmp=user_data[if_user[0]] + prompt=re.sub("{{(.*)}}",tmp,prompt) + if if_data!=[] and if_data[0] in data.keys(): + tmp1=data[if_data[0]] + prompt=re.sub("@@(.*)@@",tmp1,prompt) + res["prompt"]=prompt + res["authorization"]=admin["authorization"] + res["model"]=admin["model"] + res["temperature"]=admin["temperature"] + res["authorization"]=admin["authorization"] + res["top_p"]=admin["top_p"] + res["n"]=admin["n"] + return res + + + +if __name__=="__main__": + datasourcestr = '{"author": "Pelham@Resist_05", "authorId": "1430497892314218502", "authornickname": "", "commentsCount": 2518, "content": "Israeli Army Commander admits they handcuffed 2 couples inside a house then used tanks to destroy the building.15 civilians were burned to death including 8 babies… https://t.co/1V4iUsA3RM", "crawlTimeStr": "2024-01-03 17:39:25", "fansCount": "", "friendsCount": "", "otherInfoJson": "", "pageType": "storyDetailPage", "postCount": "", "postId": "1733008513163784237", "pubTimeStr": "2023-12-08 14:20:01", "subjectId": "304904", "taskId": "1111881", "userType": ""}' + datasource = json.loads(datasourcestr) + # logging.info("数据源:{}".format(datasource)) + # 创建 JsonPath 表达式对象 + expr = parse("$.crawlTimeStr") + # 使用表达式来选择 JSON 元素 + match = [match.value for 
match in expr.find(datasource)] + val = match[0] + print(val) +# inputdata={ +# "metadata":{ +# "output":{ +# "output_type":"table", +# "label_col":[ +# "软件著作抽取结果" +# ] +# }, +# "input":{ +# "input_type":"text", +# "label":[ +# "7_软件著作过滤器" +# ] +# }, +# "address":"http://172.18.1.181:9011/chatGpt/", +# "admin":{ +# "authorization":"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", +# "top_p":"1", +# "user_input":[ +# { +# "keyname":"tag", +# "keydesc":"" +# } +# ], +# "temperature":"0.2", +# "model":"gpt-3.5-turbo-16k", +# "prompt":"请在下面这句话中提取出:证书号、软件名称、著作权人,以json格式输出,找不到的字段赋值为空字符串,不要有多余的文字输出,只输出json结构。@@7_软件著作过滤器@@", +# "n":"1" +# }, +# "index":1 +# }, +# "data":{ +# "1_项目文件上传":"[{ \"fileUrl\":\"http://172.18.1.130:9985/group33/default/20230816/16/05/1/1-基于时间序列遥感 影像洪涝检测系统.jpg\",\"fileType\":\"jpg\", \"filePath\":\"/软件著作/1-基于时间序列遥感 影像洪涝检测系统.jpg\",\"fileId\":\"cd6592f0389bb1da25afbb44901f9cde\",\"fileName\":\"1-基于时间序列遥感 影像洪涝检测系统.jpg\" },{ \"fileUrl\":\"http://172.18.1.130:9985/group33/default/20230816/16/06/1/2-基于遥感影像的快速变化检测系统.jpg\",\"fileType\":\"jpg\", \"filePath\":\"/软件著作/2-基于遥感影像的快速变化检测系统.jpg\",\"fileId\":\"338847e34904fa96e8834cb220667db8\",\"fileName\":\"2-基于遥感影像的快速变化检测系统.jpg\" },{ \"fileUrl\":\"http://172.18.1.130:9985/group33/default/20230816/16/08/1/3-基于时空模型的遥感时间序列森林火灾检测系统.jpg\",\"fileType\":\"jpg\", \"filePath\":\"/软件著作/1/3-基于时空模型的遥感时间序列森林火灾检测系统.jpg\",\"fileId\":\"944eec1cf98f216ea953459dac4dd505\",\"fileName\":\"3-基于时空模型的遥感时间序列森林火灾检测系统.jpg\" },{ \"fileUrl\":\"http://172.18.1.130:9985/group33/default/20230816/16/09/1/4-基于隐马尔可夫模型的遥感时间序列分类系统.jpg\",\"fileType\":\"jpg\", \"filePath\":\"/软件著作/4-基于隐马尔可夫模型的遥感时间序列分类系统.jpg\",\"fileId\":\"eb378cb9ee914323f601500378dfad76\",\"fileName\":\"4-基于隐马尔可夫模型的遥感时间序列分类系统.jpg\" }]", +# "2_文件分类信息":"{\"软件著作\":4}", +# "3_OCR识别内容":"{\"content\":\" 22222222222222222222222222222222222222222222222222\\n中华人民共和国国家版权局\\n计算机软件著作权登记证书\\n证书号:软著登字第1623261号\\n软件名称:\\n基于遥感影像的快速变化检测系统\\nV1.0\\n著作权人:中国科学院遥感与数字地球研究所\\n开发完成日期:2016年08月01日\\n首次发表日期:未发表\\n权利取得方式:原始取得\\n权利范围:全部权利\\n登记号:2017SR037977\\n根据《计算机软件保护条例》和《计算机软件著作权登记办法》的\\n规定,经中国版权保护中心审核,对以上事项予以登记\\n计算机软件著作权\\n登记专用章\\n2017年02月10日\\nNo.01433672\",\"fileId\":\"338847e34904fa96e8834cb220667db8\",\"fileName\":\"2-基于遥感影像的快速变化检测系统.jpg\",\"filePath\":\"/软件著作/2-基于遥感影像的快速变化检测系统.jpg\",\"fileType\":\"jpg\",\"fileUrl\":\"http://172.18.1.130:9985/group33/default/20230816/16/06/1/2-基于遥感影像的快速变化检测系统.jpg\",\"pageNum\":1}", +# "businessKey":"185aef3b1c810799a6be8314abf6512c", +# "7_软件著作过滤器":"{\"content\":\" 22222222222222222222222222222222222222222222222222\\n中华人民共和国国家版权局\\n计算机软件著作权登记证书\\n证书号:软著登字第1623261号\\n软件名称:\\n基于遥感影像的快速变化检测系统\\nV1.0\\n著作权人:中国科学院遥感与数字地球研究所\\n开发完成日期:2016年08月01日\\n首次发表日期:未发表\\n权利取得方式:原始取得\\n权利范围:全部权利\\n登记号:2017SR037977\\n根据《计算机软件保护条例》和《计算机软件著作权登记办法》的\\n规定,经中国版权保护中心审核,对以上事项予以登记\\n计算机软件著作权\\n登记专用章\\n2017年02月10日\\nNo.01433672\",\"fileId\":\"338847e34904fa96e8834cb220667db8\",\"fileName\":\"2-基于遥感影像的快速变化检测系统.jpg\",\"filePath\":\"/软件著作/2-基于遥感影像的快速变化检测系统.jpg\",\"fileType\":\"jpg\",\"fileUrl\":\"http://172.18.1.130:9985/group33/default/20230816/16/06/1/2-基于遥感影像的快速变化检测系统.jpg\",\"pageNum\":1}" +# }, +# "created":1691004265000, +# "module":"OCR", +# "start_tag":"false", +# "last_edit":1692464331000, +# "next_app_id":[ +# { +# "start_id":86, +# "edge_id":49, +# "end_id":90 +# } +# ], +# "transfer_id":11, +# "blueprint_id":3, +# "scenes_id":3, +# "scenario":{ +# "dataloss":1, +# "autoCommitTriggerLast":1, +# "maxErrors":3, +# "autoCommit":1, +# "freshVariables":1 +# }, +# "wait_condition":[ +# +# ], +# "scheduling":{ +# 
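# get_content() above substitutes two placeholder styles in admin["prompt"]:
# {{name}} is filled from metadata["user"][name] and @@label@@ from
# data[label]. The jsonpath lookup works exactly like the live example:
#   expr = parse("$.crawlTimeStr")
#   val = [m.value for m in expr.find(datasource)][0]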
"interval":-1, +# "type":"single" +# }, +# "name":"软件著作抽取", +# "businessKey":"185aef3b1c810799a6be8314abf6512c", +# "id":86, +# "describe":"软件著作抽取" +# } +# a=get_content(inputdata,"") +# print(a) + + + + + + + diff --git a/text_analysis/urls.py b/text_analysis/urls.py new file mode 100644 index 0000000..89800b2 --- /dev/null +++ b/text_analysis/urls.py @@ -0,0 +1,13 @@ +from django.conf.urls import include, url +from django.contrib import admin +from text_analysis import views + +urlpatterns = [ + + url(r'^robotIdentification',views.robotIdentification, name='robotIdentification'), + # url(r'^mysqlConnection',views.mysqlConnection, name='mysqlConnection'), + # url(r'^mysqlField', views.mysqlField, name='mysqlField') + +] + + diff --git a/text_analysis/views.py b/text_analysis/views.py new file mode 100644 index 0000000..2ffb064 --- /dev/null +++ b/text_analysis/views.py @@ -0,0 +1,383 @@ +# coding:utf8 +import os, sys +import io + +sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf8') +cur_dir = os.path.dirname(os.path.abspath(__file__)) or os.getcwd() +par_dir = os.path.abspath(os.path.join(cur_dir, os.path.pardir)) +sys.path.append(cur_dir) +sys.path.append(par_dir) +import json +from django.http import HttpResponse +from text_analysis.tools import to_kafka +from django.views.decorators.csrf import csrf_exempt +from log_util.set_logger import set_logger +from text_analysis.tools.tool import parse_data +logging = set_logger('logs/results.log') +import traceback +import time +import joblib +from text_analysis.cusException import userFile_Exception, postFile_Exception + +from kazoo.client import KazooClient +from kazoo.protocol.states import EventType +# 任务队列 +import queue +task_queue = queue.PriorityQueue() +stop_dict={} +from text_analysis.read_config import load_config +config=load_config() + +#mysql连接池 +from text_analysis.tools.db_pool import get_conn_pool + +@csrf_exempt +def robotIdentification(request): + if request.method == 'POST': + try: + raw_data = json.loads(request.body) + if "trace" in raw_data.keys() and raw_data["trace"]==True: + task_queue.put((-1, time.time(),raw_data)) + else: + task_queue.put((1,time.time(), raw_data)) + return HttpResponse(json.dumps({"code": 1, "msg": "请求正常!"}, ensure_ascii=False)) + except: + logging.error(traceback.format_exc()) + return HttpResponse(json.dumps({"code": 0, "msg": "请求json格式不正确!"}, ensure_ascii=False)) + else: + return HttpResponse(json.dumps({"code": 0, "msg": "请求方式错误,改为post请求"}, ensure_ascii=False)) + + +def predict(user_file_result,post_file_result,task): + try: + # raw_data = {"user_file": {"accountId": "39234393", "accountName": "hello", "nickName": "Johnson Leung", + # "fansCount": 308, "likeCount": 92707, "postCount": 14237, + # "otherInfo": "{\"\"otherInfo\"\":\"\"{\"\"bio\"\": \"\"Huge}", + # "authentication": 0}, + # "post_file": {"count": 1, "LikeCount": 12, "CommentsCount": 1, "ShareCount": 1, + # "length": 150, "tags": 0, "https": 0, "at": 0, "diffdate": 1}} + # 用户数据 + res = {"successCode": "1", "errorLog": "", "results": {}} + user_data = [] + # 返回值需要的三个字段 + accountId = "" + nickName = "" + accountName = "" + data = {} + if user_file_result: + data['user_file'] = user_file_result + logging.info('用户数据:{}'.format(data['user_file'])) + accountId = data["user_file"]["accountId"] + nickName = data["user_file"]["nickName"] + accountName = data["user_file"]["accountName"] + else: + data['user_file'] = {} + raise userFile_Exception + if post_file_result: + data['post_file'] = post_file_result + 
logging.info('帖子数据:{}'.format(data['post_file'])) + else: + data['post_file'] = {} + raise postFile_Exception + # 识别结果返回值 + recognition_code = "0" + try: + user_data_otherInfo_1 = 0 if data["user_file"]["otherInfo"].strip() == "" else 1 + except: + user_data_otherInfo_1 = 0 + try: + user_data_nickName_2 = 0 if data["user_file"]["nickName"].strip() == "" else 1 + except: + user_data_nickName_2 = 0 + try: + user_data_fansCount_3 = int(data["user_file"]["fansCount"]) + except: + user_data_fansCount_3 = 0 + try: + user_data_likeCount_4 = int(data["user_file"]["likeCount"]) + except: + user_data_likeCount_4 = 0 + try: + user_data_postCount_5 = int(data["user_file"]["postCount"]) + except: + user_data_postCount_5 = 0 + try: + user_data_authentication_6 = int(data["user_file"]["authentication"]) + except: + user_data_authentication_6 = 0 + user_data.extend( + [user_data_otherInfo_1, user_data_nickName_2, user_data_fansCount_3, user_data_likeCount_4, + user_data_postCount_5, user_data_authentication_6]) + # 帖子数据 + if data["post_file"] == {}: + recognition_code = "-1" + else: + post_data = [] + try: + post_data_count_1 = int(data["post_file"]["count"]) + except: + post_data_count_1 = 0 + try: + post_data_LikeCount_2 = int(data["post_file"]["LikeCount"]) + except: + post_data_LikeCount_2 = 0 + try: + post_data_CommentsCount_3 = int(data["post_file"]["CommentsCount"]) + except: + post_data_CommentsCount_3 = 0 + try: + post_data_ShareCount_4 = int(data["post_file"]["ShareCount"]) + except: + post_data_ShareCount_4 = 0 + try: + post_data_length_5 = int(data["post_file"]["length"]) + except: + post_data_length_5 = 0 + try: + post_data_tags_6 = int(data["post_file"]["tags"]) + except: + post_data_tags_6 = 0 + try: + post_data_https_7 = int(data["post_file"]["https"]) + except: + post_data_https_7 = 0 + try: + post_data_at_8 = int(data["post_file"]["at"]) + except: + post_data_at_8 = 0 + try: + post_data_diffdate_9 = int(data["post_file"]["diffdate"]) + except: + post_data_diffdate_9 = 0 + post_data.extend( + [post_data_count_1, post_data_LikeCount_2, post_data_CommentsCount_3, post_data_ShareCount_4, + post_data_length_5, post_data_tags_6, post_data_https_7, post_data_at_8, post_data_diffdate_9]) + features = [user_data + post_data] + bot_user = joblib.load(cur_dir + "/model/bot_user.pkl") # 加载训练好的模型 + result = bot_user.predict(features) + recognition_code = str(result[0]) + # logging.info("预测模型结果为{}".format(result)) + results = {} + # 用户id + results['authorId'] = accountId + # 用户昵称 + results['nickName'] = nickName + # 用户账号 + results['accountName'] = accountName + #结束标识 + res['isLast'] = True + #数据类型 --目前只提供给图谱使用 + results['pageType'] = 'userAuthenPage' + if recognition_code == '0': + results['recognitionResult'] = '非机器人' + results['recognitionCode'] = recognition_code + elif recognition_code == '1': + results['recognitionResult'] = '机器人' + results['recognitionCode'] = recognition_code + else: + results['recognitionResult'] = '未知识别结果' + results['recognitionCode'] = recognition_code + results["isLast"]=1 + res['results'] = json.dumps(results) + res["status"]=1 + res["message"]="成功" + task["result"] = res + logging.info("增加预测数据-{}".format(task)) + to_kafka.send_kafka(task, logging) + except userFile_Exception: + res = {"successCode": "0", "errorLog": "用户数据为空!", "results": {}} + results = {} + results['authorId'] = "" + results['nickName'] = "" + results['accountName'] = "" + results['recognitionResult'] = '用户数据为空' + results["isLast"]=1 + res['results'] = json.dumps(results) + res["status"]=2 + 
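# The feature vector assembled above is one 15-dimensional row: six user
# features [otherInfo, nickName, fansCount, likeCount, postCount,
# authentication] followed by nine post features [count, LikeCount,
# CommentsCount, ShareCount, length, tags, https, at, diffdate].
# bot_user.pkl is presumably a scikit-learn estimator saved with joblib;
# predict() returns 0 (human) or 1 (bot), mapped to recognitionCode.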
res["message"]="用户数据为空" + task["result"] = res + logging.info("该条请求用户数据为空-{}".format(task)) + to_kafka.send_kafka(task, logging) + except postFile_Exception: + res = {"successCode": "0", "errorLog": "帖子数据为空!", "results": {}} + results = {} + results['authorId'] = accountId + results['nickName'] = nickName + results['accountName'] = accountName + results['recognitionResult'] = '帖子数据为空' + results["isLast"]=1 + res['results'] = json.dumps(results) + res["status"]=2 + res["message"]="帖子数据为空" + task["result"] = res + logging.info("该条请求帖子数据为空-{}".format(task)) + to_kafka.send_kafka(task, logging) + except: + res = {"successCode": "0", "errorLog": "", "results": {}} + results = {} + results['authorId'] = accountId + results['nickName'] = nickName + results['accountName'] = accountName + results['recognitionResult'] = "" + results["isLast"]=1 + res["results"] = json.dumps(results) + res["status"]=2 + res["message"]="异常" + task["result"] = res + task["result"]["error"] = traceback.format_exc() + logging.info(traceback.format_exc()) + to_kafka.send_kafka(task, logging) + +def data_structure(dbConfig): + ''' + 水军识别数据构造 + 主要是写入用户表、主贴表 + ''' + #获取数据库连接 + sqlhelper = get_conn_pool(dbConfig['host'],dbConfig['port'],dbConfig['username'],dbConfig['password'],dbConfig['db']) + #用户任务作为响应体 + send_task = None + while True: + try: + if task_queue.qsize()>0: + p,t,task = task_queue.get(timeout=1) + logging.info("当前任务队列长度{}".format(task_queue.qsize()+1)) + task_id=task["scenes_id"] + task_version=task["version"] + logging.info("当前version信息为:{}".format(stop_dict)) + if task_id in stop_dict.keys() and task_version!=stop_dict[task_id]["version"]: + logging.info("已暂停任务,数据过滤掉") + continue + + input = task['input'] + account = input['account'] + post = input['post'] + #判断数据类型 + data = task['data'] + page_type = None + taskId = None + for data_str in data: + try: + app_data = json.loads(data[data_str]) + taskId = app_data['taskId'] + if "pageType" in app_data: + page_type = app_data['pageType'] + break + except: + logging.error("正常判断,异常请忽略") + logging.info("数据类型:{}".format(page_type)) + if page_type == 'userInfoPage': + # 用户任务作为响应体 + send_task = task + #用户类型数据写入 + sql = "INSERT INTO `user_account`(`taskId`, `accountId`, `accountName`, `nickName`, `fansCount`, `likeCount`, `postCount`, `otherInfo`, `authentication`) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)" + # 构造参数元组 + values = ( + parse_data(task, account['taskId']), + parse_data(task, account['accountId']), + parse_data(task, account['accountName']), + parse_data(task, account['nickName']), + parse_data(task, account['fansCount']), + parse_data(task, account['likeCount']), + parse_data(task, account['postCount']), + parse_data(task, account['otherInfo']), + parse_data(task, account['authentication']) + ) + sqlhelper.insert(sql,values) + + elif page_type == 'storyDetailPage': + #帖子类型数据写入 + # 定义 SQL 语句 + sql = "INSERT INTO `user_post`(`taskId`, `postId`, `accountId`, `accountName`, `likeCount`, `emotionCount`, `commentsCount`, `shareCount`, `content`, `pubTime`, `crawlTime`) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)" + # 构造参数元组 + values = ( + parse_data(task, post['taskId']), + parse_data(task, post['postId']), + parse_data(task, post['accountId']), + parse_data(task, post['accountName']), + parse_data(task, post['likeCount']), + parse_data(task, post['emotionCount']), + parse_data(task, post['commentsCount']), + parse_data(task, post['shareCount']), + parse_data(task, post['content']), + parse_data(task, post['pubTime']), + parse_data(task, 
post['crawlTime']) + ) + sqlhelper.insert(sql,values) + #判断是否是此次数据流的最后一条,最后一条直接触发用户的水军识别算法 + if 'isLast'in data and data['isLast']: + #获取用户相关的数据 + sql = "select accountId,accountName,nickName,fansCount,likeCount,postCount,otherInfo,authentication from user_account where taskId ='{}'".format(taskId) + user_file_result = sqlhelper.queryOne(sql) + # 获取帖子相关的数据 + sql = "SELECT CONVERT(COUNT(postId), CHAR(255)) AS count, CONVERT(AVG(likeCount), CHAR(255)) AS LikeCount, CONVERT(AVG(commentsCount), CHAR(255)) AS CommentsCount, CONVERT(AVG(shareCount), CHAR(255)) AS ShareCount, CONVERT(AVG(LENGTH(content)), CHAR(255)) AS length, CONVERT(AVG((LENGTH(content) - LENGTH(REPLACE(content, '#', ''))) / LENGTH('#')), CHAR(255)) AS tags, CONVERT(AVG((LENGTH(content) - LENGTH(REPLACE(content, 'https', ''))) / LENGTH('https')), CHAR(255)) AS https, CONVERT(AVG((LENGTH(content) - LENGTH(REPLACE(content, '@', ''))) / LENGTH('@')), CHAR(255)) AS at, CONVERT(MIN(TIMESTAMPDIFF(SECOND, pubTime, GREATEST(pubTime, crawlTime))), CHAR(255)) AS diffdate FROM user_post where taskId ='{}'".format(taskId) + post_file_result = sqlhelper.queryOne(sql) + if send_task == None: + send_task = task + predict(user_file_result,post_file_result,send_task) + #结束置空 + send_task = None + else: + # 暂无任务,进入休眠 + time.sleep(10) + except Exception as e: + traceback.print_exc() + + +def zk_monitoring(): + try: + #线上环境 + zk = KazooClient(hosts=config['zookeeper']['zkhost']) + #测试环境 + # zk = KazooClient(hosts='172.16.12.55:2181,172.16.12.56:2181,172.16.12.57:2181') + zk.start() + # 设置监听器 + @zk.DataWatch("/analyze") + def watch_node(data, stat, event): + if event is not None and event.type == EventType.CHANGED: + data, stat = zk.get("/analyze") + logging.info("执行删除操作:{}".format(data)) + d = json.loads(data) + id = d["scenes_id"] + stop_dict[id] = {} + stop_dict[id]["version"] = d["version"] + stop_dict[id]["operation"] = d["operation"] + # 保持程序运行以监听节点变化 + try: + while True: + time.sleep(1) + except: + logging.info("Stopping...") + # 关闭连接 + zk.stop() + zk.close() + except: + logging.error(traceback.format_exc()) + + +if __name__ == '__main__': + all_result = { + "9_获取用户发帖信息": "{\"resultList\": [{\"count\": \"10\", \"LikeCount\": \"1\", \"CommentsCount\": \"0.1\", \"ShareCount\": \"0.4\", \"length\": \"241.8000\", \"tags\": \"5.80000000\", \"https\": \"1.20000000\", \"at\": \"0.40000000\", \"diffdate\": \"170269\"}]}", + "8_获取用户信息": "{\"resultList\": [{\"accountId\": \"1368232444323799043\", \"accountName\": \"Ujjal best Tech@UjjalKumarGho19\", \"nickName\": \"UjjalKumarGho19\", \"fansCount\": \"660\", \"likeCount\": \"2096\", \"postCount\": \"579\", \"otherInfo\": \"\", \"authentication\": 1}]}"} + data = {} + # {"user_file": "9_获取用户信息", "post_file": "10_获取用户发帖信息"} + user_file_result = json.loads(all_result[data['user_file']]) + post_file_result = json.loads(all_result[data['post_file']]) + if user_file_result['resultList']: + resultList = user_file_result['resultList'] + data['user_file'] = resultList[0] + else: + data['user_file'] = {} + if post_file_result['resultList']: + data['post_file'] = post_file_result['resultList'][0] + else: + data['post_file'] = {} + + print(data) + + + + + + + diff --git a/text_analysis/views.py_20240920 b/text_analysis/views.py_20240920 new file mode 100644 index 0000000..53141af --- /dev/null +++ b/text_analysis/views.py_20240920 @@ -0,0 +1,340 @@ +# coding:utf8 +import os, sys +import io + +sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf8') +cur_dir = os.path.dirname(os.path.abspath(__file__)) or 
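# The aggregate query above derives the per-user post features for the
# model: (LENGTH(content) - LENGTH(REPLACE(content, '#', ''))) / LENGTH('#')
# counts '#' occurrences per post, the same trick counts 'https' and '@',
# and diffdate is the minimum publish-to-crawl gap in seconds via
# TIMESTAMPDIFF.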
os.getcwd() +par_dir = os.path.abspath(os.path.join(cur_dir, os.path.pardir)) +sys.path.append(cur_dir) +sys.path.append(par_dir) +import json +from django.http import HttpResponse +from text_analysis.tools import to_kafka +from django.views.decorators.csrf import csrf_exempt +from log_util.set_logger import set_logger +from text_analysis.tools.tool import parse_data +logging = set_logger('logs/results.log') +import traceback +import queue +import requests +import time +from datetime import datetime +import os +import joblib +from text_analysis.cusException import userFile_Exception, postFile_Exception + +# 任务队列 +global task_queue +task_queue = queue.Queue() + +#mysql连接池 +from text_analysis.tools.db_pool import get_conn_pool + +@csrf_exempt +def robotIdentification(request): + if request.method == 'POST': + try: + raw_data = json.loads(request.body) + task_queue.put(raw_data) + return HttpResponse(json.dumps({"code": 1, "msg": "请求正常!"}, ensure_ascii=False)) + except: + logging.error(traceback.format_exc()) + return HttpResponse(json.dumps({"code": 0, "msg": "请求json格式不正确!"}, ensure_ascii=False)) + else: + return HttpResponse(json.dumps({"code": 0, "msg": "请求方式错误,改为post请求"}, ensure_ascii=False)) + + +def predict(user_file_result,post_file_result,task): + try: + # raw_data = {"user_file": {"accountId": "39234393", "accountName": "hello", "nickName": "Johnson Leung", + # "fansCount": 308, "likeCount": 92707, "postCount": 14237, + # "otherInfo": "{\"\"otherInfo\"\":\"\"{\"\"bio\"\": \"\"Huge}", + # "authentication": 0}, + # "post_file": {"count": 1, "LikeCount": 12, "CommentsCount": 1, "ShareCount": 1, + # "length": 150, "tags": 0, "https": 0, "at": 0, "diffdate": 1}} + # 用户数据 + res = {"successCode": "1", "errorLog": "", "results": {}} + user_data = [] + # 返回值需要的三个字段 + accountId = "" + nickName = "" + accountName = "" + data = {} + if user_file_result: + data['user_file'] = user_file_result + logging.info('用户数据:{}'.format(data['user_file'])) + accountId = data["user_file"]["accountId"] + nickName = data["user_file"]["nickName"] + accountName = data["user_file"]["accountName"] + else: + data['user_file'] = {} + raise userFile_Exception + if post_file_result: + data['post_file'] = post_file_result + logging.info('帖子数据:{}'.format(data['post_file'])) + else: + data['post_file'] = {} + raise postFile_Exception + # 识别结果返回值 + recognition_code = "0" + try: + user_data_otherInfo_1 = 0 if data["user_file"]["otherInfo"].strip() == "" else 1 + except: + user_data_otherInfo_1 = 0 + try: + user_data_nickName_2 = 0 if data["user_file"]["nickName"].strip() == "" else 1 + except: + user_data_nickName_2 = 0 + try: + user_data_fansCount_3 = int(data["user_file"]["fansCount"]) + except: + user_data_fansCount_3 = 0 + try: + user_data_likeCount_4 = int(data["user_file"]["likeCount"]) + except: + user_data_likeCount_4 = 0 + try: + user_data_postCount_5 = int(data["user_file"]["postCount"]) + except: + user_data_postCount_5 = 0 + try: + user_data_authentication_6 = int(data["user_file"]["authentication"]) + except: + user_data_authentication_6 = 0 + user_data.extend( + [user_data_otherInfo_1, user_data_nickName_2, user_data_fansCount_3, user_data_likeCount_4, + user_data_postCount_5, user_data_authentication_6]) + # 帖子数据 + if data["post_file"] == {}: + recognition_code = "-1" + else: + post_data = [] + try: + post_data_count_1 = int(data["post_file"]["count"]) + except: + post_data_count_1 = 0 + try: + post_data_LikeCount_2 = int(data["post_file"]["LikeCount"]) + except: + post_data_LikeCount_2 = 0 + try: + 
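# Each field is coerced in its own try/except so one malformed value
# degrades to 0 instead of aborting the whole prediction; a more compact
# equivalent (hedged sketch, helper name is hypothetical):
#   def to_int(d, key, default=0):
#       try:
#           return int(d[key])
#       except (KeyError, TypeError, ValueError):
#           return default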
post_data_CommentsCount_3 = int(data["post_file"]["CommentsCount"]) + except: + post_data_CommentsCount_3 = 0 + try: + post_data_ShareCount_4 = int(data["post_file"]["ShareCount"]) + except: + post_data_ShareCount_4 = 0 + try: + post_data_length_5 = int(data["post_file"]["length"]) + except: + post_data_length_5 = 0 + try: + post_data_tags_6 = int(data["post_file"]["tags"]) + except: + post_data_tags_6 = 0 + try: + post_data_https_7 = int(data["post_file"]["https"]) + except: + post_data_https_7 = 0 + try: + post_data_at_8 = int(data["post_file"]["at"]) + except: + post_data_at_8 = 0 + try: + post_data_diffdate_9 = int(data["post_file"]["diffdate"]) + except: + post_data_diffdate_9 = 0 + post_data.extend( + [post_data_count_1, post_data_LikeCount_2, post_data_CommentsCount_3, post_data_ShareCount_4, + post_data_length_5, post_data_tags_6, post_data_https_7, post_data_at_8, post_data_diffdate_9]) + features = [user_data + post_data] + bot_user = joblib.load(cur_dir + "/model/bot_user.pkl") # 加载训练好的模型 + result = bot_user.predict(features) + recognition_code = str(result[0]) + # logging.info("预测模型结果为{}".format(result)) + results = {} + # 用户id + results['authorId'] = accountId + # 用户昵称 + results['nickName'] = nickName + # 用户账号 + results['accountName'] = accountName + #结束标识 + res['isLast'] = True + #数据类型 --目前只提供给图谱使用 + results['pageType'] = 'userAuthenPage' + if recognition_code == '0': + results['recognitionResult'] = '非机器人' + results['recognitionCode'] = recognition_code + elif recognition_code == '1': + results['recognitionResult'] = '机器人' + results['recognitionCode'] = recognition_code + else: + results['recognitionResult'] = '未知识别结果' + results['recognitionCode'] = recognition_code + results["isLast"]=1 + res['results'] = json.dumps(results) + res["status"]=1 + res["message"]="成功" + task["result"] = res + logging.info("增加预测数据-{}".format(task)) + to_kafka.send_kafka(task, logging) + except userFile_Exception: + res = {"successCode": "0", "errorLog": "用户数据为空!", "results": {}} + results = {} + results['authorId'] = "" + results['nickName'] = "" + results['accountName'] = "" + results['recognitionResult'] = '用户数据为空' + results["isLast"]=1 + res['results'] = json.dumps(results) + res["status"]=2 + res["message"]="用户数据为空" + task["result"] = res + logging.info("该条请求用户数据为空-{}".format(task)) + to_kafka.send_kafka(task, logging) + except postFile_Exception: + res = {"successCode": "0", "errorLog": "帖子数据为空!", "results": {}} + results = {} + results['authorId'] = accountId + results['nickName'] = nickName + results['accountName'] = accountName + results['recognitionResult'] = '帖子数据为空' + results["isLast"]=1 + res['results'] = json.dumps(results) + res["status"]=2 + res["message"]="帖子数据为空" + task["result"] = res + logging.info("该条请求帖子数据为空-{}".format(task)) + to_kafka.send_kafka(task, logging) + except: + res = {"successCode": "0", "errorLog": "", "results": {}} + results = {} + results['authorId'] = accountId + results['nickName'] = nickName + results['accountName'] = accountName + results['recognitionResult'] = "" + results["isLast"]=1 + res["results"] = json.dumps(results) + res["status"]=2 + res["message"]="异常" + task["result"] = res + task["result"]["error"] = traceback.format_exc() + logging.info(traceback.format_exc()) + to_kafka.send_kafka(task, logging) + +def data_structure(dbConfig): + ''' + 水军识别数据构造 + 主要是写入用户表、主贴表 + ''' + #获取数据库连接 + sqlhelper = get_conn_pool(dbConfig['host'],dbConfig['port'],dbConfig['username'],dbConfig['password'],dbConfig['db']) + #用户任务作为响应体 + send_task = None + while True: 
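# Polling qsize() and sleeping 10s when idle keeps the thread alive but
# busy-waits; a blocking alternative (hedged sketch) is
#   try:
#       task = task_queue.get(block=True, timeout=10)
#   except queue.Empty:
#       continue
# which drops the poll/sleep pair without changing the processing logic.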
+ if task_queue.qsize() > 0: + try: + logging.info("队列长度:{}".format(task_queue.qsize())) + task = task_queue.get() + input = task['input'] + account = input['account'] + post = input['post'] + #判断数据类型 + data = task['data'] + page_type = None + taskId = None + app_data = None + for data_str in data: + try: + app_data = json.loads(data[data_str]) + taskId = app_data['taskId'] + if "pageType" in app_data: + page_type = app_data['pageType'] + break + except: + logging.error("正常判断,异常请忽略") + logging.info("数据类型:{}".format(page_type)) + if page_type == 'userInfoPage': + # 用户任务作为响应体 + send_task = task + #用户类型数据写入 + sql = "INSERT INTO `user_account`(`taskId`, `accountId`, `accountName`, `nickName`, `fansCount`, `likeCount`, `postCount`, `otherInfo`, `authentication`) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)" + # 构造参数元组 + values = ( + parse_data(task, account['taskId']), + parse_data(task, account['accountId']), + parse_data(task, account['accountName']), + parse_data(task, account['nickName']), + parse_data(task, account['fansCount']), + parse_data(task, account['likeCount']), + parse_data(task, account['postCount']), + parse_data(task, account['otherInfo']), + parse_data(task, account['authentication']) + ) + sqlhelper.insert(sql,values) + + elif page_type == 'storyDetailPage': + #帖子类型数据写入 + # 定义 SQL 语句 + sql = "INSERT INTO `user_post`(`taskId`, `postId`, `accountId`, `accountName`, `likeCount`, `emotionCount`, `commentsCount`, `shareCount`, `content`, `pubTime`, `crawlTime`) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)" + # 构造参数元组 + values = ( + parse_data(task, post['taskId']), + parse_data(task, post['postId']), + parse_data(task, post['accountId']), + parse_data(task, post['accountName']), + parse_data(task, post['likeCount']), + parse_data(task, post['emotionCount']), + parse_data(task, post['commentsCount']), + parse_data(task, post['shareCount']), + parse_data(task, post['content']), + parse_data(task, post['pubTime']), + parse_data(task, post['crawlTime']) + ) + sqlhelper.insert(sql,values) + #判断是否是此次数据流的最后一条,最后一条直接触发用户的水军识别算法 + if 'isLast'in app_data: + #获取用户相关的数据 + sql = "select accountId,accountName,nickName,fansCount,likeCount,postCount,otherInfo,authentication from user_account where taskId ='{}'".format(taskId) + user_file_result = sqlhelper.queryOne(sql) + # 获取帖子相关的数据 + sql = "SELECT CONVERT(COUNT(postId), CHAR(255)) AS count, CONVERT(AVG(likeCount), CHAR(255)) AS LikeCount, CONVERT(AVG(commentsCount), CHAR(255)) AS CommentsCount, CONVERT(AVG(shareCount), CHAR(255)) AS ShareCount, CONVERT(AVG(LENGTH(content)), CHAR(255)) AS length, CONVERT(AVG((LENGTH(content) - LENGTH(REPLACE(content, '#', ''))) / LENGTH('#')), CHAR(255)) AS tags, CONVERT(AVG((LENGTH(content) - LENGTH(REPLACE(content, 'https', ''))) / LENGTH('https')), CHAR(255)) AS https, CONVERT(AVG((LENGTH(content) - LENGTH(REPLACE(content, '@', ''))) / LENGTH('@')), CHAR(255)) AS at, CONVERT(MIN(TIMESTAMPDIFF(SECOND, pubTime, GREATEST(pubTime, crawlTime))), CHAR(255)) AS diffdate FROM user_post where taskId ='{}'".format(taskId) + post_file_result = sqlhelper.queryOne(sql) + if send_task == None: + send_task = task + predict(user_file_result,post_file_result,send_task) + #结束置空 + send_task = None + except Exception as e: + traceback.print_exc() + else: + # 暂无任务,进入休眠 + time.sleep(10) +if __name__ == '__main__': + all_result = { + "9_获取用户发帖信息": "{\"resultList\": [{\"count\": \"10\", \"LikeCount\": \"1\", \"CommentsCount\": \"0.1\", \"ShareCount\": \"0.4\", \"length\": \"241.8000\", \"tags\": \"5.80000000\", \"https\": 
\"1.20000000\", \"at\": \"0.40000000\", \"diffdate\": \"170269\"}]}", + "8_获取用户信息": "{\"resultList\": [{\"accountId\": \"1368232444323799043\", \"accountName\": \"Ujjal best Tech@UjjalKumarGho19\", \"nickName\": \"UjjalKumarGho19\", \"fansCount\": \"660\", \"likeCount\": \"2096\", \"postCount\": \"579\", \"otherInfo\": \"\", \"authentication\": 1}]}"} + data = {} + # {"user_file": "9_获取用户信息", "post_file": "10_获取用户发帖信息"} + user_file_result = json.loads(all_result[data['user_file']]) + post_file_result = json.loads(all_result[data['post_file']]) + if user_file_result['resultList']: + resultList = user_file_result['resultList'] + data['user_file'] = resultList[0] + else: + data['user_file'] = {} + if post_file_result['resultList']: + data['post_file'] = post_file_result['resultList'][0] + else: + data['post_file'] = {} + + print(data) + + + + + + + diff --git a/text_analysis/wsgi.py b/text_analysis/wsgi.py new file mode 100644 index 0000000..36735ac --- /dev/null +++ b/text_analysis/wsgi.py @@ -0,0 +1,16 @@ +""" +WSGI config for Zhijian_Project_WebService project. + +It exposes the WSGI callable as a module-level variable named ``application``. + +For more information on this file, see +https://docs.djangoproject.com/en/1.8/howto/deployment/wsgi/ +""" + +import os + +from django.core.wsgi import get_wsgi_application + +os.environ.setdefault("DJANGO_SETTINGS_MODULE", "text_analysis.settings") + +application = get_wsgi_application() diff --git a/txt/postData-user.txt b/txt/postData-user.txt new file mode 100644 index 0000000..e040ac4 --- /dev/null +++ b/txt/postData-user.txt @@ -0,0 +1,85 @@ +{ + "metadata":{ + "address":"http://172.24.12.127:9020/robotIdentification/", + "index":0, + "admin":{ + "user_file":{ + "accountId":"39234393", + "accountName":"hello", + "nickName":"Johnson Leung", + "fansCount":308, + "likeCount":92707, + "postCount":14237, + "otherInfo":"{\"\"otherInfo\"\":\"\"{\"\"bio\"\": \"\"Huge}", + "authentication":0 + }, + "post_file":{ + "count":1, + "LikeCount":12, + "CommentsCount":1, + "ShareCount":1, + "length":150, + "tags":0, + "https":0, + "at":0, + "diffdate":1 + } + } + }, + "output":{ + "output_type":"table", + "label_col":[ + + ] + }, + "input":{ + "input_type":"text", + "label":[ + "2_任务提取" + ] + }, + "user":{ + "tag":"" + }, + "data":{ + + }, + "created":1691004265000, + "module":"robotIdentification", + "start_tag":false, + "multi_branch":0, + "last_edit":1693417201000, + "next_app_id":[ + { + "start_id":154, + "edge_id":75, + "end_id":155 + } + ], + "transfer_id":3, + "version":1, + "blueprint_id":4, + "scenes_id":5, + "scenario":{ + "dataloss":1, + "autoCommitTriggerLast":1, + "maxErrors":3, + "autoCommit":1, + "freshVariables":1 + }, + "wait_condition":[ + + ], + "scheduling":{ + "interval":-1, + "type":"single" + }, + "name":"robotIdentification", + "businessKey":"19615b029da477fb", + "id":154, + "position":[ + 100, + 200 + ], + "describe":"" +} \ No newline at end of file diff --git a/uwsgi.ini b/uwsgi.ini new file mode 100644 index 0000000..22e8b76 --- /dev/null +++ b/uwsgi.ini @@ -0,0 +1,8 @@ +[uwsgi] +http = 0.0.0.0:9020 +chdir = ../robotIdentification +wsgi-file = ../robotIdentificationi/wsgi.py +processes = 1 +threads = 2 +listen = 1024 +http-timeout=21600 diff --git a/wsgi.log b/wsgi.log new file mode 100644 index 0000000..93c076c --- /dev/null +++ b/wsgi.log @@ -0,0 +1,86 @@ +*** Starting uWSGI 2.0.21 (64bit) on [Fri Jan 3 10:55:42 2025] *** +compiled with version: 11.2.0 on 24 October 2023 19:53:56 +os: Linux-3.10.0-1127.19.1.el7.x86_64 #1 SMP Tue 
Aug 25 17:23:54 UTC 2020
+nodename: node-04
+machine: x86_64
+clock source: unix
+pcre jit disabled
+detected number of CPU cores: 64
+current working directory: /opt/analyze/apps/robotIdentification
+detected binary path: /opt/analyze/environment/python3.8/bin/uwsgi
+uWSGI running as root, you can use --uid/--gid/--chroot options
+*** WARNING: you are running uWSGI as root !!! (use the --uid flag) ***
+chdir() to ../robotIdentification
+*** WARNING: you are running uWSGI without its master process manager ***
+your processes number limit is 1031041
+your memory page size is 4096 bytes
+detected max file descriptor number: 65535
+lock engine: pthread robust mutexes
+thunder lock: disabled (you can enable it with --thunder-lock)
+uWSGI http bound on 0.0.0.0:9020 fd 4
+spawned uWSGI http 1 (pid: 59163)
+uwsgi socket 0 bound to TCP address 127.0.0.1:33452 (port auto-assigned) fd 3
+uWSGI running as root, you can use --uid/--gid/--chroot options
+*** WARNING: you are running uWSGI as root !!! (use the --uid flag) ***
+Python version: 3.8.16 (default, Jun 12 2023, 18:09:05) [GCC 11.2.0]
+Python main interpreter initialized at 0x2277240
+uWSGI running as root, you can use --uid/--gid/--chroot options
+*** WARNING: you are running uWSGI as root !!! (use the --uid flag) ***
+python threads support enabled
+your server socket listen backlog is limited to 1024 connections
+your mercy for graceful operations on workers is 60 seconds
+mapped 83376 bytes (81 KB) for 2 cores
+*** Operational MODE: threaded ***
+Exception in thread dataStructureThread:
+Traceback (most recent call last):
+  File "/opt/analyze/environment/python3.8/lib/python3.8/site-packages/DBUtils/PooledDB.py", line 325, in connection
+    con = self._idle_cache.pop(0)
+IndexError: pop from empty list
+
+During handling of the above exception, another exception occurred:
+
+Traceback (most recent call last):
+  File "/opt/analyze/environment/python3.8/lib/python3.8/threading.py", line 932, in _bootstrap_inner
+    self.run()
+  File "/opt/analyze/environment/python3.8/lib/python3.8/threading.py", line 870, in run
+    self._target(*self._args, **self._kwargs)
+  File "/opt/analyze/apps/robotIdentification/./text_analysis/views.py", line 237, in data_structure
+    sqlhelper = get_conn_pool(dbConfig['host'],dbConfig['port'],dbConfig['username'],dbConfig['password'],dbConfig['db'])
+  File "/opt/analyze/apps/robotIdentification/./text_analysis/tools/db_pool.py", line 105, in get_conn_pool
+    sqlhelper = MySQLUtils(host, port, username, password, db)
+  File "/opt/analyze/apps/robotIdentification/./text_analysis/tools/db_pool.py", line 11, in __init__
+    self.pool = PooledDB(
+  File "/opt/analyze/environment/python3.8/lib/python3.8/site-packages/DBUtils/PooledDB.py", line 267, in __init__
+    idle = [self.dedicated_connection() for i in range(mincached)]
+  File "/opt/analyze/environment/python3.8/lib/python3.8/site-packages/DBUtils/PooledDB.py", line 267, in <listcomp>
+    idle = [self.dedicated_connection() for i in range(mincached)]
+  File "/opt/analyze/environment/python3.8/lib/python3.8/site-packages/DBUtils/PooledDB.py", line 338, in dedicated_connection
+    return self.connection(False)
+  File "/opt/analyze/environment/python3.8/lib/python3.8/site-packages/DBUtils/PooledDB.py", line 327, in connection
+    con = self.steady_connection()
+  File "/opt/analyze/environment/python3.8/lib/python3.8/site-packages/DBUtils/PooledDB.py", line 273, in steady_connection
+    return connect(
+  File "/opt/analyze/environment/python3.8/lib/python3.8/site-packages/DBUtils/SteadyDB.py", line 137, in connect
+    return SteadyDBConnection(
+  File "/opt/analyze/environment/python3.8/lib/python3.8/site-packages/DBUtils/SteadyDB.py", line 192, in __init__
+    self._store(self._create())
+  File "/opt/analyze/environment/python3.8/lib/python3.8/site-packages/DBUtils/SteadyDB.py", line 211, in _create
+    con = self._creator(*self._args, **self._kwargs)
+  File "/opt/analyze/environment/python3.8/lib/python3.8/site-packages/pymysql/connections.py", line 358, in __init__
+    self.connect()
+  File "/opt/analyze/environment/python3.8/lib/python3.8/site-packages/pymysql/connections.py", line 664, in connect
+    self._request_authentication()
+  File "/opt/analyze/environment/python3.8/lib/python3.8/site-packages/pymysql/connections.py", line 954, in _request_authentication
+    auth_packet = self._read_packet()
+  File "/opt/analyze/environment/python3.8/lib/python3.8/site-packages/pymysql/connections.py", line 772, in _read_packet
+    packet.raise_for_error()
+  File "/opt/analyze/environment/python3.8/lib/python3.8/site-packages/pymysql/protocol.py", line 221, in raise_for_error
+    err.raise_mysql_exception(self._data)
+  File "/opt/analyze/environment/python3.8/lib/python3.8/site-packages/pymysql/err.py", line 143, in raise_mysql_exception
+    raise errorclass(errno, errval)
+pymysql.err.OperationalError: (1049, "Unknown database 'analyze'")
+WSGI app 0 (mountpoint='') ready in 1 seconds on interpreter 0x2277240 pid: 59162 (default app)
+uWSGI running as root, you can use --uid/--gid/--chroot options
+*** WARNING: you are running uWSGI as root !!! (use the --uid flag) ***
+*** uWSGI is running in multiple interpreter mode ***
+spawned uWSGI worker 1 (and the only) (pid: 59162, cores: 2)
diff --git a/wsgi.py b/wsgi.py
new file mode 100644
index 0000000..ece382c
--- /dev/null
+++ b/wsgi.py
@@ -0,0 +1,36 @@
+"""
+WSGI config for Zhijian_Project_WebService project.
+
+It exposes the WSGI callable as a module-level variable named ``application``.
+
+For more information on this file, see
+https://docs.djangoproject.com/en/1.8/howto/deployment/wsgi/
+"""
+
+import os
+import configparser
+import threading
+from text_analysis.views import predict
+from text_analysis.views import data_structure
+
+# Load the configuration file
+configFile = './config.ini'
+# Create the config parser
+con = configparser.ConfigParser()
+# Read the file
+con.read(configFile, encoding='utf-8')
+dbConfig = dict(con.items('database'))  # database connection settings
+
+# Start the data-ingest thread
+t = threading.Thread(target=data_structure, name='dataStructureThread', args=(dbConfig,))
+t.daemon = True
+t.start()
+
+from django.core.wsgi import get_wsgi_application
+
+os.environ.setdefault("DJANGO_SETTINGS_MODULE", "text_analysis.settings")
+application = get_wsgi_application()
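# End-to-end startup, as a minimal sketch based on uwsgi.ini above:
#   uwsgi --ini uwsgi.ini
# binds 0.0.0.0:9020; importing this wsgi.py starts the daemon
# data_structure thread, POSTs to /robotIdentification enqueue tasks,
# userInfoPage/storyDetailPage records are written to MySQL, and the
# isLast record triggers predict(), whose result is pushed to the Kafka
# topic from config.ini.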