图片解析应用
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

118 lines
3.7 KiB

  1. #coding:utf8
  2. from kazoo.client import KazooClient
  3. import threading
  4. import time
  5. from global_dict import global_scenes_manager
  6. import json
  7. # 初始化日志
  8. import logging
  9. logger = logging.getLogger(__name__)
  10. import os # 确保导入了 os 模块
  11. # 确保没有覆盖内置的 open 函数
  12. import builtins
  13. assert open is builtins.open, "The built-in 'open' function has been overridden!"
  14. def start_zookeeper_listeners(zk, node_path, stop_event, initial_data, initial_children):
  15. '''
  16. 线 ZooKeeper
  17. :param zk:
  18. :param node_path:
  19. :param stop_event:
  20. :param initial_data:
  21. :param initial_children:
  22. :return:
  23. '''
  24. def data_change_listener(data, stat):
  25. if data == initial_data:
  26. return # 忽略之前的状态
  27. if data:
  28. logger.info(f"[DataWatch] 节点数据变化:{data.decode('utf-8')}")
  29. node_data = json.loads(data)
  30. if 'version' in node_data:
  31. scenes_id = str(node_data['scenes_id'])
  32. version = str(node_data['version'])
  33. global_scenes_manager[scenes_id] = version
  34. else:
  35. logger.info("[DataWatch] 节点已删除或不存在")
  36. def children_change_listener(children):
  37. if children == initial_children:
  38. return # 忽略之前的状态
  39. logger.info(f"[ChildrenWatch] 子节点列表变化:{children}")
  40. # 设置监听器
  41. zk.DataWatch(node_path, data_change_listener)
  42. zk.ChildrenWatch(node_path, children_change_listener)
  43. # 等待停止事件
  44. stop_event.wait()
  45. def simulate_changes(zk, node_path):
  46. """
  47. """
  48. try:
  49. time.sleep(2)
  50. print("模拟:更新节点数据")
  51. zk.set(node_path, b"updated data")
  52. time.sleep(2)
  53. print("模拟:创建子节点 child1")
  54. zk.create(f"{node_path}/child1", b"child1 data")
  55. time.sleep(2)
  56. print("模拟:删除子节点 child1")
  57. zk.delete(f"{node_path}/child1")
  58. time.sleep(2)
  59. print("模拟:删除节点")
  60. zk.delete(node_path)
  61. except Exception as e:
  62. print(f"模拟过程中发生错误: {e}")
  63. def monitor(hosts,node_path):
  64. # 连接到 ZooKeeper 服务器
  65. zk = KazooClient(hosts=hosts)
  66. zk.start()
  67. # 获取启动监听器之前的节点状态
  68. if zk.exists(node_path):
  69. initial_data, _ = zk.get(node_path) # 获取节点数据
  70. initial_children = zk.get_children(node_path) # 获取子节点列表
  71. else:
  72. # 如果节点不存在,初始化为空
  73. initial_data = None
  74. initial_children = []
  75. # 创建一个停止事件
  76. stop_event = threading.Event()
  77. # 创建一个线程来运行监听器
  78. listener_thread = threading.Thread(target=start_zookeeper_listeners, args=(zk, node_path, stop_event, initial_data, initial_children))
  79. listener_thread.daemon = True
  80. listener_thread.start()
  81. # 创建一个线程来模拟节点变化
  82. # simulator_thread = threading.Thread(target=simulate_changes, args=(zk, node_path))
  83. # simulator_thread.start()
  84. #
  85. # try:
  86. # # 主线程可以执行其他任务,或者等待子线程完成
  87. # simulator_thread.join()
  88. # except KeyboardInterrupt:
  89. # print("程序被用户中断")
  90. # finally:
  91. # # 触发停止事件
  92. # stop_event.set()
  93. # listener_thread.join()
  94. # zk.stop()
  95. # zk.close()
  96. # print("ZooKeeper 客户端已关闭")
  97. if __name__ == "__main__":
  98. from config_loader import load_config
  99. config = load_config()
  100. monitor(config['zookeeper']['host'], config['zookeeper']['node'])