图片解析应用
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

178 lines
5.6 KiB

  1. # coding:utf8
  2. import signal
  3. import sys
  4. import threading
  5. import time
  6. import os
  7. import json
  8. from flask import Flask, request, jsonify
  9. from flask_restful import Api, Resource
  10. from queue_manager import add_task,add_task_left
  11. from queue_manager import task_queue # 假设task_queue是一个queue.Queue对象
  12. from config_loader import load_config
  13. from ocr_handler import ocr_process
  14. # 加载配置和日志
  15. import logging
  16. from logging.handlers import TimedRotatingFileHandler
  17. from load_scenes import load_scenes_version
  18. from zk_util import monitor
  19. import global_dict
  20. # 创建日志记录器
  21. logger = logging.getLogger()
  22. logger.setLevel(logging.INFO)
  23. # 加载配置
  24. config = load_config()
  25. # 创建一个处理器,用于按日期切分日志文件
  26. handler = TimedRotatingFileHandler(
  27. './logs/cnocr.log',
  28. when='midnight', # 每天午夜切分
  29. interval=1,
  30. backupCount=7 # 保留7个备份日志文件
  31. )
  32. handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
  33. # 添加处理器到日志记录器
  34. logger.addHandler(handler)
  35. app = Flask(__name__)
  36. api = Api(app)
  37. class OCRTask(Resource):
  38. def post(self):
  39. data = request.json
  40. # 将任务添加到队列
  41. # logger.info(f"Received OCR task: {data}")
  42. if 'trace' in data and data['trace']:
  43. logger.info(f"调试模式任务接收-----")
  44. add_task_left(data)
  45. else:
  46. add_task(data)
  47. return {"status": "Task added to queue"}, 200
# Register the OCR endpoint: requests to /ocr are served by OCRTask.
api.add_resource(OCRTask, '/ocr')
  50. @app.route('/health', methods=['GET'])
  51. def health():
  52. """健康检查接口,供 Spring Boot Admin 使用"""
  53. return jsonify({"status": "UP"})
  54. def writeTxt(filePath, result):
  55. '''
  56. :param filePath:
  57. :param result:
  58. :return:
  59. '''
  60. with open(filePath, 'a', encoding='utf-8') as f: # 使用上下文管理器自动关闭文件
  61. f.write(result + '\n') # 直接写入字符串和换行符
  62. def write_queue_to_file(filename='queue_contents.jsonl'):
  63. """
  64. JSON格式存储在单独的行中
  65. """
  66. try:
  67. while not task_queue.empty():
  68. try:
  69. item = task_queue.get_nowait()
  70. # 将任务转换为JSON字符串
  71. json_item = json.dumps(item)
  72. writeTxt(filename,json_item)
  73. logger.info(f"Wrote to file: {json_item}")
  74. task_queue.task_done()
  75. except Exception as e:
  76. logger.error(f"Error writing queue item to file: {e}")
  77. break
  78. logger.info(f"Successfully wrote queue contents to {filename}.")
  79. except Exception as e:
  80. logger.error(f"Failed to write queue to file: {e}")
  81. def read_queue_from_file(filename='queue_contents.jsonl'):
  82. """
  83. JSON对象
  84. """
  85. if not os.path.exists(filename):
  86. logger.info(f"No existing queue file found at {filename}. Starting with an empty queue.")
  87. return
  88. try:
  89. with open(filename, 'r', encoding='utf-8') as f:
  90. for line in f:
  91. line = line.strip()
  92. if line:
  93. try:
  94. # 解析JSON字符串
  95. item = json.loads(line)
  96. add_task(item)
  97. logger.info(f"Loaded task from file: {item}")
  98. except json.JSONDecodeError as je:
  99. logger.error(f"Invalid JSON format in queue file: {line} | Error: {je}")
  100. except Exception as e:
  101. logger.error(f"Error loading queue item from file: {e}")
  102. logger.info(f"Successfully loaded queue contents from {filename}.")
  103. except Exception as e:
  104. logger.error(f"Failed to read queue from file: {e}")
  105. finally:
  106. delete_file(filename)
  107. def delete_file(file_path):
  108. '''
  109. :param file_path:
  110. :return:
  111. '''
  112. if os.path.exists(file_path):
  113. try:
  114. os.remove(file_path)
  115. logger.info(f"文件已删除: {file_path}")
  116. except Exception as e:
  117. logger.error(f"删除文件时发生错误: {e}")
  118. else:
  119. logger.warning(f"文件不存在: {file_path}")
  120. def handle_sigterm(signum, frame):
  121. """
  122. SIGTERM信号
  123. """
  124. logger.info("Received SIGTERM signal. Saving queue to file and shutting down gracefully...")
  125. write_queue_to_file('queue_contents.jsonl')
  126. logger.info("Queue saved. Exiting now.")
  127. sys.exit(0)
  128. def main():
  129. # 设置SIGTERM信号的处理器
  130. signal.signal(signal.SIGTERM, handle_sigterm)
  131. # 从redis加载启动的场景信息
  132. load_scenes_version(config['redis']['host'], config['redis']['port'], config['redis']['db'])
  133. # 启动zookeeper监听
  134. monitor(config['zookeeper']['host'], config['zookeeper']['node'])
  135. # 从文件加载队列内容
  136. read_queue_from_file('queue_contents.jsonl')
  137. # 启动 OCR 处理线程
  138. ocr_thread = threading.Thread(target=ocr_process, daemon=True)
  139. ocr_thread.start()
  140. logger.info("OCR processing thread started.")
  141. # 启动 Flask 应用
  142. try:
  143. app.run(host='0.0.0.0', port=8021)
  144. except Exception as e:
  145. logger.error(f"Flask app encountered an error: {e}")
  146. finally:
  147. # 程序退出前,保存队列内容
  148. global_dict.is_start = False
  149. write_queue_to_file('queue_contents.jsonl')
  150. logger.info("Flask app has been shut down.")
# Start the service only when this file is executed as a script.
if __name__ == '__main__':
    main()