图片解析应用
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

154 lines
4.0 KiB

  1. import threading
  2. from kazoo.testing import KazooTestCase
  class KazooBarrierTests(KazooTestCase):
      """Tests for the client's ``Barrier`` recipe rooted at ``/some/path``."""

      def test_barrier_not_exist(self):
          """wait() on a barrier that was never created is expected to be truthy."""
          barrier = self.client.Barrier("/some/path")
          passed = barrier.wait()
          assert passed

      def test_barrier_exists(self):
          """wait() is falsy while the barrier is created and truthy after removal."""
          barrier = self.client.Barrier("/some/path")
          barrier.create()

          # wait(0): presumably a zero timeout so the call does not block;
          # the test expects a falsy result while the barrier is still up.
          passed_while_up = barrier.wait(0)
          assert not passed_while_up

          barrier.remove()
          passed_after_removal = barrier.wait()
          assert passed_after_removal

      def test_remove_nonexistent_barrier(self):
          """remove() on a barrier that was never created is expected to be falsy."""
          barrier = self.client.Barrier("/some/path")
          removed = barrier.remove()
          assert not removed
  class KazooDoubleBarrierTests(KazooTestCase):
      """Tests for the client's ``DoubleBarrier`` recipe.

      The multi-participant tests drive extra participants from worker
      threads and use ``threading.Event`` objects to sequence the main
      thread against them.
      """

      def test_basic_barrier(self):
          """A one-participant barrier toggles ``participating`` on enter/leave."""
          barrier = self.client.DoubleBarrier("/some/path", 1)
          assert not barrier.participating

          barrier.enter()
          assert barrier.participating

          barrier.leave()
          assert not barrier.participating

      def test_two_barrier(self):
          """Two participants on the same path enter and leave together."""
          second_signal = threading.Event()  # set by the second worker
          first_signal = threading.Event()  # set by the first worker
          second_may_enter = threading.Event()  # gates the second worker
          release_all = threading.Event()  # lets both workers leave

          first = self.client.DoubleBarrier("/some/path", 2)
          second = self.client.DoubleBarrier("/some/path", 2)

          def run_first():
              first.enter()
              first_signal.set()
              release_all.wait()
              first.leave()
              first_signal.set()

          def run_second():
              # Held back until the main thread has checked the initial state.
              second_may_enter.wait()
              second.enter()
              second_signal.set()
              release_all.wait()
              second.leave()
              second_signal.set()

          # Start both worker threads
          first_worker = threading.Thread(target=run_first)
          first_worker.start()
          second_worker = threading.Thread(target=run_second)
          second_worker.start()

          for barrier in (first, second):
              assert not barrier.participating

          second_may_enter.set()
          for signal in (second_signal, first_signal):
              signal.wait()
          for barrier in (first, second):
              assert barrier.participating

          # Re-arm the signals so they can report the leave phase as well.
          for signal in (second_signal, first_signal):
              signal.clear()
          release_all.set()
          for signal in (second_signal, first_signal):
              signal.wait()
          for barrier in (first, second):
              assert not barrier.participating

          for worker in (first_worker, second_worker):
              worker.join()

      def test_three_barrier(self):
          """Three participants: two in worker threads, one in the main thread."""
          second_signal = threading.Event()  # set by the second worker
          first_signal = threading.Event()  # set by the first worker
          second_may_enter = threading.Event()  # gates the second worker
          release_all = threading.Event()  # lets both workers leave

          first = self.client.DoubleBarrier("/some/path", 3)
          second = self.client.DoubleBarrier("/some/path", 3)
          third = self.client.DoubleBarrier("/some/path", 3)

          def run_first():
              first.enter()
              first_signal.set()
              release_all.wait()
              first.leave()
              first_signal.set()

          def run_second():
              # Held back until the main thread has checked the initial state.
              second_may_enter.wait()
              second.enter()
              second_signal.set()
              release_all.wait()
              second.leave()
              second_signal.set()

          # Start both worker threads
          first_worker = threading.Thread(target=run_first)
          first_worker.start()
          second_worker = threading.Thread(target=run_second)
          second_worker.start()

          for barrier in (first, second):
              assert not barrier.participating

          second_may_enter.set()
          # Checked again after ungating the second worker, before the third
          # participant (this thread) enters.
          for barrier in (first, second):
              assert not barrier.participating

          third.enter()
          for signal in (first_signal, second_signal):
              signal.wait()
          for barrier in (first, second, third):
              assert barrier.participating

          # Re-arm the signals so they can report the leave phase as well.
          for signal in (second_signal, first_signal):
              signal.clear()
          release_all.set()
          third.leave()
          for signal in (second_signal, first_signal):
              signal.wait()
          for barrier in (first, second, third):
              assert not barrier.participating

          for worker in (first_worker, second_worker):
              worker.join()

      def test_barrier_existing_parent_node(self):
          """enter() leaves ``participating`` False when the parent is ephemeral."""
          barrier = self.client.DoubleBarrier("/some/path", 1)
          assert barrier.participating is False

          self.client.create("/some", ephemeral=True)
          # the barrier cannot create children under an ephemeral node
          barrier.enter()
          assert barrier.participating is False

      def test_barrier_existing_node(self):
          """enter() succeeds when the barrier's own node already exists."""
          barrier = self.client.DoubleBarrier("/some", 1)
          assert barrier.participating is False

          # Pre-create the barrier path and the node at ``create_path``.
          self.client.ensure_path(barrier.path)
          self.client.create(barrier.create_path, ephemeral=True)
          # the barrier will re-use an existing node
          barrier.enter()
          assert barrier.participating is True
          barrier.leave()