图片解析应用
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

443 lines
17 KiB

  1. import gc
  2. import importlib
  3. import sys
  4. import uuid
  5. from unittest.mock import patch, call, Mock
  6. import pytest
  7. from objgraph import count as count_refs_by_type
  8. from kazoo.testing import KazooTestHarness
  9. from kazoo.exceptions import KazooException
  10. from kazoo.recipe.cache import TreeCache, TreeNode, TreeEvent
  11. class KazooAdaptiveHandlerTestCase(KazooTestHarness):
  12. HANDLERS = (
  13. ("kazoo.handlers.gevent", "SequentialGeventHandler"),
  14. ("kazoo.handlers.eventlet", "SequentialEventletHandler"),
  15. ("kazoo.handlers.threading", "SequentialThreadingHandler"),
  16. )
  17. def setUp(self):
  18. self.handler = self.choose_an_installed_handler()
  19. self.setup_zookeeper(handler=self.handler)
  20. def tearDown(self):
  21. self.handler = None
  22. self.teardown_zookeeper()
  23. def choose_an_installed_handler(self):
  24. for handler_module, handler_class in self.HANDLERS:
  25. if (
  26. handler_module == "kazoo.handlers.gevent"
  27. and sys.platform == "win32"
  28. ):
  29. continue
  30. try:
  31. mod = importlib.import_module(handler_module)
  32. cls = getattr(mod, handler_class)
  33. except ImportError:
  34. continue
  35. else:
  36. return cls()
  37. raise ImportError("No available handler")
  38. class KazooTreeCacheTests(KazooAdaptiveHandlerTestCase):
  39. def setUp(self):
  40. super(KazooTreeCacheTests, self).setUp()
  41. self._event_queue = self.client.handler.queue_impl()
  42. self._error_queue = self.client.handler.queue_impl()
  43. self.path = None
  44. self.cache = None
  45. def tearDown(self):
  46. if not self._error_queue.empty():
  47. try:
  48. raise self._error_queue.get()
  49. except FakeException:
  50. pass
  51. if self.cache is not None:
  52. self.cache.close()
  53. self.cache = None
  54. super(KazooTreeCacheTests, self).tearDown()
  55. def make_cache(self):
  56. if self.cache is None:
  57. self.path = "/" + uuid.uuid4().hex
  58. self.cache = TreeCache(self.client, self.path)
  59. self.cache.listen(lambda event: self._event_queue.put(event))
  60. self.cache.listen_fault(lambda error: self._error_queue.put(error))
  61. self.cache.start()
  62. return self.cache
  63. def wait_cache(self, expect=None, since=None, timeout=10):
  64. started = since is None
  65. while True:
  66. event = self._event_queue.get(timeout=timeout)
  67. if started:
  68. if expect is not None:
  69. assert event.event_type == expect
  70. return event
  71. if event.event_type == since:
  72. started = True
  73. if expect is None:
  74. return
  75. def spy_client(self, method_name):
  76. method = getattr(self.client, method_name)
  77. return patch.object(self.client, method_name, wraps=method)
  78. def _wait_gc(self):
  79. # trigger switching on some coroutine handlers
  80. self.client.handler.sleep_func(0.1)
  81. completion_queue = getattr(self.handler, "completion_queue", None)
  82. if completion_queue is not None:
  83. while not self.client.handler.completion_queue.empty():
  84. self.client.handler.sleep_func(0.1)
  85. for gen in range(3):
  86. gc.collect(gen)
  87. def count_tree_node(self):
  88. # inspect GC and count tree nodes for checking memory leak
  89. for retry in range(10):
  90. result = set()
  91. for _ in range(5):
  92. self._wait_gc()
  93. result.add(count_refs_by_type("TreeNode"))
  94. if len(result) == 1:
  95. return list(result)[0]
  96. raise RuntimeError("could not count refs exactly")
  97. def test_start(self):
  98. self.make_cache()
  99. self.wait_cache(since=TreeEvent.INITIALIZED)
  100. stat = self.client.exists(self.path)
  101. assert stat.version == 0
  102. assert self.cache._state == TreeCache.STATE_STARTED
  103. assert self.cache._root._state == TreeNode.STATE_LIVE
  104. def test_start_started(self):
  105. self.make_cache()
  106. with pytest.raises(KazooException):
  107. self.cache.start()
  108. def test_start_closed(self):
  109. self.make_cache()
  110. self.cache.close()
  111. with pytest.raises(KazooException):
  112. self.cache.start()
  113. def test_close(self):
  114. assert self.count_tree_node() == 0
  115. self.make_cache()
  116. self.wait_cache(since=TreeEvent.INITIALIZED)
  117. self.client.create(self.path + "/foo/bar/baz", makepath=True)
  118. for _ in range(3):
  119. self.wait_cache(TreeEvent.NODE_ADDED)
  120. # setup stub watchers which are outside of tree cache
  121. stub_data_watcher = Mock(spec=lambda event: None)
  122. stub_child_watcher = Mock(spec=lambda event: None)
  123. self.client.get(self.path + "/foo", stub_data_watcher)
  124. self.client.get_children(self.path + "/foo", stub_child_watcher)
  125. # watchers inside tree cache should be here
  126. root_path = self.client.chroot + self.path
  127. assert len(self.client._data_watchers[root_path + "/foo"]) == 2
  128. assert len(self.client._data_watchers[root_path + "/foo/bar"]) == 1
  129. assert len(self.client._data_watchers[root_path + "/foo/bar/baz"]) == 1
  130. assert len(self.client._child_watchers[root_path + "/foo"]) == 2
  131. assert len(self.client._child_watchers[root_path + "/foo/bar"]) == 1
  132. assert (
  133. len(self.client._child_watchers[root_path + "/foo/bar/baz"]) == 1
  134. )
  135. self.cache.close()
  136. # nothing should be published since tree closed
  137. assert self._event_queue.empty()
  138. # tree should be empty
  139. assert self.cache._root._children == {}
  140. assert self.cache._root._data is None
  141. assert self.cache._state == TreeCache.STATE_CLOSED
  142. # node state should not be changed
  143. assert self.cache._root._state != TreeNode.STATE_DEAD
  144. # watchers should be reset
  145. assert len(self.client._data_watchers[root_path + "/foo"]) == 1
  146. assert len(self.client._data_watchers[root_path + "/foo/bar"]) == 0
  147. assert len(self.client._data_watchers[root_path + "/foo/bar/baz"]) == 0
  148. assert len(self.client._child_watchers[root_path + "/foo"]) == 1
  149. assert len(self.client._child_watchers[root_path + "/foo/bar"]) == 0
  150. assert (
  151. len(self.client._child_watchers[root_path + "/foo/bar/baz"]) == 0
  152. )
  153. # outside watchers should not be deleted
  154. assert (
  155. list(self.client._data_watchers[root_path + "/foo"])[0]
  156. == stub_data_watcher
  157. )
  158. assert (
  159. list(self.client._child_watchers[root_path + "/foo"])[0]
  160. == stub_child_watcher
  161. )
  162. # should not be any leaked memory (tree node) here
  163. self.cache = None
  164. assert self.count_tree_node() == 0
  165. def test_delete_operation(self):
  166. self.make_cache()
  167. self.wait_cache(since=TreeEvent.INITIALIZED)
  168. assert self.count_tree_node() == 1
  169. self.client.create(self.path + "/foo/bar/baz", makepath=True)
  170. for _ in range(3):
  171. self.wait_cache(TreeEvent.NODE_ADDED)
  172. self.client.delete(self.path + "/foo", recursive=True)
  173. for _ in range(3):
  174. self.wait_cache(TreeEvent.NODE_REMOVED)
  175. # tree should be empty
  176. assert self.cache._root._children == {}
  177. # watchers should be reset
  178. root_path = self.client.chroot + self.path
  179. assert self.client._data_watchers[root_path + "/foo"] == set()
  180. assert self.client._data_watchers[root_path + "/foo/bar"] == set()
  181. assert self.client._data_watchers[root_path + "/foo/bar/baz"] == set()
  182. assert self.client._child_watchers[root_path + "/foo"] == set()
  183. assert self.client._child_watchers[root_path + "/foo/bar"] == set()
  184. assert self.client._child_watchers[root_path + "/foo/bar/baz"] == set()
  185. # should not be any leaked memory (tree node) here
  186. assert self.count_tree_node() == 1
  187. def test_children_operation(self):
  188. self.make_cache()
  189. self.wait_cache(since=TreeEvent.INITIALIZED)
  190. self.client.create(self.path + "/test_children", b"test_children_1")
  191. event = self.wait_cache(TreeEvent.NODE_ADDED)
  192. assert event.event_type == TreeEvent.NODE_ADDED
  193. assert event.event_data.path == self.path + "/test_children"
  194. assert event.event_data.data == b"test_children_1"
  195. assert event.event_data.stat.version == 0
  196. self.client.set(self.path + "/test_children", b"test_children_2")
  197. event = self.wait_cache(TreeEvent.NODE_UPDATED)
  198. assert event.event_type == TreeEvent.NODE_UPDATED
  199. assert event.event_data.path == self.path + "/test_children"
  200. assert event.event_data.data == b"test_children_2"
  201. assert event.event_data.stat.version == 1
  202. self.client.delete(self.path + "/test_children")
  203. event = self.wait_cache(TreeEvent.NODE_REMOVED)
  204. assert event.event_type == TreeEvent.NODE_REMOVED
  205. assert event.event_data.path == self.path + "/test_children"
  206. assert event.event_data.data == b"test_children_2"
  207. assert event.event_data.stat.version == 1
  208. def test_subtree_operation(self):
  209. self.make_cache()
  210. self.wait_cache(since=TreeEvent.INITIALIZED)
  211. self.client.create(self.path + "/foo/bar/baz", makepath=True)
  212. for relative_path in ("/foo", "/foo/bar", "/foo/bar/baz"):
  213. event = self.wait_cache(TreeEvent.NODE_ADDED)
  214. assert event.event_type == TreeEvent.NODE_ADDED
  215. assert event.event_data.path == self.path + relative_path
  216. assert event.event_data.data == b""
  217. assert event.event_data.stat.version == 0
  218. self.client.delete(self.path + "/foo", recursive=True)
  219. for relative_path in ("/foo/bar/baz", "/foo/bar", "/foo"):
  220. event = self.wait_cache(TreeEvent.NODE_REMOVED)
  221. assert event.event_type == TreeEvent.NODE_REMOVED
  222. assert event.event_data.path == self.path + relative_path
  223. def test_get_data(self):
  224. cache = self.make_cache()
  225. self.wait_cache(since=TreeEvent.INITIALIZED)
  226. self.client.create(self.path + "/foo/bar/baz", b"@", makepath=True)
  227. self.wait_cache(TreeEvent.NODE_ADDED)
  228. self.wait_cache(TreeEvent.NODE_ADDED)
  229. self.wait_cache(TreeEvent.NODE_ADDED)
  230. with patch.object(cache, "_client"): # disable any remote operation
  231. assert cache.get_data(self.path).data == b""
  232. assert cache.get_data(self.path).stat.version == 0
  233. assert cache.get_data(self.path + "/foo").data == b""
  234. assert cache.get_data(self.path + "/foo").stat.version == 0
  235. assert cache.get_data(self.path + "/foo/bar").data == b""
  236. assert cache.get_data(self.path + "/foo/bar").stat.version == 0
  237. assert cache.get_data(self.path + "/foo/bar/baz").data == b"@"
  238. assert cache.get_data(self.path + "/foo/bar/baz").stat.version == 0
  239. def test_get_children(self):
  240. cache = self.make_cache()
  241. self.wait_cache(since=TreeEvent.INITIALIZED)
  242. self.client.create(self.path + "/foo/bar/baz", b"@", makepath=True)
  243. self.wait_cache(TreeEvent.NODE_ADDED)
  244. self.wait_cache(TreeEvent.NODE_ADDED)
  245. self.wait_cache(TreeEvent.NODE_ADDED)
  246. with patch.object(cache, "_client"): # disable any remote operation
  247. assert (
  248. cache.get_children(self.path + "/foo/bar/baz") == frozenset()
  249. )
  250. assert cache.get_children(self.path + "/foo/bar") == frozenset(
  251. ["baz"]
  252. )
  253. assert cache.get_children(self.path + "/foo") == frozenset(["bar"])
  254. assert cache.get_children(self.path) == frozenset(["foo"])
  255. def test_get_data_out_of_tree(self):
  256. self.make_cache()
  257. self.wait_cache(since=TreeEvent.INITIALIZED)
  258. with pytest.raises(ValueError):
  259. self.cache.get_data("/out_of_tree")
  260. def test_get_children_out_of_tree(self):
  261. self.make_cache()
  262. self.wait_cache(since=TreeEvent.INITIALIZED)
  263. with pytest.raises(ValueError):
  264. self.cache.get_children("/out_of_tree")
  265. def test_get_data_no_node(self):
  266. cache = self.make_cache()
  267. self.wait_cache(since=TreeEvent.INITIALIZED)
  268. with patch.object(cache, "_client"): # disable any remote operation
  269. assert cache.get_data(self.path + "/non_exists") is None
  270. def test_get_children_no_node(self):
  271. cache = self.make_cache()
  272. self.wait_cache(since=TreeEvent.INITIALIZED)
  273. with patch.object(cache, "_client"): # disable any remote operation
  274. assert cache.get_children(self.path + "/non_exists") is None
  275. def test_session_reconnected(self):
  276. self.make_cache()
  277. self.wait_cache(since=TreeEvent.INITIALIZED)
  278. self.client.create(self.path + "/foo")
  279. event = self.wait_cache(TreeEvent.NODE_ADDED)
  280. assert event.event_data.path == self.path + "/foo"
  281. with self.spy_client("get_async") as get_data:
  282. with self.spy_client("get_children_async") as get_children:
  283. # session suspended
  284. self.lose_connection(self.client.handler.event_object)
  285. self.wait_cache(TreeEvent.CONNECTION_SUSPENDED)
  286. # There are a serial refreshing operation here. But NODE_ADDED
  287. # events will not be raised because the zxid of nodes are the
  288. # same during reconnecting.
  289. # connection restore
  290. self.wait_cache(TreeEvent.CONNECTION_RECONNECTED)
  291. # wait for outstanding operations
  292. while self.cache._outstanding_ops > 0:
  293. self.client.handler.sleep_func(0.1)
  294. # inspect in-memory nodes
  295. _node_root = self.cache._root
  296. _node_foo = self.cache._root._children["foo"]
  297. # make sure that all nodes are refreshed
  298. get_data.assert_has_calls(
  299. [
  300. call(self.path, watch=_node_root._process_watch),
  301. call(
  302. self.path + "/foo", watch=_node_foo._process_watch
  303. ),
  304. ],
  305. any_order=True,
  306. )
  307. get_children.assert_has_calls(
  308. [
  309. call(self.path, watch=_node_root._process_watch),
  310. call(
  311. self.path + "/foo", watch=_node_foo._process_watch
  312. ),
  313. ],
  314. any_order=True,
  315. )
  316. def test_root_recreated(self):
  317. self.make_cache()
  318. self.wait_cache(since=TreeEvent.INITIALIZED)
  319. # remove root node
  320. self.client.delete(self.path)
  321. event = self.wait_cache(TreeEvent.NODE_REMOVED)
  322. assert event.event_type == TreeEvent.NODE_REMOVED
  323. assert event.event_data.data == b""
  324. assert event.event_data.path == self.path
  325. assert event.event_data.stat.version == 0
  326. # re-create root node
  327. self.client.ensure_path(self.path)
  328. event = self.wait_cache(TreeEvent.NODE_ADDED)
  329. assert event.event_type == TreeEvent.NODE_ADDED
  330. assert event.event_data.data == b""
  331. assert event.event_data.path == self.path
  332. assert event.event_data.stat.version == 0
  333. assert self.cache._outstanding_ops >= 0, (
  334. "unexpected outstanding ops %r" % self.cache._outstanding_ops
  335. )
  336. def test_exception_handler(self):
  337. error_value = FakeException()
  338. error_handler = Mock()
  339. with patch.object(TreeNode, "on_deleted") as on_deleted:
  340. on_deleted.side_effect = [error_value]
  341. self.make_cache()
  342. self.cache.listen_fault(error_handler)
  343. self.cache.close()
  344. error_handler.assert_called_once_with(error_value)
  345. def test_exception_suppressed(self):
  346. self.make_cache()
  347. self.wait_cache(since=TreeEvent.INITIALIZED)
  348. # stoke up ConnectionClosedError
  349. self.client.stop()
  350. self.client.close()
  351. self.client.handler.start() # keep the async completion
  352. self.wait_cache(since=TreeEvent.CONNECTION_LOST)
  353. with patch.object(TreeNode, "on_created") as on_created:
  354. self.cache._root._call_client("exists", "/")
  355. self.cache._root._call_client("get", "/")
  356. self.cache._root._call_client("get_children", "/")
  357. self.wait_cache(since=TreeEvent.INITIALIZED)
  358. on_created.assert_not_called()
  359. assert self.cache._outstanding_ops == 0
  360. class FakeException(Exception):
  361. pass