图片解析应用
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

205 lines
6.1 KiB

  1. import uuid
  2. import pytest
  3. from kazoo.testing import KazooTestCase
  4. from kazoo.tests.util import CI_ZK_VERSION
  5. class KazooQueueTests(KazooTestCase):
  6. def _makeOne(self):
  7. path = "/" + uuid.uuid4().hex
  8. return self.client.Queue(path)
  9. def test_queue_validation(self):
  10. queue = self._makeOne()
  11. with pytest.raises(TypeError):
  12. queue.put({})
  13. with pytest.raises(TypeError):
  14. queue.put(b"one", b"100")
  15. with pytest.raises(TypeError):
  16. queue.put(b"one", 10.0)
  17. with pytest.raises(ValueError):
  18. queue.put(b"one", -100)
  19. with pytest.raises(ValueError):
  20. queue.put(b"one", 100000)
  21. def test_empty_queue(self):
  22. queue = self._makeOne()
  23. assert len(queue) == 0
  24. assert queue.get() is None
  25. assert len(queue) == 0
  26. def test_queue(self):
  27. queue = self._makeOne()
  28. queue.put(b"one")
  29. queue.put(b"two")
  30. queue.put(b"three")
  31. assert len(queue) == 3
  32. assert queue.get() == b"one"
  33. assert queue.get() == b"two"
  34. assert queue.get() == b"three"
  35. assert len(queue) == 0
  36. def test_priority(self):
  37. queue = self._makeOne()
  38. queue.put(b"four", priority=101)
  39. queue.put(b"one", priority=0)
  40. queue.put(b"two", priority=0)
  41. queue.put(b"three", priority=10)
  42. assert queue.get() == b"one"
  43. assert queue.get() == b"two"
  44. assert queue.get() == b"three"
  45. assert queue.get() == b"four"
  46. class KazooLockingQueueTests(KazooTestCase):
  47. def setUp(self):
  48. KazooTestCase.setUp(self)
  49. skip = False
  50. if CI_ZK_VERSION and CI_ZK_VERSION < (3, 4):
  51. skip = True
  52. elif CI_ZK_VERSION and CI_ZK_VERSION >= (3, 4):
  53. skip = False
  54. else:
  55. ver = self.client.server_version()
  56. if ver[1] < 4:
  57. skip = True
  58. if skip:
  59. pytest.skip("Must use Zookeeper 3.4 or above")
  60. def _makeOne(self):
  61. path = "/" + uuid.uuid4().hex
  62. return self.client.LockingQueue(path)
  63. def test_queue_validation(self):
  64. queue = self._makeOne()
  65. with pytest.raises(TypeError):
  66. queue.put({})
  67. with pytest.raises(TypeError):
  68. queue.put(b"one", b"100")
  69. with pytest.raises(TypeError):
  70. queue.put(b"one", 10.0)
  71. with pytest.raises(ValueError):
  72. queue.put(b"one", -100)
  73. with pytest.raises(ValueError):
  74. queue.put(b"one", 100000)
  75. with pytest.raises(TypeError):
  76. queue.put_all({})
  77. with pytest.raises(TypeError):
  78. queue.put_all([{}])
  79. with pytest.raises(TypeError):
  80. queue.put_all([b"one"], b"100")
  81. with pytest.raises(TypeError):
  82. queue.put_all([b"one"], 10.0)
  83. with pytest.raises(ValueError):
  84. queue.put_all([b"one"], -100)
  85. with pytest.raises(ValueError):
  86. queue.put_all([b"one"], 100000)
  87. def test_empty_queue(self):
  88. queue = self._makeOne()
  89. assert len(queue) == 0
  90. assert queue.get(0) is None
  91. assert len(queue) == 0
  92. def test_queue(self):
  93. queue = self._makeOne()
  94. queue.put(b"one")
  95. queue.put_all([b"two", b"three"])
  96. assert len(queue) == 3
  97. assert not queue.consume()
  98. assert not queue.holds_lock()
  99. assert queue.get(1) == b"one"
  100. assert queue.holds_lock()
  101. # Without consuming, should return the same element
  102. assert queue.get(1) == b"one"
  103. assert queue.consume()
  104. assert not queue.holds_lock()
  105. assert queue.get(1) == b"two"
  106. assert queue.holds_lock()
  107. assert queue.consume()
  108. assert not queue.holds_lock()
  109. assert queue.get(1) == b"three"
  110. assert queue.holds_lock()
  111. assert queue.consume()
  112. assert not queue.holds_lock()
  113. assert not queue.consume()
  114. assert len(queue) == 0
  115. def test_consume(self):
  116. queue = self._makeOne()
  117. queue.put(b"one")
  118. assert not queue.consume()
  119. queue.get(0.1)
  120. assert queue.consume()
  121. assert not queue.consume()
  122. def test_release(self):
  123. queue = self._makeOne()
  124. queue.put(b"one")
  125. assert queue.get(1) == b"one"
  126. assert queue.holds_lock()
  127. assert queue.release()
  128. assert not queue.holds_lock()
  129. assert queue.get(1) == b"one"
  130. assert queue.consume()
  131. assert not queue.release()
  132. assert len(queue) == 0
  133. def test_holds_lock(self):
  134. queue = self._makeOne()
  135. assert not queue.holds_lock()
  136. queue.put(b"one")
  137. queue.get(0.1)
  138. assert queue.holds_lock()
  139. queue.consume()
  140. assert not queue.holds_lock()
  141. def test_priority(self):
  142. queue = self._makeOne()
  143. queue.put(b"four", priority=101)
  144. queue.put(b"one", priority=0)
  145. queue.put(b"two", priority=0)
  146. queue.put(b"three", priority=10)
  147. assert queue.get(1) == b"one"
  148. assert queue.consume()
  149. assert queue.get(1) == b"two"
  150. assert queue.consume()
  151. assert queue.get(1) == b"three"
  152. assert queue.consume()
  153. assert queue.get(1) == b"four"
  154. assert queue.consume()
  155. def test_concurrent_execution(self):
  156. queue = self._makeOne()
  157. value1 = []
  158. value2 = []
  159. value3 = []
  160. event1 = self.client.handler.event_object()
  161. event2 = self.client.handler.event_object()
  162. event3 = self.client.handler.event_object()
  163. def get_concurrently(value, event):
  164. q = self.client.LockingQueue(queue.path)
  165. value.append(q.get(0.1))
  166. event.set()
  167. self.client.handler.spawn(get_concurrently, value1, event1)
  168. self.client.handler.spawn(get_concurrently, value2, event2)
  169. self.client.handler.spawn(get_concurrently, value3, event3)
  170. queue.put(b"one")
  171. event1.wait(0.2)
  172. event2.wait(0.2)
  173. event3.wait(0.2)
  174. result = value1 + value2 + value3
  175. assert result.count(b"one") == 1
  176. assert result.count(None) == 2