用户水军识别应用
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 

51 lines
2.1 KiB

#coding:utf8
# Module bootstrap: extend sys.path, import the helpers used by the worker
# loop, and create the logger object shared by this module.
import os, sys
# Directory that contains this file; os.getcwd() is the fallback if the
# computed dirname is empty.
cur_dir = os.path.dirname(os.path.abspath(__file__)) or os.getcwd()
# Parent directory of cur_dir.
par_dir = os.path.abspath(os.path.join(cur_dir, os.path.pardir))
# Both directories are appended to sys.path BEFORE the project imports below.
# Presumably this is what lets text_analysis, tools, log_util and views
# resolve -- confirm against the repository layout before reordering imports.
sys.path.append(cur_dir)
sys.path.append(par_dir)
import json
from text_analysis.tools import to_kafka
from tools.mysql_helper import mysqlConn,mysqlInsert,mysqlQuery,mysqlExecute,mysqlUpdate,mysqlDelete,getTableColumnNames
import traceback
import time
from log_util.set_logger import set_logger
# NOTE: `logging` here is the object returned by set_logger('results.log'),
# not the standard-library `logging` module.
logging=set_logger('results.log')
# task_queue is the queue that process_data() polls and drains.
from views import task_queue
def _handle_task(raw_data):
    """Run one queued MySQL task and publish its outcome to Kafka.

    The task type is read from ``raw_data["metadata"]["admin"]["type"]`` and
    routed to the matching ``mysql*`` helper. The helper's return value is
    stored under ``raw_data["result"]`` and the whole message is sent with
    ``to_kafka.send_kafka``.

    If anything in that sequence raises, the traceback is logged and an error
    result (``successCode`` "0" plus the traceback text) is sent instead.
    An exception raised while building or sending that error result
    propagates to the caller.
    """
    try:
        logging.info("启动数据处理线程——")
        logging.info(raw_data)
        flag = raw_data["metadata"]["admin"]["type"]
        # Supported task types: execute, query, insert, update, delete.
        handlers = {
            'insert': mysqlInsert,
            'execute': mysqlExecute,
            'update': mysqlUpdate,
            'query': mysqlQuery,
            'delete': mysqlDelete,
        }
        # The isinstance guard keeps an unhashable type value from raising
        # inside the dict lookup; it is simply treated as unknown.
        handler = handlers.get(flag) if isinstance(flag, str) else None
        if handler is None:
            # An unknown type still yields an empty-string result, but the
            # fact is now recorded in the log instead of passing silently.
            logging.error("unsupported task type: %r" % (flag,))
            res = ""
        else:
            res = handler(raw_data, logging)
        raw_data["result"] = res
        logging.info("************写入kafka***********")
        to_kafka.send_kafka(raw_data)
    except Exception:
        error_log = traceback.format_exc()
        # Record the failure locally as well as reporting it downstream.
        logging.error(error_log)
        raw_data["result"] = {"successCode": "0", "errorLog": error_log, "results": ""}
        to_kafka.send_kafka(raw_data)


def process_data():
    """Poll ``task_queue`` forever and process each task as it arrives.

    When the queue reports pending items, one task is taken and handed to
    ``_handle_task``. When it is empty, the loop logs an idle message and
    sleeps for 10 seconds. Any exception that reaches the loop is logged and
    the loop continues. ``SystemExit`` and ``KeyboardInterrupt`` are not
    intercepted, so the worker can still be shut down.

    This function never returns normally.
    """
    while True:
        try:
            if task_queue.qsize() > 0:
                # Fetch outside the per-task error handler: if get() itself
                # fails there is no current task to attach an error result
                # to, and a previously processed task must not be re-sent.
                raw_data = task_queue.get()
                _handle_task(raw_data)
            else:
                logging.info("暂无任务,进入休眠--")
                time.sleep(10)
        except Exception:
            logging.error(traceback.format_exc())