#coding:utf8 from kazoo.client import KazooClient import threading import time from global_dict import global_scenes_manager import json # 初始化日志 import logging logger = logging.getLogger(__name__) def start_zookeeper_listeners(zk, node_path, stop_event, initial_data, initial_children): ''' 在独立的线程中启动 ZooKeeper 监听器,忽略启动之前的节点变化。 :param zk: :param node_path: :param stop_event: :param initial_data: :param initial_children: :return: ''' def data_change_listener(data, stat): if data == initial_data: return # 忽略之前的状态 if data: logger.info(f"[DataWatch] 节点数据变化:{data.decode('utf-8')}") node_data = json.loads(data) if 'version' in node_data: scenes_id = str(node_data['scenes_id']) version = str(node_data['version']) global_scenes_manager[scenes_id] = version else: logger.info("[DataWatch] 节点已删除或不存在") def children_change_listener(children): if children == initial_children: return # 忽略之前的状态 logger.info(f"[ChildrenWatch] 子节点列表变化:{children}") # 设置监听器 zk.DataWatch(node_path, data_change_listener) zk.ChildrenWatch(node_path, children_change_listener) # 等待停止事件 stop_event.wait() def simulate_changes(zk, node_path): """ 模拟节点的数据和子节点的变化。 """ try: time.sleep(2) print("模拟:更新节点数据") zk.set(node_path, b"updated data") time.sleep(2) print("模拟:创建子节点 child1") zk.create(f"{node_path}/child1", b"child1 data") time.sleep(2) print("模拟:删除子节点 child1") zk.delete(f"{node_path}/child1") time.sleep(2) print("模拟:删除节点") zk.delete(node_path) except Exception as e: print(f"模拟过程中发生错误: {e}") def monitor(hosts,node_path): # 连接到 ZooKeeper 服务器 zk = KazooClient(hosts=hosts) zk.start() # 获取启动监听器之前的节点状态 if zk.exists(node_path): initial_data, _ = zk.get(node_path) # 获取节点数据 initial_children = zk.get_children(node_path) # 获取子节点列表 else: # 如果节点不存在,初始化为空 initial_data = None initial_children = [] # 创建一个停止事件 stop_event = threading.Event() # 创建一个线程来运行监听器 listener_thread = threading.Thread(target=start_zookeeper_listeners, args=(zk, node_path, stop_event, initial_data, initial_children)) listener_thread.daemon = True listener_thread.start() # 创建一个线程来模拟节点变化 # simulator_thread = threading.Thread(target=simulate_changes, args=(zk, node_path)) # simulator_thread.start() # # try: # # 主线程可以执行其他任务,或者等待子线程完成 # simulator_thread.join() # except KeyboardInterrupt: # print("程序被用户中断") # finally: # # 触发停止事件 # stop_event.set() # listener_thread.join() # zk.stop() # zk.close() # print("ZooKeeper 客户端已关闭") if __name__ == "__main__": from config_loader import load_config config = load_config() monitor(config['zookeeper']['host'], config['zookeeper']['node'])