You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
178 lines
5.6 KiB
178 lines
5.6 KiB
# coding:utf8
|
|
import signal
|
|
import sys
|
|
import threading
|
|
import time
|
|
import os
|
|
import json
|
|
from flask import Flask, request, jsonify
|
|
from flask_restful import Api, Resource
|
|
from queue_manager import add_task,add_task_left
|
|
from queue_manager import task_queue # 假设task_queue是一个queue.Queue对象
|
|
from config_loader import load_config
|
|
from ocr_handler import ocr_process
|
|
# 加载配置和日志
|
|
import logging
|
|
from logging.handlers import TimedRotatingFileHandler
|
|
from load_scenes import load_scenes_version
|
|
from zk_util import monitor
|
|
import global_dict
|
|
|
|
# Configure the root logger so records from every module reach the handler
# installed below.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Load the application configuration.
config = load_config()

# TimedRotatingFileHandler opens its file as soon as it is constructed and
# raises FileNotFoundError when the target directory is missing, so create
# the directory up front.
_LOG_FILE = './logs/cnocr.log'
os.makedirs(os.path.dirname(_LOG_FILE), exist_ok=True)

# Rotate the log file daily at midnight and keep seven rotated backups.
# The encoding is pinned to UTF-8 because log messages contain non-ASCII
# text and the platform default encoding may not be able to represent it.
handler = TimedRotatingFileHandler(
    _LOG_FILE,
    when='midnight',
    interval=1,
    backupCount=7,
    encoding='utf-8',
)
handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))

# Attach the rotating file handler to the root logger.
logger.addHandler(handler)

# Flask application and the flask_restful API wrapper used to register resources.
app = Flask(__name__)
api = Api(app)
|
|
|
|
class OCRTask(Resource):
    """REST resource that accepts OCR tasks and places them on the task queue."""

    def post(self):
        """Queue the JSON task carried in the request body.

        A JSON object whose ``trace`` field is truthy is logged as a
        debug-mode task and handed to ``add_task_left``; every other payload
        is handed to ``add_task``.

        Returns:
            A ``(body, status)`` tuple: status 200 once the task has been
            queued, or status 400 when the body is missing or is not valid
            JSON.
        """
        # silent=True returns None instead of raising, so a missing or
        # malformed body is rejected here rather than failing on the
        # ``trace`` lookup below.
        data = request.get_json(silent=True)
        if data is None:
            return {"status": "Request body must be valid JSON"}, 400

        # Only a JSON object can carry the ``trace`` flag; looking it up on
        # any other JSON value (list, string, number) could raise.
        is_trace = isinstance(data, dict) and data.get('trace')
        if is_trace:
            logger.info("调试模式任务接收-----")
            add_task_left(data)
        else:
            add_task(data)
        return {"status": "Task added to queue"}, 200


# Register the OCR endpoint.
api.add_resource(OCRTask, '/ocr')
|
|
|
|
@app.route('/health', methods=['GET'])
def health():
    """Health-check endpoint intended for Spring Boot Admin; always reports UP."""
    payload = {"status": "UP"}
    return jsonify(payload)
|
|
|
|
|
|
def writeTxt(filePath, result):
    """
    Append one line of text to a file.

    :param filePath: path of the file to append to; opened in append mode as UTF-8
    :param result: string to write; a newline character is added after it
    :return: None
    """
    # The context manager closes the file even if the write fails.
    with open(filePath, mode='a', encoding='utf-8') as out:
        out.write(''.join((result, '\n')))
|
|
|
|
def write_queue_to_file(filename='queue_contents.jsonl'):
    """
    Drain ``task_queue`` and append every task to ``filename`` as JSON lines.

    Each task is serialized with ``json.dumps`` and written on its own line.
    Draining stops at the first task that cannot be serialized or written:
    that task has already been removed from the queue, and the tasks still
    queued behind it are left in place. The success message is logged only
    when no task failed.

    :param filename: path of the JSON-lines file to append to
    :return: None
    """
    # Local import: only this function needs the exception type.
    from queue import Empty

    failed = False
    try:
        while not task_queue.empty():
            try:
                item = task_queue.get_nowait()
            except Empty:
                # Another consumer emptied the queue between the empty()
                # check and the get; there is nothing left to persist.
                break

            try:
                # Serialize the task to a JSON string.
                json_item = json.dumps(item)
                writeTxt(filename, json_item)
                logger.info(f"Wrote to file: {json_item}")
            except Exception as e:
                failed = True
                logger.error(f"Error writing queue item to file: {e}")
                break
            finally:
                # The item left the queue whether or not it was written, so
                # mark it done to keep the unfinished-task count balanced.
                task_queue.task_done()

        if not failed:
            logger.info(f"Successfully wrote queue contents to {filename}.")
    except Exception as e:
        logger.error(f"Failed to write queue to file: {e}")
|
|
|
|
def read_queue_from_file(filename='queue_contents.jsonl'):
    """
    Reload tasks persisted in ``filename`` and remove the file afterwards.

    Every non-blank line is parsed as JSON and passed to ``add_task``. Lines
    that fail to parse or to enqueue are logged and skipped. When the file
    does not exist the function only logs that fact and returns. Otherwise
    the file is deleted once reading ends, whether or not reading succeeded.

    :param filename: path of the JSON-lines file to load
    :return: None
    """
    if not os.path.exists(filename):
        logger.info(f"No existing queue file found at {filename}. Starting with an empty queue.")
        return

    try:
        with open(filename, 'r', encoding='utf-8') as f:
            for line in map(str.strip, f):
                if not line:
                    # Blank lines carry no task.
                    continue
                try:
                    # Parse the JSON string back into a task.
                    item = json.loads(line)
                    add_task(item)
                    logger.info(f"Loaded task from file: {item}")
                except json.JSONDecodeError as je:
                    logger.error(f"Invalid JSON format in queue file: {line} | Error: {je}")
                except Exception as e:
                    logger.error(f"Error loading queue item from file: {e}")
        logger.info(f"Successfully loaded queue contents from {filename}.")
    except Exception as e:
        logger.error(f"Failed to read queue from file: {e}")
    finally:
        # The file is removed on both the success and the failure path.
        delete_file(filename)
|
|
def delete_file(file_path):
    """
    Delete the given file, logging the outcome.

    A missing file is reported with a warning; a failure while removing an
    existing file is logged as an error and not re-raised.

    :param file_path: path of the file to delete
    :return: None
    """
    if not os.path.exists(file_path):
        logger.warning(f"文件不存在: {file_path}")
        return

    try:
        os.remove(file_path)
        logger.info(f"文件已删除: {file_path}")
    except Exception as e:
        logger.error(f"删除文件时发生错误: {e}")
|
|
|
|
def handle_sigterm(signum, frame):
    """
    SIGTERM handler: write the queued tasks to disk, then exit with status 0.

    :param signum: signal number supplied by the signal machinery (unused)
    :param frame: current stack frame supplied by the signal machinery (unused)
    """
    logger.info("Received SIGTERM signal. Saving queue to file and shutting down gracefully...")
    write_queue_to_file(filename='queue_contents.jsonl')
    logger.info("Queue saved. Exiting now.")
    # Equivalent to sys.exit(0): unwinds the interpreter via SystemExit.
    raise SystemExit(0)
|
|
|
|
def main():
    """
    Start the service.

    Installs the SIGTERM handler, loads scene versions using the Redis
    settings, starts the ZooKeeper monitor, reloads any persisted queue
    contents, starts the OCR worker thread, and finally runs the Flask app
    on port 8021. When the app stops, the queue is written back to disk.
    """
    # Install the SIGTERM handler.
    signal.signal(signal.SIGTERM, handle_sigterm)

    # Load the startup scene information using the Redis settings.
    redis_cfg = config['redis']
    load_scenes_version(redis_cfg['host'], redis_cfg['port'], redis_cfg['db'])

    # Start the ZooKeeper monitor.
    zk_cfg = config['zookeeper']
    monitor(zk_cfg['host'], zk_cfg['node'])

    # Reload queue contents persisted by a previous run.
    read_queue_from_file('queue_contents.jsonl')

    # Start the OCR processing thread as a daemon thread.
    worker = threading.Thread(target=ocr_process, daemon=True)
    worker.start()
    logger.info("OCR processing thread started.")

    # Run the Flask application; this call blocks until the server stops.
    try:
        app.run(host='0.0.0.0', port=8021)
    except Exception as e:
        logger.error(f"Flask app encountered an error: {e}")
    finally:
        # Before exiting, clear the start flag and persist the queue contents.
        global_dict.is_start = False
        write_queue_to_file('queue_contents.jsonl')
        logger.info("Flask app has been shut down.")


if __name__ == '__main__':
    main()
|