图片解析应用
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

187 lines
5.1 KiB

  1. import unittest
  2. import sys
  3. import pytest
  4. from kazoo.client import KazooClient
  5. from kazoo.exceptions import NoNodeError
  6. from kazoo.protocol.states import Callback
  7. from kazoo.testing import KazooTestCase
  8. from kazoo.tests import test_client
  9. @pytest.mark.skipif(sys.platform == "win32", reason="does not run on windows")
  10. class TestGeventHandler(unittest.TestCase):
  11. def setUp(self):
  12. try:
  13. import gevent # NOQA
  14. except ImportError:
  15. pytest.skip("gevent not available.")
  16. def _makeOne(self, *args):
  17. from kazoo.handlers.gevent import SequentialGeventHandler
  18. return SequentialGeventHandler(*args)
  19. def _getAsync(self, *args):
  20. from kazoo.handlers.gevent import AsyncResult
  21. return AsyncResult
  22. def _getEvent(self):
  23. from gevent.event import Event
  24. return Event
  25. def test_proper_threading(self):
  26. h = self._makeOne()
  27. h.start()
  28. assert isinstance(h.event_object(), self._getEvent())
  29. def test_matching_async(self):
  30. h = self._makeOne()
  31. h.start()
  32. async_handler = self._getAsync()
  33. assert isinstance(h.async_result(), async_handler)
  34. def test_exception_raising(self):
  35. h = self._makeOne()
  36. with pytest.raises(h.timeout_exception):
  37. raise h.timeout_exception("This is a timeout")
  38. def test_exception_in_queue(self):
  39. h = self._makeOne()
  40. h.start()
  41. ev = self._getEvent()()
  42. def func():
  43. ev.set()
  44. raise ValueError("bang")
  45. call1 = Callback("completion", func, ())
  46. h.dispatch_callback(call1)
  47. ev.wait()
  48. def test_queue_empty_exception(self):
  49. from gevent.queue import Empty
  50. h = self._makeOne()
  51. h.start()
  52. ev = self._getEvent()()
  53. def func():
  54. ev.set()
  55. raise Empty()
  56. call1 = Callback("completion", func, ())
  57. h.dispatch_callback(call1)
  58. ev.wait()
  59. @pytest.mark.skipif(sys.platform == "win32", reason="does not run on windows")
  60. class TestBasicGeventClient(KazooTestCase):
  61. def setUp(self):
  62. try:
  63. import gevent # NOQA
  64. except ImportError:
  65. pytest.skip("gevent not available.")
  66. KazooTestCase.setUp(self)
  67. def _makeOne(self, *args):
  68. from kazoo.handlers.gevent import SequentialGeventHandler
  69. return SequentialGeventHandler(*args)
  70. def _getEvent(self):
  71. from gevent.event import Event
  72. return Event
  73. def test_start(self):
  74. client = self._get_client(handler=self._makeOne())
  75. client.start()
  76. assert client.state == "CONNECTED"
  77. client.stop()
  78. def test_start_stop_double(self):
  79. client = self._get_client(handler=self._makeOne())
  80. client.start()
  81. assert client.state == "CONNECTED"
  82. client.handler.start()
  83. client.handler.stop()
  84. client.stop()
  85. def test_basic_commands(self):
  86. client = self._get_client(handler=self._makeOne())
  87. client.start()
  88. assert client.state == "CONNECTED"
  89. client.create("/anode", b"fred")
  90. assert client.get("/anode")[0] == b"fred"
  91. assert client.delete("/anode")
  92. assert client.exists("/anode") is None
  93. client.stop()
  94. def test_failures(self):
  95. client = self._get_client(handler=self._makeOne())
  96. client.start()
  97. with pytest.raises(NoNodeError):
  98. client.get("/none")
  99. client.stop()
  100. def test_data_watcher(self):
  101. client = self._get_client(handler=self._makeOne())
  102. client.start()
  103. client.ensure_path("/some/node")
  104. ev = self._getEvent()()
  105. @client.DataWatch("/some/node")
  106. def changed(d, stat):
  107. ev.set()
  108. ev.wait()
  109. ev.clear()
  110. client.set("/some/node", b"newvalue")
  111. ev.wait()
  112. client.stop()
  113. def test_huge_file_descriptor(self):
  114. import resource
  115. from gevent import socket
  116. from kazoo.handlers.utils import create_tcp_socket
  117. try:
  118. resource.setrlimit(resource.RLIMIT_NOFILE, (4096, 4096))
  119. except (ValueError, resource.error):
  120. self.skipTest("couldnt raise fd limit high enough")
  121. fd = 0
  122. socks = []
  123. while fd < 4000:
  124. sock = create_tcp_socket(socket)
  125. fd = sock.fileno()
  126. socks.append(sock)
  127. h = self._makeOne()
  128. h.start()
  129. h.select(socks, [], [], 0)
  130. h.stop()
  131. for sock in socks:
  132. sock.close()
  133. @pytest.mark.skipif(sys.platform == "win32", reason="does not run on windows")
  134. class TestGeventClient(test_client.TestClient):
  135. def setUp(self):
  136. try:
  137. import gevent # NOQA
  138. except ImportError:
  139. pytest.skip("gevent not available.")
  140. KazooTestCase.setUp(self)
  141. def _makeOne(self, *args):
  142. from kazoo.handlers.gevent import SequentialGeventHandler
  143. return SequentialGeventHandler(*args)
  144. def _get_client(self, **kwargs):
  145. kwargs["handler"] = self._makeOne()
  146. return KazooClient(self.hosts, **kwargs)