m2m模型翻译
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

175 lines
5.7 KiB

#coding:utf8
from flask import Flask, request, jsonify
from flask_restful import Api, Resource
from queue_manager import add_task,add_task_left
from config_loader import load_config
import threading
from translate_handler import translate_process
# 加载配置和日志
import logging
from logging.handlers import TimedRotatingFileHandler
from queue_manager import task_queue # 假设task_queue是一个queue.Queue对象
from load_scenes import load_scenes_version
from zk_util import monitor
import global_dict
import os
import sys
import signal
import json
# Configure the root logger at INFO level.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# Load the service configuration.
config = load_config()
# TimedRotatingFileHandler opens its log file as soon as it is constructed and
# raises FileNotFoundError when the directory is missing, so create it first.
os.makedirs('./logs', exist_ok=True)
# Handler that rotates the log file by date.
handler = TimedRotatingFileHandler(
    './logs/MTtranslate.log',
    when='midnight',  # roll over every day at midnight
    interval=1,
    backupCount=7,  # keep seven rotated log files
)
handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
# Attach the handler to the root logger.
logger.addHandler(handler)
# Flask application and the flask_restful API wrapper around it.
app = Flask(__name__)
api = Api(app)
class TranslateTask(Resource):
    """REST resource that accepts translation tasks and puts them on the queue."""

    def post(self):
        """Queue the task carried in the JSON request body.

        A body whose ``trace`` field is truthy is logged as a debug-mode task
        and passed to ``add_task_left``; every other task goes to ``add_task``.

        Returns:
            A ``(body, status)`` tuple: 200 once the task has been queued, or
            400 when the request body is not a JSON object.
        """
        # silent=True returns None for a missing or malformed JSON body instead
        # of raising, so every unusable request gets the same 400 below.
        data = request.get_json(silent=True)
        # A list, string, number or null would either crash the 'trace' lookup
        # or put a task on the queue that is not a mapping.
        if not isinstance(data, dict):
            return {"status": "Request body must be a JSON object"}, 400
        if data.get('trace'):
            logger.info("调试模式任务接收-----")
            add_task_left(data)
        else:
            add_task(data)
        return {"status": "Task added to queue"}, 200
# Register the TranslateTask resource at the /translate route.
api.add_resource(TranslateTask, '/translate')
@app.route('/health', methods=['GET'])
def health():
    """Health-check endpoint (intended for Spring Boot Admin); always reports UP."""
    status_payload = {"status": "UP"}
    return jsonify(status_payload)
def writeTxt(filePath, result):
    """Append ``result`` plus a trailing newline to the file at ``filePath``.

    :param filePath: path of the file to append to (opened as UTF-8 text)
    :param result: text to write, without the trailing newline
    :return: None
    """
    # The context manager closes the file even when the write fails.
    with open(filePath, mode='a', encoding='utf-8') as out_file:
        line = result + '\n'
        out_file.write(line)
def write_queue_to_file(filename='queue_contents.jsonl'):
    """Drain ``task_queue`` into ``filename``, one JSON document per line.

    Each task is taken off the queue, serialized with ``json.dumps`` and
    appended through ``writeTxt``. A task that cannot be serialized or written
    is logged and skipped so that the remaining tasks are still saved; the
    final log line reports whether every task made it to the file.

    Errors are logged rather than raised.

    :param filename: path of the JSON-lines file to append to
    :return: None
    """
    saved = 0
    failed = 0
    try:
        while not task_queue.empty():
            try:
                item = task_queue.get_nowait()
            except Exception as e:
                # Another consumer may have taken the last task between
                # empty() and get_nowait(); stop instead of spinning.
                logger.error(f"Error reading item from queue: {e}")
                break
            try:
                # Serialize the task as a single JSON line.
                json_item = json.dumps(item)
                writeTxt(filename, json_item)
                logger.info(f"Wrote to file: {json_item}")
                saved += 1
            except Exception as e:
                # Keep going: the task is already off the queue, and aborting
                # here would leave every remaining task unsaved as well.
                failed += 1
                logger.error(f"Error writing queue item to file: {e}")
            finally:
                # Balance the get above whether or not the task was saved.
                task_queue.task_done()
        if failed:
            logger.error(f"Wrote {saved} queue item(s) to {filename}; {failed} item(s) could not be saved.")
        else:
            logger.info(f"Successfully wrote queue contents to {filename}.")
    except Exception as e:
        logger.error(f"Failed to write queue to file: {e}")
def read_queue_from_file(filename='queue_contents.jsonl'):
    """Reload tasks saved in ``filename`` by passing each one to ``add_task``.

    The file is read as UTF-8 text with one JSON document per line; blank
    lines are ignored. A line that fails to parse or to be queued is logged
    and skipped. When the file exists, it is deleted after the load attempt,
    whether or not reading succeeded.

    :param filename: path of the JSON-lines file to load
    :return: None
    """
    queue_file_missing = not os.path.exists(filename)
    if queue_file_missing:
        logger.info(f"No existing queue file found at {filename}. Starting with an empty queue.")
        return
    try:
        with open(filename, mode='r', encoding='utf-8') as queue_file:
            for raw_line in queue_file:
                payload = raw_line.strip()
                if not payload:
                    continue
                try:
                    # Parse the line and put the task back on the queue.
                    task = json.loads(payload)
                    add_task(task)
                    logger.info(f"Loaded task from file: {task}")
                except json.JSONDecodeError as decode_err:
                    logger.error(f"Invalid JSON format in queue file: {payload} | Error: {decode_err}")
                except Exception as load_err:
                    logger.error(f"Error loading queue item from file: {load_err}")
        logger.info(f"Successfully loaded queue contents from {filename}.")
    except Exception as read_err:
        logger.error(f"Failed to read queue from file: {read_err}")
    finally:
        # Remove the file regardless of the outcome of the load.
        delete_file(filename)
def delete_file(file_path):
    """Delete the file at ``file_path``, logging the outcome.

    A missing path is logged as a warning; a failure during removal is logged
    as an error instead of being raised.

    :param file_path: path of the file to delete
    :return: None
    """
    path_missing = not os.path.exists(file_path)
    if path_missing:
        logger.warning(f"文件不存在: {file_path}")
        return
    try:
        os.remove(file_path)
        logger.info(f"文件已删除: {file_path}")
    except Exception as remove_err:
        logger.error(f"删除文件时发生错误: {remove_err}")
def handle_sigterm(signum, frame):
    """SIGTERM handler: save the queued tasks to disk, then exit with status 0.

    :param signum: signal number passed in by the ``signal`` module (unused)
    :param frame: current stack frame passed in by the ``signal`` module (unused)
    """
    queue_dump_path = 'queue_contents.jsonl'
    logger.info("Received SIGTERM signal. Saving queue to file and shutting down gracefully...")
    write_queue_to_file(queue_dump_path)
    logger.info("Queue saved. Exiting now.")
    exit_status = 0
    sys.exit(exit_status)
def main():
    """Start the translation service and run it until the Flask app stops.

    Steps, in order: install the SIGTERM handler, load scene versions using
    the redis settings, start the zookeeper monitor, reload tasks saved in the
    queue file, start the ``translate_process`` daemon thread, then serve the
    Flask app on 0.0.0.0:8023. On the way out the queue is written back to
    the queue file.
    """
    queue_file = 'queue_contents.jsonl'

    # Save the queue when the process is asked to terminate.
    signal.signal(signal.SIGTERM, handle_sigterm)

    # Load the enabled scene information using the redis connection settings.
    redis_cfg = config['redis']
    load_scenes_version(redis_cfg['host'], redis_cfg['port'], redis_cfg['db'])

    # Start the zookeeper monitor.
    zk_cfg = config['zookeeper']
    monitor(zk_cfg['host'], zk_cfg['node'])

    # Restore tasks persisted by a previous run.
    read_queue_from_file(queue_file)

    # Background worker that runs translate_process.
    worker = threading.Thread(target=translate_process, daemon=True)
    worker.start()
    logger.info("Translate processing thread started.")

    # Serve HTTP requests; this call blocks until the app stops.
    try:
        app.run(host='0.0.0.0', port=8023)
    except Exception as run_err:
        logger.error(f"Flask app encountered an error: {run_err}")
    finally:
        # Persist whatever is still queued before the program exits.
        # NOTE(review): is_start presumably tells the worker to stop - confirm in global_dict.
        global_dict.is_start = False
        write_queue_to_file(queue_file)
        logger.info("Flask app has been shut down.")
if __name__ == '__main__':
    # Script entry point: run the service only when executed directly.
    main()