#coding:utf8 import os, sys cur_dir = os.path.dirname(os.path.abspath(__file__)) or os.getcwd() par_dir = os.path.abspath(os.path.join(cur_dir, os.path.pardir)) sys.path.append(cur_dir) sys.path.append(par_dir) import json from text_analysis.tools import to_kafka from tools.mysql_helper import mysqlConn,mysqlInsert,mysqlQuery,mysqlExecute,mysqlUpdate,mysqlDelete,getTableColumnNames import traceback import time from log_util.set_logger import set_logger logging=set_logger('results.log') from views import task_queue def process_data(): while True: try: # print("task_queue:",task_queue) if task_queue.qsize() >0: try: raw_data = task_queue.get() res = "" logging.info("启动数据处理线程——") logging.info(raw_data) flag = raw_data["metadata"]["admin"]["type"] # type分为execute、query、insert、update、delete if flag == 'insert': res = mysqlInsert(raw_data, logging) elif flag == 'execute': res = mysqlExecute(raw_data, logging) elif flag == 'update': res = mysqlUpdate(raw_data, logging) elif flag == 'query': res = mysqlQuery(raw_data, logging) elif flag == 'delete': res = mysqlDelete(raw_data, logging) raw_data["result"] = res logging.info("************写入kafka***********") to_kafka.send_kafka(raw_data) except: raw_data["result"] = {"successCode": "0", "errorLog": "", "results": ""} raw_data["result"]["errorLog"] = traceback.format_exc() to_kafka.send_kafka(raw_data) else: logging.info("暂无任务,进入休眠--") print("222222222222222222222222") time.sleep(10) except: logging.error(traceback.format_exc())