图片解析应用
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

544 lines
14 KiB

  1. import time
  2. import threading
  3. import uuid
  4. import pytest
  5. from kazoo.exceptions import KazooException
  6. from kazoo.protocol.states import EventType
  7. from kazoo.testing import KazooTestCase
  8. class KazooDataWatcherTests(KazooTestCase):
  9. def setUp(self):
  10. super(KazooDataWatcherTests, self).setUp()
  11. self.path = "/" + uuid.uuid4().hex
  12. self.client.ensure_path(self.path)
  13. def test_data_watcher(self):
  14. update = threading.Event()
  15. data = [True]
  16. # Make it a non-existent path
  17. self.path += "f"
  18. @self.client.DataWatch(self.path)
  19. def changed(d, stat):
  20. data.pop()
  21. data.append(d)
  22. update.set()
  23. update.wait(10)
  24. assert data == [None]
  25. update.clear()
  26. self.client.create(self.path, b"fred")
  27. update.wait(10)
  28. assert data[0] == b"fred"
  29. update.clear()
  30. def test_data_watcher_once(self):
  31. update = threading.Event()
  32. data = [True]
  33. # Make it a non-existent path
  34. self.path += "f"
  35. dwatcher = self.client.DataWatch(self.path)
  36. @dwatcher
  37. def changed(d, stat):
  38. data.pop()
  39. data.append(d)
  40. update.set()
  41. update.wait(10)
  42. assert data == [None]
  43. update.clear()
  44. with pytest.raises(KazooException):
  45. @dwatcher
  46. def func(d, stat):
  47. data.pop()
  48. def test_data_watcher_with_event(self):
  49. # Test that the data watcher gets passed the event, if it
  50. # accepts three arguments
  51. update = threading.Event()
  52. data = [True]
  53. # Make it a non-existent path
  54. self.path += "f"
  55. @self.client.DataWatch(self.path)
  56. def changed(d, stat, event):
  57. data.pop()
  58. data.append(event)
  59. update.set()
  60. update.wait(10)
  61. assert data == [None]
  62. update.clear()
  63. self.client.create(self.path, b"fred")
  64. update.wait(10)
  65. assert data[0].type == EventType.CREATED
  66. update.clear()
  67. def test_func_style_data_watch(self):
  68. update = threading.Event()
  69. data = [True]
  70. # Make it a non-existent path
  71. path = self.path + "f"
  72. def changed(d, stat):
  73. data.pop()
  74. data.append(d)
  75. update.set()
  76. self.client.DataWatch(path, changed)
  77. update.wait(10)
  78. assert data == [None]
  79. update.clear()
  80. self.client.create(path, b"fred")
  81. update.wait(10)
  82. assert data[0] == b"fred"
  83. update.clear()
  84. def test_datawatch_across_session_expire(self):
  85. update = threading.Event()
  86. data = [True]
  87. @self.client.DataWatch(self.path)
  88. def changed(d, stat):
  89. data.pop()
  90. data.append(d)
  91. update.set()
  92. update.wait(10)
  93. assert data == [b""]
  94. update.clear()
  95. self.expire_session(threading.Event)
  96. self.client.retry(self.client.set, self.path, b"fred")
  97. update.wait(25)
  98. assert data[0] == b"fred"
  99. def test_func_stops(self):
  100. update = threading.Event()
  101. data = [True]
  102. self.path += "f"
  103. fail_through = []
  104. @self.client.DataWatch(self.path)
  105. def changed(d, stat):
  106. data.pop()
  107. data.append(d)
  108. update.set()
  109. if fail_through:
  110. return False
  111. update.wait(10)
  112. assert data == [None]
  113. update.clear()
  114. fail_through.append(True)
  115. self.client.create(self.path, b"fred")
  116. update.wait(10)
  117. assert data[0] == b"fred"
  118. update.clear()
  119. self.client.set(self.path, b"asdfasdf")
  120. update.wait(0.2)
  121. assert data[0] == b"fred"
  122. d, stat = self.client.get(self.path)
  123. assert d == b"asdfasdf"
  124. def test_no_such_node(self):
  125. args = []
  126. @self.client.DataWatch("/some/path")
  127. def changed(d, stat):
  128. args.extend([d, stat])
  129. assert args == [None, None]
  130. def test_no_such_node_for_children_watch(self):
  131. args = []
  132. path = self.path + "/test_no_such_node_for_children_watch"
  133. update = threading.Event()
  134. def changed(children):
  135. args.append(children)
  136. update.set()
  137. # watch a node which does not exist
  138. children_watch = self.client.ChildrenWatch(path, changed)
  139. assert update.is_set() is False
  140. assert children_watch._stopped is True
  141. assert args == []
  142. # watch a node which exists
  143. self.client.create(path, b"")
  144. children_watch = self.client.ChildrenWatch(path, changed)
  145. update.wait(3)
  146. assert args == [[]]
  147. update.clear()
  148. # watch changes
  149. self.client.create(path + "/fred", b"")
  150. update.wait(3)
  151. assert args == [[], ["fred"]]
  152. update.clear()
  153. # delete children
  154. self.client.delete(path + "/fred")
  155. update.wait(3)
  156. assert args == [[], ["fred"], []]
  157. update.clear()
  158. # delete watching
  159. self.client.delete(path)
  160. # a hack for waiting the watcher stop
  161. for retry in range(5):
  162. if children_watch._stopped:
  163. break
  164. children_watch._run_lock.acquire()
  165. children_watch._run_lock.release()
  166. time.sleep(retry / 10.0)
  167. assert update.is_set() is False
  168. assert children_watch._stopped is True
  169. def test_watcher_evaluating_to_false(self):
  170. class WeirdWatcher(list):
  171. def __call__(self, *args):
  172. self.called = True
  173. watcher = WeirdWatcher()
  174. self.client.DataWatch(self.path, watcher)
  175. self.client.set(self.path, b"mwahaha")
  176. assert watcher.called is True
  177. def test_watcher_repeat_delete(self):
  178. a = []
  179. ev = threading.Event()
  180. self.client.delete(self.path)
  181. @self.client.DataWatch(self.path)
  182. def changed(val, stat):
  183. a.append(val)
  184. ev.set()
  185. assert a == [None]
  186. ev.wait(10)
  187. ev.clear()
  188. self.client.create(self.path, b"blah")
  189. ev.wait(10)
  190. assert ev.is_set() is True
  191. ev.clear()
  192. assert a == [None, b"blah"]
  193. self.client.delete(self.path)
  194. ev.wait(10)
  195. assert ev.is_set() is True
  196. ev.clear()
  197. assert a == [None, b"blah", None]
  198. self.client.create(self.path, b"blah")
  199. ev.wait(10)
  200. assert ev.is_set() is True
  201. ev.clear()
  202. assert a == [None, b"blah", None, b"blah"]
  203. def test_watcher_with_closing(self):
  204. a = []
  205. ev = threading.Event()
  206. self.client.delete(self.path)
  207. @self.client.DataWatch(self.path)
  208. def changed(val, stat):
  209. a.append(val)
  210. ev.set()
  211. assert a == [None]
  212. b = False
  213. try:
  214. self.client.stop()
  215. except: # noqa
  216. b = True
  217. assert b is False
  218. class KazooChildrenWatcherTests(KazooTestCase):
  219. def setUp(self):
  220. super(KazooChildrenWatcherTests, self).setUp()
  221. self.path = "/" + uuid.uuid4().hex
  222. self.client.ensure_path(self.path)
  223. def test_child_watcher(self):
  224. update = threading.Event()
  225. all_children = ["fred"]
  226. @self.client.ChildrenWatch(self.path)
  227. def changed(children):
  228. while all_children:
  229. all_children.pop()
  230. all_children.extend(children)
  231. update.set()
  232. update.wait(10)
  233. assert all_children == []
  234. update.clear()
  235. self.client.create(self.path + "/" + "smith")
  236. update.wait(10)
  237. assert all_children == ["smith"]
  238. update.clear()
  239. self.client.create(self.path + "/" + "george")
  240. update.wait(10)
  241. assert sorted(all_children) == ["george", "smith"]
  242. def test_child_watcher_once(self):
  243. update = threading.Event()
  244. all_children = ["fred"]
  245. cwatch = self.client.ChildrenWatch(self.path)
  246. @cwatch
  247. def changed(children):
  248. while all_children:
  249. all_children.pop()
  250. all_children.extend(children)
  251. update.set()
  252. update.wait(10)
  253. assert all_children == []
  254. update.clear()
  255. with pytest.raises(KazooException):
  256. @cwatch
  257. def changed_again(children):
  258. update.set()
  259. def test_child_watcher_with_event(self):
  260. update = threading.Event()
  261. events = [True]
  262. @self.client.ChildrenWatch(self.path, send_event=True)
  263. def changed(children, event):
  264. events.pop()
  265. events.append(event)
  266. update.set()
  267. update.wait(10)
  268. assert events == [None]
  269. update.clear()
  270. self.client.create(self.path + "/" + "smith")
  271. update.wait(10)
  272. assert events[0].type == EventType.CHILD
  273. update.clear()
  274. def test_func_style_child_watcher(self):
  275. update = threading.Event()
  276. all_children = ["fred"]
  277. def changed(children):
  278. while all_children:
  279. all_children.pop()
  280. all_children.extend(children)
  281. update.set()
  282. self.client.ChildrenWatch(self.path, changed)
  283. update.wait(10)
  284. assert all_children == []
  285. update.clear()
  286. self.client.create(self.path + "/" + "smith")
  287. update.wait(10)
  288. assert all_children == ["smith"]
  289. update.clear()
  290. self.client.create(self.path + "/" + "george")
  291. update.wait(10)
  292. assert sorted(all_children) == ["george", "smith"]
  293. def test_func_stops(self):
  294. update = threading.Event()
  295. all_children = ["fred"]
  296. fail_through = []
  297. @self.client.ChildrenWatch(self.path)
  298. def changed(children):
  299. while all_children:
  300. all_children.pop()
  301. all_children.extend(children)
  302. update.set()
  303. if fail_through:
  304. return False
  305. update.wait(10)
  306. assert all_children == []
  307. update.clear()
  308. fail_through.append(True)
  309. self.client.create(self.path + "/" + "smith")
  310. update.wait(10)
  311. assert all_children == ["smith"]
  312. update.clear()
  313. self.client.create(self.path + "/" + "george")
  314. update.wait(0.5)
  315. assert all_children == ["smith"]
  316. def test_child_watcher_remove_session_watcher(self):
  317. update = threading.Event()
  318. all_children = ["fred"]
  319. fail_through = []
  320. def changed(children):
  321. while all_children:
  322. all_children.pop()
  323. all_children.extend(children)
  324. update.set()
  325. if fail_through:
  326. return False
  327. children_watch = self.client.ChildrenWatch(self.path, changed)
  328. session_watcher = children_watch._session_watcher
  329. update.wait(10)
  330. assert session_watcher in self.client.state_listeners
  331. assert all_children == []
  332. update.clear()
  333. fail_through.append(True)
  334. self.client.create(self.path + "/" + "smith")
  335. update.wait(10)
  336. assert session_watcher not in self.client.state_listeners
  337. assert all_children == ["smith"]
  338. update.clear()
  339. self.client.create(self.path + "/" + "george")
  340. update.wait(10)
  341. assert session_watcher not in self.client.state_listeners
  342. assert all_children == ["smith"]
  343. def test_child_watch_session_loss(self):
  344. update = threading.Event()
  345. all_children = ["fred"]
  346. @self.client.ChildrenWatch(self.path)
  347. def changed(children):
  348. while all_children:
  349. all_children.pop()
  350. all_children.extend(children)
  351. update.set()
  352. update.wait(10)
  353. assert all_children == []
  354. update.clear()
  355. self.client.create(self.path + "/" + "smith")
  356. update.wait(10)
  357. assert all_children == ["smith"]
  358. update.clear()
  359. self.expire_session(threading.Event)
  360. self.client.retry(self.client.create, self.path + "/" + "george")
  361. update.wait(20)
  362. assert sorted(all_children) == ["george", "smith"]
  363. def test_child_stop_on_session_loss(self):
  364. update = threading.Event()
  365. all_children = ["fred"]
  366. @self.client.ChildrenWatch(self.path, allow_session_lost=False)
  367. def changed(children):
  368. while all_children:
  369. all_children.pop()
  370. all_children.extend(children)
  371. update.set()
  372. update.wait(10)
  373. assert all_children == []
  374. update.clear()
  375. self.client.create(self.path + "/" + "smith")
  376. update.wait(10)
  377. assert all_children == ["smith"]
  378. update.clear()
  379. self.expire_session(threading.Event)
  380. self.client.retry(self.client.create, self.path + "/" + "george")
  381. update.wait(4)
  382. assert update.is_set() is False
  383. assert all_children == ["smith"]
  384. children = self.client.get_children(self.path)
  385. assert sorted(children) == ["george", "smith"]
  386. class KazooPatientChildrenWatcherTests(KazooTestCase):
  387. def setUp(self):
  388. super(KazooPatientChildrenWatcherTests, self).setUp()
  389. self.path = "/" + uuid.uuid4().hex
  390. def _makeOne(self, *args, **kwargs):
  391. from kazoo.recipe.watchers import PatientChildrenWatch
  392. return PatientChildrenWatch(*args, **kwargs)
  393. def test_watch(self):
  394. self.client.ensure_path(self.path)
  395. watcher = self._makeOne(self.client, self.path, 0.1)
  396. result = watcher.start()
  397. children, asy = result.get()
  398. assert len(children) == 0
  399. assert asy.ready() is False
  400. self.client.create(self.path + "/" + "fred")
  401. asy.get(timeout=1)
  402. assert asy.ready() is True
  403. def test_exception(self):
  404. from kazoo.exceptions import NoNodeError
  405. watcher = self._makeOne(self.client, self.path, 0.1)
  406. result = watcher.start()
  407. with pytest.raises(NoNodeError):
  408. result.get()
  409. def test_watch_iterations(self):
  410. self.client.ensure_path(self.path)
  411. watcher = self._makeOne(self.client, self.path, 0.5)
  412. result = watcher.start()
  413. assert result.ready() is False
  414. time.sleep(0.08)
  415. self.client.create(self.path + "/" + uuid.uuid4().hex)
  416. assert result.ready() is False
  417. time.sleep(0.08)
  418. assert result.ready() is False
  419. self.client.create(self.path + "/" + uuid.uuid4().hex)
  420. time.sleep(0.08)
  421. assert result.ready() is False
  422. children, asy = result.get()
  423. assert len(children) == 2