图片解析应用
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

118 lines
3.7 KiB

#coding:utf8
from kazoo.client import KazooClient
import threading
import time
from global_dict import global_scenes_manager
import json
# 初始化日志
import logging
logger = logging.getLogger(__name__)
import os # 确保导入了 os 模块
# 确保没有覆盖内置的 open 函数
import builtins
assert open is builtins.open, "The built-in 'open' function has been overridden!"
def start_zookeeper_listeners(zk, node_path, stop_event, initial_data, initial_children):
'''
在独立的线程中启动 ZooKeeper 监听器,忽略启动之前的节点变化。
:param zk:
:param node_path:
:param stop_event:
:param initial_data:
:param initial_children:
:return:
'''
def data_change_listener(data, stat):
if data == initial_data:
return # 忽略之前的状态
if data:
logger.info(f"[DataWatch] 节点数据变化:{data.decode('utf-8')}")
node_data = json.loads(data)
if 'version' in node_data:
scenes_id = str(node_data['scenes_id'])
version = str(node_data['version'])
global_scenes_manager[scenes_id] = version
else:
logger.info("[DataWatch] 节点已删除或不存在")
def children_change_listener(children):
if children == initial_children:
return # 忽略之前的状态
logger.info(f"[ChildrenWatch] 子节点列表变化:{children}")
# 设置监听器
zk.DataWatch(node_path, data_change_listener)
zk.ChildrenWatch(node_path, children_change_listener)
# 等待停止事件
stop_event.wait()
def simulate_changes(zk, node_path):
"""
模拟节点的数据和子节点的变化。
"""
try:
time.sleep(2)
print("模拟:更新节点数据")
zk.set(node_path, b"updated data")
time.sleep(2)
print("模拟:创建子节点 child1")
zk.create(f"{node_path}/child1", b"child1 data")
time.sleep(2)
print("模拟:删除子节点 child1")
zk.delete(f"{node_path}/child1")
time.sleep(2)
print("模拟:删除节点")
zk.delete(node_path)
except Exception as e:
print(f"模拟过程中发生错误: {e}")
def monitor(hosts,node_path):
# 连接到 ZooKeeper 服务器
zk = KazooClient(hosts=hosts)
zk.start()
# 获取启动监听器之前的节点状态
if zk.exists(node_path):
initial_data, _ = zk.get(node_path) # 获取节点数据
initial_children = zk.get_children(node_path) # 获取子节点列表
else:
# 如果节点不存在,初始化为空
initial_data = None
initial_children = []
# 创建一个停止事件
stop_event = threading.Event()
# 创建一个线程来运行监听器
listener_thread = threading.Thread(target=start_zookeeper_listeners, args=(zk, node_path, stop_event, initial_data, initial_children))
listener_thread.daemon = True
listener_thread.start()
# 创建一个线程来模拟节点变化
# simulator_thread = threading.Thread(target=simulate_changes, args=(zk, node_path))
# simulator_thread.start()
#
# try:
# # 主线程可以执行其他任务,或者等待子线程完成
# simulator_thread.join()
# except KeyboardInterrupt:
# print("程序被用户中断")
# finally:
# # 触发停止事件
# stop_event.set()
# listener_thread.join()
# zk.stop()
# zk.close()
# print("ZooKeeper 客户端已关闭")
if __name__ == "__main__":
from config_loader import load_config
config = load_config()
monitor(config['zookeeper']['host'], config['zookeeper']['node'])