图片解析应用
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

405 lines
12 KiB

  1. from collections import namedtuple, deque
  2. import os
  3. import threading
  4. import time
  5. import uuid
  6. from unittest.mock import patch
  7. import struct
  8. import sys
  9. import pytest
  10. from kazoo.exceptions import ConnectionLoss
  11. from kazoo.protocol.serialization import (
  12. Connect,
  13. int_struct,
  14. write_string,
  15. )
  16. from kazoo.protocol.states import KazooState
  17. from kazoo.protocol.connection import _CONNECTION_DROP
  18. from kazoo.testing import KazooTestCase
  19. from kazoo.tests.util import wait, CI_ZK_VERSION, CI
  20. class Delete(namedtuple("Delete", "path version")):
  21. type = 2
  22. def serialize(self):
  23. b = bytearray()
  24. b.extend(write_string(self.path))
  25. b.extend(int_struct.pack(self.version))
  26. return b
  27. @classmethod
  28. def deserialize(self, bytes, offset):
  29. raise ValueError("oh my")
  30. class TestConnectionHandler(KazooTestCase):
  31. def test_bad_deserialization(self):
  32. async_object = self.client.handler.async_result()
  33. self.client._queue.append(
  34. (Delete(self.client.chroot, -1), async_object)
  35. )
  36. self.client._connection._write_sock.send(b"\0")
  37. with pytest.raises(ValueError):
  38. async_object.get()
  39. def test_with_bad_sessionid(self):
  40. ev = threading.Event()
  41. def expired(state):
  42. if state == KazooState.CONNECTED:
  43. ev.set()
  44. password = os.urandom(16)
  45. client = self._get_client(client_id=(82838284824, password))
  46. client.add_listener(expired)
  47. client.start()
  48. try:
  49. ev.wait(15)
  50. assert ev.is_set()
  51. finally:
  52. client.stop()
  53. def test_connection_read_timeout(self):
  54. client = self.client
  55. ev = threading.Event()
  56. path = "/" + uuid.uuid4().hex
  57. handler = client.handler
  58. _select = handler.select
  59. _socket = client._connection._socket
  60. def delayed_select(*args, **kwargs):
  61. result = _select(*args, **kwargs)
  62. if len(args[0]) == 1 and _socket in args[0]:
  63. # for any socket read, simulate a timeout
  64. return [], [], []
  65. return result
  66. def back(state):
  67. if state == KazooState.CONNECTED:
  68. ev.set()
  69. client.add_listener(back)
  70. client.create(path, b"1")
  71. try:
  72. handler.select = delayed_select
  73. with pytest.raises(ConnectionLoss):
  74. client.get(path)
  75. finally:
  76. handler.select = _select
  77. # the client reconnects automatically
  78. ev.wait(5)
  79. assert ev.is_set()
  80. assert client.get(path)[0] == b"1"
  81. def test_connection_write_timeout(self):
  82. client = self.client
  83. ev = threading.Event()
  84. path = "/" + uuid.uuid4().hex
  85. handler = client.handler
  86. _select = handler.select
  87. _socket = client._connection._socket
  88. def delayed_select(*args, **kwargs):
  89. result = _select(*args, **kwargs)
  90. if _socket in args[1]:
  91. # for any socket write, simulate a timeout
  92. return [], [], []
  93. return result
  94. def back(state):
  95. if state == KazooState.CONNECTED:
  96. ev.set()
  97. client.add_listener(back)
  98. try:
  99. handler.select = delayed_select
  100. with pytest.raises(ConnectionLoss):
  101. client.create(path)
  102. finally:
  103. handler.select = _select
  104. # the client reconnects automatically
  105. ev.wait(5)
  106. assert ev.is_set()
  107. assert client.exists(path) is None
  108. def test_connection_deserialize_fail(self):
  109. client = self.client
  110. ev = threading.Event()
  111. path = "/" + uuid.uuid4().hex
  112. handler = client.handler
  113. _select = handler.select
  114. _socket = client._connection._socket
  115. def delayed_select(*args, **kwargs):
  116. result = _select(*args, **kwargs)
  117. if _socket in args[1]:
  118. # for any socket write, simulate a timeout
  119. return [], [], []
  120. return result
  121. def back(state):
  122. if state == KazooState.CONNECTED:
  123. ev.set()
  124. client.add_listener(back)
  125. deserialize_ev = threading.Event()
  126. def bad_deserialize(_bytes, offset):
  127. deserialize_ev.set()
  128. raise struct.error()
  129. # force the connection to die but, on reconnect, cause the
  130. # server response to be non-deserializable. ensure that the client
  131. # continues to retry. This partially reproduces a rare bug seen
  132. # in production.
  133. with patch.object(Connect, "deserialize") as mock_deserialize:
  134. mock_deserialize.side_effect = bad_deserialize
  135. try:
  136. handler.select = delayed_select
  137. with pytest.raises(ConnectionLoss):
  138. client.create(path)
  139. finally:
  140. handler.select = _select
  141. # the client reconnects automatically but the first attempt will
  142. # hit a deserialize failure. wait for that.
  143. deserialize_ev.wait(5)
  144. assert deserialize_ev.is_set()
  145. # this time should succeed
  146. ev.wait(5)
  147. assert ev.is_set()
  148. assert client.exists(path) is None
  149. def test_connection_close(self):
  150. with pytest.raises(Exception):
  151. self.client.close()
  152. self.client.stop()
  153. self.client.close()
  154. # should be able to restart
  155. self.client.start()
  156. def test_connection_sock(self):
  157. client = self.client
  158. read_sock = client._connection._read_sock
  159. write_sock = client._connection._write_sock
  160. assert read_sock is not None
  161. assert write_sock is not None
  162. # stop client and socket should not yet be closed
  163. client.stop()
  164. assert read_sock is not None
  165. assert write_sock is not None
  166. read_sock.getsockname()
  167. write_sock.getsockname()
  168. # close client, and sockets should be closed
  169. client.close()
  170. # Todo check socket closing
  171. # start client back up. should get a new, valid socket
  172. client.start()
  173. read_sock = client._connection._read_sock
  174. write_sock = client._connection._write_sock
  175. assert read_sock is not None
  176. assert write_sock is not None
  177. read_sock.getsockname()
  178. write_sock.getsockname()
  179. def test_dirty_sock(self):
  180. client = self.client
  181. read_sock = client._connection._read_sock
  182. write_sock = client._connection._write_sock
  183. # add a stray byte to the socket and ensure that doesn't
  184. # blow up client. simulates case where some error leaves
  185. # a byte in the socket which doesn't correspond to the
  186. # request queue.
  187. write_sock.send(b"\0")
  188. # eventually this byte should disappear from socket
  189. wait(lambda: client.handler.select([read_sock], [], [], 0)[0] == [])
  190. class TestConnectionDrop(KazooTestCase):
  191. def test_connection_dropped(self):
  192. ev = threading.Event()
  193. def back(state):
  194. if state == KazooState.CONNECTED:
  195. ev.set()
  196. # create a node with a large value and stop the ZK node
  197. path = "/" + uuid.uuid4().hex
  198. self.client.create(path)
  199. self.client.add_listener(back)
  200. result = self.client.set_async(path, b"a" * 1000 * 1024)
  201. self.client._call(_CONNECTION_DROP, None)
  202. with pytest.raises(ConnectionLoss):
  203. result.get()
  204. # we have a working connection to a new node
  205. ev.wait(30)
  206. assert ev.is_set()
  207. class TestReadOnlyMode(KazooTestCase):
  208. def setUp(self):
  209. os.environ["ZOOKEEPER_LOCAL_SESSION_RO"] = "true"
  210. self.setup_zookeeper()
  211. skip = False
  212. if CI_ZK_VERSION and CI_ZK_VERSION < (3, 4):
  213. skip = True
  214. elif CI_ZK_VERSION and CI_ZK_VERSION >= (3, 4):
  215. skip = False
  216. else:
  217. ver = self.client.server_version()
  218. if ver[1] < 4:
  219. skip = True
  220. if skip:
  221. pytest.skip("Must use Zookeeper 3.4 or above")
  222. def tearDown(self):
  223. self.client.stop()
  224. os.environ.pop("ZOOKEEPER_LOCAL_SESSION_RO", None)
  225. def test_read_only(self):
  226. from kazoo.exceptions import NotReadOnlyCallError
  227. from kazoo.protocol.states import KeeperState
  228. if CI:
  229. # force some wait to make sure the data produced during the
  230. # `setUp()` step are replicaed to all zk members
  231. # if not done the `get_children()` test may fail because the
  232. # node does not exist on the node that we will keep alive
  233. time.sleep(15)
  234. # do not keep the client started in the `setUp` step alive
  235. self.client.stop()
  236. client = self._get_client(connection_retry=None, read_only=True)
  237. states = []
  238. ev = threading.Event()
  239. @client.add_listener
  240. def listen(state):
  241. states.append(state)
  242. if client.client_state == KeeperState.CONNECTED_RO:
  243. ev.set()
  244. client.start()
  245. try:
  246. # stopping both nodes at the same time
  247. # else the test seems flaky when on CI hosts
  248. zk_stop_threads = []
  249. zk_stop_threads.append(
  250. threading.Thread(target=self.cluster[1].stop, daemon=True)
  251. )
  252. zk_stop_threads.append(
  253. threading.Thread(target=self.cluster[2].stop, daemon=True)
  254. )
  255. for thread in zk_stop_threads:
  256. thread.start()
  257. for thread in zk_stop_threads:
  258. thread.join()
  259. # stopping the client is *mandatory*, else the client might try to
  260. # reconnect using a xid that the server may endlessly refuse
  261. # restarting the client makes sure the xid gets reset
  262. client.stop()
  263. client.start()
  264. ev.wait(15)
  265. assert ev.is_set()
  266. assert client.client_state == KeeperState.CONNECTED_RO
  267. # Test read only command
  268. assert client.get_children("/") == []
  269. # Test error with write command
  270. with pytest.raises(NotReadOnlyCallError):
  271. client.create("/fred")
  272. # Wait for a ping
  273. time.sleep(15)
  274. finally:
  275. client.remove_listener(listen)
  276. self.cluster[1].run()
  277. self.cluster[2].run()
  278. class TestUnorderedXids(KazooTestCase):
  279. def setUp(self):
  280. super(TestUnorderedXids, self).setUp()
  281. self.connection = self.client._connection
  282. self.connection_routine = self.connection._connection_routine
  283. self._pending = self.client._pending
  284. self.client._pending = _naughty_deque()
  285. def tearDown(self):
  286. self.client._pending = self._pending
  287. super(TestUnorderedXids, self).tearDown()
  288. def _get_client(self, **kwargs):
  289. # overrides for patching zk_loop
  290. c = KazooTestCase._get_client(self, **kwargs)
  291. self._zk_loop = c._connection.zk_loop
  292. self._zk_loop_errors = []
  293. c._connection.zk_loop = self._zk_loop_func
  294. return c
  295. def _zk_loop_func(self, *args, **kwargs):
  296. # patched zk_loop which will catch and collect all RuntimeError
  297. try:
  298. self._zk_loop(*args, **kwargs)
  299. except RuntimeError as e:
  300. self._zk_loop_errors.append(e)
  301. def test_xids_mismatch(self):
  302. from kazoo.protocol.states import KeeperState
  303. ev = threading.Event()
  304. error_stack = []
  305. @self.client.add_listener
  306. def listen(state):
  307. if self.client.client_state == KeeperState.CLOSED:
  308. ev.set()
  309. def log_exception(*args):
  310. error_stack.append((args, sys.exc_info()))
  311. self.connection.logger.exception = log_exception
  312. ev.clear()
  313. with pytest.raises(RuntimeError):
  314. self.client.get_children("/")
  315. ev.wait()
  316. assert self.client.connected is False
  317. assert self.client.state == "LOST"
  318. assert self.client.client_state == KeeperState.CLOSED
  319. args, exc_info = error_stack[-1]
  320. assert args == ("Unhandled exception in connection loop",)
  321. assert exc_info[0] == RuntimeError
  322. self.client.handler.sleep_func(0.2)
  323. assert not self.connection_routine.is_alive()
  324. assert len(self._zk_loop_errors) == 1
  325. assert self._zk_loop_errors[0] == exc_info[1]
  326. class _naughty_deque(deque):
  327. def append(self, s):
  328. request, async_object, xid = s
  329. return deque.append(self, (request, async_object, xid + 1)) # +1s