图片解析应用
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

397 lines
10 KiB

  1. import threading
  2. import unittest
  3. from unittest.mock import Mock
  4. import pytest
  5. class TestThreadingHandler(unittest.TestCase):
  6. def _makeOne(self, *args):
  7. from kazoo.handlers.threading import SequentialThreadingHandler
  8. return SequentialThreadingHandler(*args)
  9. def _getAsync(self, *args):
  10. from kazoo.handlers.threading import AsyncResult
  11. return AsyncResult
  12. def test_proper_threading(self):
  13. h = self._makeOne()
  14. h.start()
  15. # In Python 3.3 _Event is gone, before Event is function
  16. event_class = getattr(threading, "_Event", threading.Event)
  17. assert isinstance(h.event_object(), event_class)
  18. def test_matching_async(self):
  19. h = self._makeOne()
  20. h.start()
  21. async_result = self._getAsync()
  22. assert isinstance(h.async_result(), async_result)
  23. def test_exception_raising(self):
  24. h = self._makeOne()
  25. with pytest.raises(h.timeout_exception):
  26. raise h.timeout_exception("This is a timeout")
  27. def test_double_start_stop(self):
  28. h = self._makeOne()
  29. h.start()
  30. assert h._running is True
  31. h.start()
  32. h.stop()
  33. h.stop()
  34. assert h._running is False
  35. def test_huge_file_descriptor(self):
  36. try:
  37. import resource
  38. except ImportError:
  39. self.skipTest("resource module unavailable on this platform")
  40. import socket
  41. from kazoo.handlers.utils import create_tcp_socket
  42. try:
  43. resource.setrlimit(resource.RLIMIT_NOFILE, (4096, 4096))
  44. except (ValueError, resource.error):
  45. self.skipTest("couldnt raise fd limit high enough")
  46. fd = 0
  47. socks = []
  48. while fd < 4000:
  49. sock = create_tcp_socket(socket)
  50. fd = sock.fileno()
  51. socks.append(sock)
  52. h = self._makeOne()
  53. h.start()
  54. h.select(socks, [], [], 0)
  55. h.stop()
  56. for sock in socks:
  57. sock.close()
  58. class TestThreadingAsync(unittest.TestCase):
  59. def _makeOne(self, *args):
  60. from kazoo.handlers.threading import AsyncResult
  61. return AsyncResult(*args)
  62. def _makeHandler(self):
  63. from kazoo.handlers.threading import SequentialThreadingHandler
  64. return SequentialThreadingHandler()
  65. def test_ready(self):
  66. mock_handler = Mock()
  67. async_result = self._makeOne(mock_handler)
  68. assert async_result.ready() is False
  69. async_result.set("val")
  70. assert async_result.ready() is True
  71. assert async_result.successful() is True
  72. assert async_result.exception is None
  73. def test_callback_queued(self):
  74. mock_handler = Mock()
  75. mock_handler.completion_queue = Mock()
  76. async_result = self._makeOne(mock_handler)
  77. async_result.rawlink(lambda a: a)
  78. async_result.set("val")
  79. assert mock_handler.completion_queue.put.called
  80. def test_set_exception(self):
  81. mock_handler = Mock()
  82. mock_handler.completion_queue = Mock()
  83. async_result = self._makeOne(mock_handler)
  84. async_result.rawlink(lambda a: a)
  85. async_result.set_exception(ImportError("Error occured"))
  86. assert isinstance(async_result.exception, ImportError)
  87. assert mock_handler.completion_queue.put.called
  88. def test_get_wait_while_setting(self):
  89. mock_handler = Mock()
  90. async_result = self._makeOne(mock_handler)
  91. lst = []
  92. bv = threading.Event()
  93. cv = threading.Event()
  94. def wait_for_val():
  95. bv.set()
  96. val = async_result.get()
  97. lst.append(val)
  98. cv.set()
  99. th = threading.Thread(target=wait_for_val)
  100. th.start()
  101. bv.wait()
  102. async_result.set("fred")
  103. cv.wait()
  104. assert lst == ["fred"]
  105. th.join()
  106. def test_get_with_nowait(self):
  107. mock_handler = Mock()
  108. async_result = self._makeOne(mock_handler)
  109. timeout = self._makeHandler().timeout_exception
  110. with pytest.raises(timeout):
  111. async_result.get(block=False)
  112. with pytest.raises(timeout):
  113. async_result.get_nowait()
  114. def test_get_with_exception(self):
  115. mock_handler = Mock()
  116. async_result = self._makeOne(mock_handler)
  117. lst = []
  118. bv = threading.Event()
  119. cv = threading.Event()
  120. def wait_for_val():
  121. bv.set()
  122. try:
  123. val = async_result.get()
  124. except ImportError:
  125. lst.append("oops")
  126. else:
  127. lst.append(val)
  128. cv.set()
  129. th = threading.Thread(target=wait_for_val)
  130. th.start()
  131. bv.wait()
  132. async_result.set_exception(ImportError)
  133. cv.wait()
  134. assert lst == ["oops"]
  135. th.join()
  136. def test_wait(self):
  137. mock_handler = Mock()
  138. async_result = self._makeOne(mock_handler)
  139. lst = []
  140. bv = threading.Event()
  141. cv = threading.Event()
  142. def wait_for_val():
  143. bv.set()
  144. try:
  145. val = async_result.wait(10)
  146. except ImportError:
  147. lst.append("oops")
  148. else:
  149. lst.append(val)
  150. cv.set()
  151. th = threading.Thread(target=wait_for_val)
  152. th.start()
  153. bv.wait(10)
  154. async_result.set("fred")
  155. cv.wait(15)
  156. assert lst == [True]
  157. th.join()
  158. def test_wait_race(self):
  159. """Test that there is no race condition in `IAsyncResult.wait()`.
  160. Guards against the reappearance of:
  161. https://github.com/python-zk/kazoo/issues/485
  162. """
  163. mock_handler = Mock()
  164. async_result = self._makeOne(mock_handler)
  165. async_result.set("immediate")
  166. cv = threading.Event()
  167. def wait_for_val():
  168. # NB: should not sleep
  169. async_result.wait(20)
  170. cv.set()
  171. th = threading.Thread(target=wait_for_val)
  172. th.daemon = True
  173. th.start()
  174. # if the wait() didn't sleep (correctly), cv will be set quickly
  175. # if it did sleep, the cv will not be set yet and this will timeout
  176. cv.wait(10)
  177. assert cv.is_set() is True
  178. th.join()
  179. def test_set_before_wait(self):
  180. mock_handler = Mock()
  181. async_result = self._makeOne(mock_handler)
  182. lst = []
  183. cv = threading.Event()
  184. async_result.set("fred")
  185. def wait_for_val():
  186. val = async_result.get()
  187. lst.append(val)
  188. cv.set()
  189. th = threading.Thread(target=wait_for_val)
  190. th.start()
  191. cv.wait()
  192. assert lst == ["fred"]
  193. th.join()
  194. def test_set_exc_before_wait(self):
  195. mock_handler = Mock()
  196. async_result = self._makeOne(mock_handler)
  197. lst = []
  198. cv = threading.Event()
  199. async_result.set_exception(ImportError)
  200. def wait_for_val():
  201. try:
  202. val = async_result.get()
  203. except ImportError:
  204. lst.append("ooops")
  205. else:
  206. lst.append(val)
  207. cv.set()
  208. th = threading.Thread(target=wait_for_val)
  209. th.start()
  210. cv.wait()
  211. assert lst == ["ooops"]
  212. th.join()
  213. def test_linkage(self):
  214. mock_handler = Mock()
  215. async_result = self._makeOne(mock_handler)
  216. cv = threading.Event()
  217. lst = []
  218. def add_on():
  219. lst.append(True)
  220. def wait_for_val():
  221. async_result.get()
  222. cv.set()
  223. th = threading.Thread(target=wait_for_val)
  224. th.start()
  225. async_result.rawlink(add_on)
  226. async_result.set(b"fred")
  227. assert mock_handler.completion_queue.put.called
  228. async_result.unlink(add_on)
  229. cv.wait()
  230. assert async_result.value == b"fred"
  231. th.join()
  232. def test_linkage_not_ready(self):
  233. mock_handler = Mock()
  234. async_result = self._makeOne(mock_handler)
  235. lst = []
  236. def add_on():
  237. lst.append(True)
  238. async_result.set("fred")
  239. assert not mock_handler.completion_queue.called
  240. async_result.rawlink(add_on)
  241. assert mock_handler.completion_queue.put.called
  242. def test_link_and_unlink(self):
  243. mock_handler = Mock()
  244. async_result = self._makeOne(mock_handler)
  245. lst = []
  246. def add_on():
  247. lst.append(True)
  248. async_result.rawlink(add_on)
  249. assert not mock_handler.completion_queue.put.called
  250. async_result.unlink(add_on)
  251. async_result.set("fred")
  252. assert not mock_handler.completion_queue.put.called
  253. def test_captured_exception(self):
  254. from kazoo.handlers.utils import capture_exceptions
  255. mock_handler = Mock()
  256. async_result = self._makeOne(mock_handler)
  257. @capture_exceptions(async_result)
  258. def exceptional_function():
  259. return 1 / 0
  260. exceptional_function()
  261. with pytest.raises(ZeroDivisionError):
  262. async_result.get()
  263. def test_no_capture_exceptions(self):
  264. from kazoo.handlers.utils import capture_exceptions
  265. mock_handler = Mock()
  266. async_result = self._makeOne(mock_handler)
  267. lst = []
  268. def add_on():
  269. lst.append(True)
  270. async_result.rawlink(add_on)
  271. @capture_exceptions(async_result)
  272. def regular_function():
  273. return True
  274. regular_function()
  275. assert not mock_handler.completion_queue.put.called
  276. def test_wraps(self):
  277. from kazoo.handlers.utils import wrap
  278. mock_handler = Mock()
  279. async_result = self._makeOne(mock_handler)
  280. lst = []
  281. def add_on(result):
  282. lst.append(result.get())
  283. async_result.rawlink(add_on)
  284. @wrap(async_result)
  285. def regular_function():
  286. return "hello"
  287. assert regular_function() == "hello"
  288. assert mock_handler.completion_queue.put.called
  289. assert async_result.get() == "hello"
  290. def test_multiple_callbacks(self):
  291. mockback1 = Mock(name="mockback1")
  292. mockback2 = Mock(name="mockback2")
  293. handler = self._makeHandler()
  294. handler.start()
  295. async_result = self._makeOne(handler)
  296. async_result.rawlink(mockback1)
  297. async_result.rawlink(mockback2)
  298. async_result.set("howdy")
  299. async_result.wait()
  300. handler.stop()
  301. mockback2.assert_called_once_with(async_result)
  302. mockback1.assert_called_once_with(async_result)