图片解析应用
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1487 lines
45 KiB

  1. import os
  2. import socket
  3. import sys
  4. import tempfile
  5. import threading
  6. import time
  7. import uuid
  8. import unittest
  9. from unittest.mock import Mock, MagicMock, patch
  10. import pytest
  11. from kazoo.testing import KazooTestCase
  12. from kazoo.exceptions import (
  13. AuthFailedError,
  14. BadArgumentsError,
  15. BadVersionError,
  16. ConfigurationError,
  17. ConnectionClosedError,
  18. ConnectionLoss,
  19. InvalidACLError,
  20. NoAuthError,
  21. NoNodeError,
  22. NodeExistsError,
  23. SessionExpiredError,
  24. KazooException,
  25. )
  26. from kazoo.protocol.connection import _CONNECTION_DROP
  27. from kazoo.protocol.states import KeeperState, KazooState
  28. from kazoo.tests.util import CI_ZK_VERSION
  29. if sys.version_info > (3,): # pragma: nocover
  30. def u(s):
  31. return s
  32. else: # pragma: nocover
  33. def u(s):
  34. return unicode(s, "unicode_escape") # noqa
  35. class TestClientTransitions(KazooTestCase):
  36. @staticmethod
  37. def make_event():
  38. return threading.Event()
  39. def test_connection_and_disconnection(self):
  40. states = []
  41. rc = threading.Event()
  42. @self.client.add_listener
  43. def listener(state):
  44. states.append(state)
  45. if state == KazooState.CONNECTED:
  46. rc.set()
  47. self.client.stop()
  48. assert states == [KazooState.LOST]
  49. states.pop()
  50. self.client.start()
  51. rc.wait(2)
  52. assert states == [KazooState.CONNECTED]
  53. rc.clear()
  54. states.pop()
  55. self.expire_session(self.make_event)
  56. rc.wait(2)
  57. req_states = [KazooState.LOST, KazooState.CONNECTED]
  58. assert states == req_states
  59. class TestClientConstructor(unittest.TestCase):
  60. def _makeOne(self, *args, **kw):
  61. from kazoo.client import KazooClient
  62. return KazooClient(*args, **kw)
  63. def test_invalid_handler(self):
  64. from kazoo.handlers.threading import SequentialThreadingHandler
  65. with pytest.raises(ConfigurationError):
  66. self._makeOne(handler=SequentialThreadingHandler)
  67. def test_chroot(self):
  68. assert self._makeOne(hosts="127.0.0.1:2181/").chroot == ""
  69. assert self._makeOne(hosts="127.0.0.1:2181/a").chroot == "/a"
  70. assert self._makeOne(hosts="127.0.0.1/a").chroot == "/a"
  71. assert self._makeOne(hosts="127.0.0.1/a/b").chroot == "/a/b"
  72. assert (
  73. self._makeOne(hosts="127.0.0.1:2181,127.0.0.1:2182/a/b").chroot
  74. == "/a/b"
  75. )
  76. def test_connection_timeout(self):
  77. from kazoo.handlers.threading import KazooTimeoutError
  78. client = self._makeOne(hosts="127.0.0.1:9")
  79. assert client.handler.timeout_exception is KazooTimeoutError
  80. with pytest.raises(KazooTimeoutError):
  81. client.start(0.1)
  82. def test_ordered_host_selection(self):
  83. client = self._makeOne(
  84. hosts="127.0.0.1:9,127.0.0.2:9/a", randomize_hosts=False
  85. )
  86. hosts = [h for h in client.hosts]
  87. assert hosts == [("127.0.0.1", 9), ("127.0.0.2", 9)]
  88. def test_invalid_hostname(self):
  89. client = self._makeOne(hosts="nosuchhost/a")
  90. timeout = client.handler.timeout_exception
  91. with pytest.raises(timeout):
  92. client.start(0.1)
  93. def test_another_invalid_hostname(self):
  94. with pytest.raises(ValueError):
  95. self._makeOne(hosts="/nosuchhost/a")
  96. def test_retry_options_dict(self):
  97. from kazoo.retry import KazooRetry
  98. client = self._makeOne(
  99. command_retry=dict(max_tries=99), connection_retry=dict(delay=99)
  100. )
  101. assert type(client._conn_retry) is KazooRetry
  102. assert type(client._retry) is KazooRetry
  103. assert client._retry.max_tries == 99
  104. assert client._conn_retry.delay == 99
  105. class TestAuthentication(KazooTestCase):
  106. def _makeAuth(self, *args, **kwargs):
  107. from kazoo.security import make_digest_acl
  108. return make_digest_acl(*args, **kwargs)
  109. def test_auth(self):
  110. username = uuid.uuid4().hex
  111. password = uuid.uuid4().hex
  112. digest_auth = "%s:%s" % (username, password)
  113. acl = self._makeAuth(username, password, all=True)
  114. client = self._get_client()
  115. client.start()
  116. client.add_auth("digest", digest_auth)
  117. client.default_acl = (acl,)
  118. try:
  119. client.create("/1")
  120. client.create("/1/2")
  121. client.ensure_path("/1/2/3")
  122. eve = self._get_client()
  123. eve.start()
  124. with pytest.raises(NoAuthError):
  125. eve.get("/1/2")
  126. # try again with the wrong auth token
  127. eve.add_auth("digest", "badbad:bad")
  128. with pytest.raises(NoAuthError):
  129. eve.get("/1/2")
  130. finally:
  131. # Ensure we remove the ACL protected nodes
  132. client.delete("/1", recursive=True)
  133. eve.stop()
  134. eve.close()
  135. def test_connect_auth(self):
  136. username = uuid.uuid4().hex
  137. password = uuid.uuid4().hex
  138. digest_auth = "%s:%s" % (username, password)
  139. acl = self._makeAuth(username, password, all=True)
  140. client = self._get_client(auth_data=[("digest", digest_auth)])
  141. client.start()
  142. try:
  143. client.create("/1", acl=(acl,))
  144. # give ZK a chance to copy data to other node
  145. time.sleep(0.1)
  146. with pytest.raises(NoAuthError):
  147. self.client.get("/1")
  148. finally:
  149. client.delete("/1")
  150. client.stop()
  151. client.close()
  152. def test_unicode_auth(self):
  153. username = u(r"xe4/\hm")
  154. password = u(r"/\xe4hm")
  155. digest_auth = "%s:%s" % (username, password)
  156. acl = self._makeAuth(username, password, all=True)
  157. client = self._get_client()
  158. client.start()
  159. client.add_auth("digest", digest_auth)
  160. client.default_acl = (acl,)
  161. try:
  162. client.create("/1")
  163. client.ensure_path("/1/2/3")
  164. eve = self._get_client()
  165. eve.start()
  166. with pytest.raises(NoAuthError):
  167. eve.get("/1/2")
  168. # try again with the wrong auth token
  169. eve.add_auth("digest", "badbad:bad")
  170. with pytest.raises(NoAuthError):
  171. eve.get("/1/2")
  172. finally:
  173. # Ensure we remove the ACL protected nodes
  174. client.delete("/1", recursive=True)
  175. eve.stop()
  176. eve.close()
  177. def test_invalid_auth(self):
  178. client = self._get_client()
  179. client.start()
  180. with pytest.raises(TypeError):
  181. client.add_auth("digest", ("user", "pass"))
  182. with pytest.raises(TypeError):
  183. client.add_auth(None, ("user", "pass"))
  184. def test_async_auth(self):
  185. client = self._get_client()
  186. client.start()
  187. username = uuid.uuid4().hex
  188. password = uuid.uuid4().hex
  189. digest_auth = "%s:%s" % (username, password)
  190. result = client.add_auth_async("digest", digest_auth)
  191. assert result.get() is True
  192. def test_async_auth_failure(self):
  193. client = self._get_client()
  194. client.start()
  195. username = uuid.uuid4().hex
  196. password = uuid.uuid4().hex
  197. digest_auth = "%s:%s" % (username, password)
  198. with pytest.raises(AuthFailedError):
  199. client.add_auth("unknown-scheme", digest_auth)
  200. client.stop()
  201. def test_add_auth_on_reconnect(self):
  202. client = self._get_client()
  203. client.start()
  204. client.add_auth("digest", "jsmith:jsmith")
  205. client._connection._socket.shutdown(socket.SHUT_RDWR)
  206. while not client.connected:
  207. time.sleep(0.1)
  208. assert ("digest", "jsmith:jsmith") in client.auth_data
  209. class TestConnection(KazooTestCase):
  210. @staticmethod
  211. def make_event():
  212. return threading.Event()
  213. @staticmethod
  214. def make_condition():
  215. return threading.Condition()
  216. def test_chroot_warning(self):
  217. k = self._get_nonchroot_client()
  218. k.chroot = "abba"
  219. try:
  220. with patch("warnings.warn") as mock_func:
  221. k.start()
  222. assert mock_func.called
  223. finally:
  224. k.stop()
  225. def test_session_expire(self):
  226. from kazoo.protocol.states import KazooState
  227. cv = self.make_event()
  228. def watch_events(event):
  229. if event == KazooState.LOST:
  230. cv.set()
  231. self.client.add_listener(watch_events)
  232. self.expire_session(self.make_event)
  233. cv.wait(3)
  234. assert cv.is_set()
  235. def test_bad_session_expire(self):
  236. from kazoo.protocol.states import KazooState
  237. cv = self.make_event()
  238. ab = self.make_event()
  239. def watch_events(event):
  240. if event == KazooState.LOST:
  241. ab.set()
  242. raise Exception("oops")
  243. cv.set()
  244. self.client.add_listener(watch_events)
  245. self.expire_session(self.make_event)
  246. ab.wait(0.5)
  247. assert ab.is_set()
  248. cv.wait(0.5)
  249. assert not cv.is_set()
  250. def test_state_listener(self):
  251. from kazoo.protocol.states import KazooState
  252. states = []
  253. condition = self.make_condition()
  254. def listener(state):
  255. with condition:
  256. states.append(state)
  257. condition.notify_all()
  258. self.client.stop()
  259. assert self.client.state == KazooState.LOST
  260. self.client.add_listener(listener)
  261. self.client.start(5)
  262. with condition:
  263. if not states:
  264. condition.wait(5)
  265. assert len(states) == 1
  266. assert states[0] == KazooState.CONNECTED
  267. def test_invalid_listener(self):
  268. with pytest.raises(ConfigurationError):
  269. self.client.add_listener(15)
  270. def test_listener_only_called_on_real_state_change(self):
  271. from kazoo.protocol.states import KazooState
  272. assert self.client.state == KazooState.CONNECTED
  273. called = [False]
  274. condition = self.make_event()
  275. def listener(state):
  276. called[0] = True
  277. condition.set()
  278. self.client.add_listener(listener)
  279. self.client._make_state_change(KazooState.CONNECTED)
  280. condition.wait(3)
  281. assert called[0] is False
  282. def test_no_connection(self):
  283. client = self.client
  284. client.stop()
  285. assert client.connected is False
  286. assert client.client_id is None
  287. with pytest.raises(ConnectionClosedError):
  288. client.exists("/")
  289. def test_close_connecting_connection(self):
  290. client = self.client
  291. client.stop()
  292. ev = self.make_event()
  293. def close_on_connecting(state):
  294. if state in (KazooState.CONNECTED, KazooState.LOST):
  295. ev.set()
  296. client.add_listener(close_on_connecting)
  297. client.start()
  298. # Wait until we connect
  299. ev.wait(5)
  300. ev.clear()
  301. self.client._call(_CONNECTION_DROP, client.handler.async_result())
  302. client.stop()
  303. # ...and then wait until the connection is lost
  304. ev.wait(5)
  305. with pytest.raises(ConnectionClosedError):
  306. self.client.create("/foobar")
  307. def test_double_start(self):
  308. assert self.client.connected is True
  309. self.client.start()
  310. assert self.client.connected is True
  311. def test_double_stop(self):
  312. self.client.stop()
  313. assert self.client.connected is False
  314. self.client.stop()
  315. assert self.client.connected is False
  316. def test_restart(self):
  317. assert self.client.connected is True
  318. self.client.restart()
  319. assert self.client.connected is True
  320. def test_closed(self):
  321. client = self.client
  322. client.stop()
  323. write_sock = client._connection._write_sock
  324. # close the connection to free the socket
  325. client.close()
  326. assert client._connection._write_sock is None
  327. # sneak in and patch client to simulate race between a thread
  328. # calling stop(); close() and one running a command
  329. oldstate = client._state
  330. client._state = KeeperState.CONNECTED
  331. client._connection._write_sock = write_sock
  332. try:
  333. # simulate call made after write socket is closed
  334. with pytest.raises(ConnectionClosedError):
  335. client.exists("/")
  336. # simulate call made after write socket is set to None
  337. client._connection._write_sock = None
  338. with pytest.raises(ConnectionClosedError):
  339. client.exists("/")
  340. finally:
  341. # reset for teardown
  342. client._state = oldstate
  343. client._connection._write_sock = None
  344. def test_watch_trigger_expire(self):
  345. client = self.client
  346. cv = self.make_event()
  347. client.create("/test", b"")
  348. def test_watch(event):
  349. cv.set()
  350. client.get("/test/", watch=test_watch)
  351. self.expire_session(self.make_event)
  352. cv.wait(3)
  353. assert cv.is_set()
  354. class TestClient(KazooTestCase):
  355. def _makeOne(self, *args):
  356. from kazoo.handlers.threading import SequentialThreadingHandler
  357. return SequentialThreadingHandler(*args)
  358. def _getKazooState(self):
  359. from kazoo.protocol.states import KazooState
  360. return KazooState
  361. def test_server_version_retries_fail(self):
  362. client = self.client
  363. side_effects = [
  364. "",
  365. "zookeeper.version=",
  366. "zookeeper.version=1.",
  367. "zookeeper.ver",
  368. ]
  369. client.command = MagicMock()
  370. client.command.side_effect = side_effects
  371. with pytest.raises(KazooException):
  372. client.server_version(retries=len(side_effects) - 1)
  373. def test_server_version_retries_eventually_ok(self):
  374. client = self.client
  375. actual_version = "zookeeper.version=1.2"
  376. side_effects = []
  377. for i in range(0, len(actual_version) + 1):
  378. side_effects.append(actual_version[0:i])
  379. client.command = MagicMock()
  380. client.command.side_effect = side_effects
  381. assert client.server_version(retries=len(side_effects) - 1) == (1, 2)
  382. def test_client_id(self):
  383. client_id = self.client.client_id
  384. assert type(client_id) is tuple
  385. # make sure password is of correct length
  386. assert len(client_id[1]) == 16
  387. def test_connected(self):
  388. client = self.client
  389. assert client.connected
  390. def test_create(self):
  391. client = self.client
  392. path = client.create("/1")
  393. assert path == "/1"
  394. assert client.exists("/1")
  395. def test_create_on_broken_connection(self):
  396. client = self.client
  397. client.start()
  398. client._state = KeeperState.EXPIRED_SESSION
  399. with pytest.raises(SessionExpiredError):
  400. client.create("/closedpath", b"bar")
  401. client._state = KeeperState.AUTH_FAILED
  402. with pytest.raises(AuthFailedError):
  403. client.create("/closedpath", b"bar")
  404. client.stop()
  405. client.close()
  406. with pytest.raises(ConnectionClosedError):
  407. client.create("/closedpath", b"bar")
  408. def test_create_null_data(self):
  409. client = self.client
  410. client.create("/nulldata", None)
  411. value, _ = client.get("/nulldata")
  412. assert value is None
  413. def test_create_empty_string(self):
  414. client = self.client
  415. client.create("/empty", b"")
  416. value, _ = client.get("/empty")
  417. assert value == b""
  418. def test_create_unicode_path(self):
  419. client = self.client
  420. path = client.create(u("/ascii"))
  421. assert path == u("/ascii")
  422. path = client.create(u("/\xe4hm"))
  423. assert path == u("/\xe4hm")
  424. def test_create_async_returns_unchrooted_path(self):
  425. client = self.client
  426. path = client.create_async("/1").get()
  427. assert path == "/1"
  428. def test_create_invalid_path(self):
  429. client = self.client
  430. with pytest.raises(TypeError):
  431. client.create(("a",))
  432. with pytest.raises(ValueError):
  433. client.create(".")
  434. with pytest.raises(ValueError):
  435. client.create("/a/../b")
  436. with pytest.raises(BadArgumentsError):
  437. client.create("/b\x00")
  438. with pytest.raises(BadArgumentsError):
  439. client.create("/b\x1e")
  440. def test_create_invalid_arguments(self):
  441. from kazoo.security import OPEN_ACL_UNSAFE
  442. single_acl = OPEN_ACL_UNSAFE[0]
  443. client = self.client
  444. with pytest.raises(TypeError):
  445. client.create("a", acl="all")
  446. with pytest.raises(TypeError):
  447. client.create("a", acl=single_acl)
  448. with pytest.raises(TypeError):
  449. client.create("a", value=["a"])
  450. with pytest.raises(TypeError):
  451. client.create("a", ephemeral="yes")
  452. with pytest.raises(TypeError):
  453. client.create("a", sequence="yes")
  454. with pytest.raises(TypeError):
  455. client.create("a", makepath="yes")
  456. def test_create_value(self):
  457. client = self.client
  458. client.create("/1", b"bytes")
  459. data, stat = client.get("/1")
  460. assert data == b"bytes"
  461. def test_create_unicode_value(self):
  462. client = self.client
  463. with pytest.raises(TypeError):
  464. client.create("/1", u("\xe4hm"))
  465. def test_create_large_value(self):
  466. client = self.client
  467. kb_512 = b"a" * (512 * 1024)
  468. client.create("/1", kb_512)
  469. assert client.exists("/1")
  470. mb_2 = b"a" * (2 * 1024 * 1024)
  471. with pytest.raises(ConnectionLoss):
  472. client.create("/2", mb_2)
  473. def test_create_acl_duplicate(self):
  474. from kazoo.security import OPEN_ACL_UNSAFE
  475. single_acl = OPEN_ACL_UNSAFE[0]
  476. client = self.client
  477. client.create("/1", acl=[single_acl, single_acl])
  478. acls, stat = client.get_acls("/1")
  479. # ZK >3.4 removes duplicate ACL entries
  480. if CI_ZK_VERSION:
  481. version = CI_ZK_VERSION
  482. else:
  483. version = client.server_version()
  484. assert len(acls) == 1 if version > (3, 4) else 2
  485. def test_create_acl_empty_list(self):
  486. from kazoo.security import OPEN_ACL_UNSAFE
  487. client = self.client
  488. client.create("/1", acl=[])
  489. acls, stat = client.get_acls("/1")
  490. assert acls == OPEN_ACL_UNSAFE
  491. def test_version_no_connection(self):
  492. self.client.stop()
  493. with pytest.raises(ConnectionLoss):
  494. self.client.server_version()
  495. def test_create_ephemeral(self):
  496. client = self.client
  497. client.create("/1", b"ephemeral", ephemeral=True)
  498. data, stat = client.get("/1")
  499. assert data == b"ephemeral"
  500. assert stat.ephemeralOwner == client.client_id[0]
  501. def test_create_no_ephemeral(self):
  502. client = self.client
  503. client.create("/1", b"val1")
  504. data, stat = client.get("/1")
  505. assert not stat.ephemeralOwner
  506. def test_create_ephemeral_no_children(self):
  507. from kazoo.exceptions import NoChildrenForEphemeralsError
  508. client = self.client
  509. client.create("/1", b"ephemeral", ephemeral=True)
  510. with pytest.raises(NoChildrenForEphemeralsError):
  511. client.create("/1/2", b"val1")
  512. with pytest.raises(NoChildrenForEphemeralsError):
  513. client.create("/1/2", b"val1", ephemeral=True)
  514. def test_create_sequence(self):
  515. client = self.client
  516. client.create("/folder")
  517. path = client.create("/folder/a", b"sequence", sequence=True)
  518. assert path == "/folder/a0000000000"
  519. path2 = client.create("/folder/a", b"sequence", sequence=True)
  520. assert path2 == "/folder/a0000000001"
  521. path3 = client.create("/folder/", b"sequence", sequence=True)
  522. assert path3 == "/folder/0000000002"
  523. def test_create_ephemeral_sequence(self):
  524. basepath = "/" + uuid.uuid4().hex
  525. realpath = self.client.create(
  526. basepath, b"sandwich", sequence=True, ephemeral=True
  527. )
  528. assert basepath != realpath and realpath.startswith(basepath)
  529. data, stat = self.client.get(realpath)
  530. assert data == b"sandwich"
  531. def test_create_makepath(self):
  532. self.client.create("/1/2", b"val1", makepath=True)
  533. data, stat = self.client.get("/1/2")
  534. assert data == b"val1"
  535. self.client.create("/1/2/3/4/5", b"val2", makepath=True)
  536. data, stat = self.client.get("/1/2/3/4/5")
  537. assert data == b"val2"
  538. with pytest.raises(NodeExistsError):
  539. self.client.create("/1/2/3/4/5", b"val2", makepath=True)
  540. def test_create_makepath_incompatible_acls(self):
  541. from kazoo.client import KazooClient
  542. from kazoo.security import make_digest_acl_credential, CREATOR_ALL_ACL
  543. credential = make_digest_acl_credential("username", "password")
  544. alt_client = KazooClient(
  545. self.cluster[0].address + self.client.chroot,
  546. max_retries=5,
  547. auth_data=[("digest", credential)],
  548. handler=self._makeOne(),
  549. )
  550. alt_client.start()
  551. alt_client.create("/1/2", b"val2", makepath=True, acl=CREATOR_ALL_ACL)
  552. try:
  553. with pytest.raises(NoAuthError):
  554. self.client.create("/1/2/3/4/5", b"val2", makepath=True)
  555. finally:
  556. alt_client.delete("/", recursive=True)
  557. alt_client.stop()
  558. def test_create_no_makepath(self):
  559. with pytest.raises(NoNodeError):
  560. self.client.create("/1/2", b"val1")
  561. with pytest.raises(NoNodeError):
  562. self.client.create("/1/2", b"val1", makepath=False)
  563. self.client.create("/1/2", b"val1", makepath=True)
  564. with pytest.raises(NoNodeError):
  565. self.client.create("/1/2/3/4", b"val1", makepath=False)
  566. def test_create_exists(self):
  567. from kazoo.exceptions import NodeExistsError
  568. client = self.client
  569. path = client.create("/1")
  570. with pytest.raises(NodeExistsError):
  571. client.create(path)
  572. def test_create_stat(self):
  573. if CI_ZK_VERSION:
  574. version = CI_ZK_VERSION
  575. else:
  576. version = self.client.server_version()
  577. if not version or version < (3, 5):
  578. pytest.skip("Must use Zookeeper 3.5 or above")
  579. client = self.client
  580. path, stat1 = client.create("/1", b"bytes", include_data=True)
  581. data, stat2 = client.get("/1")
  582. assert data == b"bytes"
  583. assert stat1 == stat2
  584. def test_create_get_set(self):
  585. nodepath = "/" + uuid.uuid4().hex
  586. self.client.create(nodepath, b"sandwich", ephemeral=True)
  587. data, stat = self.client.get(nodepath)
  588. assert data == b"sandwich"
  589. newstat = self.client.set(nodepath, b"hats", stat.version)
  590. assert newstat
  591. assert newstat.version > stat.version
  592. # Some other checks of the ZnodeStat object we got
  593. assert newstat.acl_version == stat.acl_version
  594. assert newstat.created == stat.ctime / 1000.0
  595. assert newstat.last_modified == newstat.mtime / 1000.0
  596. assert newstat.owner_session_id == stat.ephemeralOwner
  597. assert newstat.creation_transaction_id == stat.czxid
  598. assert newstat.last_modified_transaction_id == newstat.mzxid
  599. assert newstat.data_length == newstat.dataLength
  600. assert newstat.children_count == stat.numChildren
  601. assert newstat.children_version == stat.cversion
  602. def test_get_invalid_arguments(self):
  603. client = self.client
  604. with pytest.raises(TypeError):
  605. client.get(("a", "b"))
  606. with pytest.raises(TypeError):
  607. client.get("a", watch=True)
  608. def test_bad_argument(self):
  609. client = self.client
  610. client.ensure_path("/1")
  611. with pytest.raises(TypeError):
  612. self.client.set("/1", 1)
  613. def test_ensure_path(self):
  614. client = self.client
  615. client.ensure_path("/1/2")
  616. assert client.exists("/1/2")
  617. client.ensure_path("/1/2/3/4")
  618. assert client.exists("/1/2/3/4")
  619. def test_sync(self):
  620. client = self.client
  621. assert client.sync("/") == "/"
  622. # Albeit surprising, you can sync anything, even what does not exist.
  623. assert client.sync("/not_there") == "/not_there"
  624. def test_exists(self):
  625. nodepath = "/" + uuid.uuid4().hex
  626. exists = self.client.exists(nodepath)
  627. assert exists is None
  628. self.client.create(nodepath, b"sandwich", ephemeral=True)
  629. exists = self.client.exists(nodepath)
  630. assert exists
  631. assert isinstance(exists.version, int)
  632. multi_node_nonexistent = "/" + uuid.uuid4().hex + "/hats"
  633. exists = self.client.exists(multi_node_nonexistent)
  634. assert exists is None
  635. def test_exists_invalid_arguments(self):
  636. client = self.client
  637. with pytest.raises(TypeError):
  638. client.exists(("a", "b"))
  639. with pytest.raises(TypeError):
  640. client.exists("a", watch=True)
  641. def test_exists_watch(self):
  642. nodepath = "/" + uuid.uuid4().hex
  643. event = self.client.handler.event_object()
  644. def w(watch_event):
  645. assert watch_event.path == nodepath
  646. event.set()
  647. exists = self.client.exists(nodepath, watch=w)
  648. assert exists is None
  649. self.client.create(nodepath, ephemeral=True)
  650. event.wait(1)
  651. assert event.is_set() is True
  652. def test_exists_watcher_exception(self):
  653. nodepath = "/" + uuid.uuid4().hex
  654. event = self.client.handler.event_object()
  655. # if the watcher throws an exception, all we can really do is log it
  656. def w(watch_event):
  657. assert watch_event.path == nodepath
  658. event.set()
  659. raise Exception("test exception in callback")
  660. exists = self.client.exists(nodepath, watch=w)
  661. assert exists is None
  662. self.client.create(nodepath, ephemeral=True)
  663. event.wait(1)
  664. assert event.is_set() is True
  665. def test_create_delete(self):
  666. nodepath = "/" + uuid.uuid4().hex
  667. self.client.create(nodepath, b"zzz")
  668. self.client.delete(nodepath)
  669. exists = self.client.exists(nodepath)
  670. assert exists is None
  671. def test_get_acls(self):
  672. from kazoo.security import make_digest_acl
  673. user = "user"
  674. passw = "pass"
  675. acl = make_digest_acl(user, passw, all=True)
  676. client = self.client
  677. try:
  678. client.create("/a", acl=[acl])
  679. client.add_auth("digest", "{}:{}".format(user, passw))
  680. assert acl in client.get_acls("/a")[0]
  681. finally:
  682. client.delete("/a")
  683. def test_get_acls_invalid_arguments(self):
  684. client = self.client
  685. with pytest.raises(TypeError):
  686. client.get_acls(("a", "b"))
  687. def test_set_acls(self):
  688. from kazoo.security import make_digest_acl
  689. user = "user"
  690. passw = "pass"
  691. acl = make_digest_acl(user, passw, all=True)
  692. client = self.client
  693. client.create("/a")
  694. try:
  695. client.set_acls("/a", [acl])
  696. client.add_auth("digest", "{}:{}".format(user, passw))
  697. assert acl in client.get_acls("/a")[0]
  698. finally:
  699. client.delete("/a")
  700. def test_set_acls_empty(self):
  701. client = self.client
  702. client.create("/a")
  703. with pytest.raises(InvalidACLError):
  704. client.set_acls("/a", [])
  705. def test_set_acls_no_node(self):
  706. from kazoo.security import OPEN_ACL_UNSAFE
  707. client = self.client
  708. with pytest.raises(NoNodeError):
  709. client.set_acls("/a", OPEN_ACL_UNSAFE)
  710. def test_set_acls_invalid_arguments(self):
  711. from kazoo.security import OPEN_ACL_UNSAFE
  712. single_acl = OPEN_ACL_UNSAFE[0]
  713. client = self.client
  714. with pytest.raises(TypeError):
  715. client.set_acls(("a", "b"), ())
  716. with pytest.raises(TypeError):
  717. client.set_acls("a", single_acl)
  718. with pytest.raises(TypeError):
  719. client.set_acls("a", "all")
  720. with pytest.raises(TypeError):
  721. client.set_acls("a", [single_acl], "V1")
  722. def test_set(self):
  723. client = self.client
  724. client.create("a", b"first")
  725. stat = client.set("a", b"second")
  726. data, stat2 = client.get("a")
  727. assert data == b"second"
  728. assert stat == stat2
  729. def test_set_null_data(self):
  730. client = self.client
  731. client.create("/nulldata", b"not none")
  732. client.set("/nulldata", None)
  733. value, _ = client.get("/nulldata")
  734. assert value is None
  735. def test_set_empty_string(self):
  736. client = self.client
  737. client.create("/empty", b"not empty")
  738. client.set("/empty", b"")
  739. value, _ = client.get("/empty")
  740. assert value == b""
  741. def test_set_invalid_arguments(self):
  742. client = self.client
  743. client.create("a", b"first")
  744. with pytest.raises(TypeError):
  745. client.set(("a", "b"), b"value")
  746. with pytest.raises(TypeError):
  747. client.set("a", ["v", "w"])
  748. with pytest.raises(TypeError):
  749. client.set("a", b"value", "V1")
  750. def test_delete(self):
  751. client = self.client
  752. client.ensure_path("/a/b")
  753. assert "b" in client.get_children("a")
  754. client.delete("/a/b")
  755. assert "b" not in client.get_children("a")
  756. def test_delete_recursive(self):
  757. client = self.client
  758. client.ensure_path("/a/b/c")
  759. client.ensure_path("/a/b/d")
  760. client.delete("/a/b", recursive=True)
  761. client.delete("/a/b/c", recursive=True)
  762. assert "b" not in client.get_children("a")
  763. def test_delete_invalid_arguments(self):
  764. client = self.client
  765. client.ensure_path("/a/b")
  766. with pytest.raises(TypeError):
  767. client.delete("/a/b", recursive="all")
  768. with pytest.raises(TypeError):
  769. client.delete(("a", "b"))
  770. with pytest.raises(TypeError):
  771. client.delete("/a/b", version="V1")
  772. def test_get_children(self):
  773. client = self.client
  774. client.ensure_path("/a/b/c")
  775. client.ensure_path("/a/b/d")
  776. assert client.get_children("/a") == ["b"]
  777. assert set(client.get_children("/a/b")) == set(["c", "d"])
  778. assert client.get_children("/a/b/c") == []
  779. def test_get_children2(self):
  780. client = self.client
  781. client.ensure_path("/a/b")
  782. children, stat = client.get_children("/a", include_data=True)
  783. value, stat2 = client.get("/a")
  784. assert children == ["b"]
  785. assert stat2.version == stat.version
  786. def test_get_children2_many_nodes(self):
  787. client = self.client
  788. client.ensure_path("/a/b")
  789. client.ensure_path("/a/c")
  790. client.ensure_path("/a/d")
  791. children, stat = client.get_children("/a", include_data=True)
  792. value, stat2 = client.get("/a")
  793. assert set(children) == set(["b", "c", "d"])
  794. assert stat2.version == stat.version
  795. def test_get_children_no_node(self):
  796. client = self.client
  797. with pytest.raises(NoNodeError):
  798. client.get_children("/none")
  799. with pytest.raises(NoNodeError):
  800. client.get_children("/none", include_data=True)
  801. def test_get_children_invalid_path(self):
  802. client = self.client
  803. with pytest.raises(ValueError):
  804. client.get_children("../a")
  805. def test_get_children_invalid_arguments(self):
  806. client = self.client
  807. with pytest.raises(TypeError):
  808. client.get_children(("a", "b"))
  809. with pytest.raises(TypeError):
  810. client.get_children("a", watch=True)
  811. with pytest.raises(TypeError):
  812. client.get_children("a", include_data="yes")
  813. def test_invalid_auth(self):
  814. from kazoo.exceptions import AuthFailedError
  815. from kazoo.protocol.states import KeeperState
  816. client = self.client
  817. client.stop()
  818. client._state = KeeperState.AUTH_FAILED
  819. with pytest.raises(AuthFailedError):
  820. client.get("/")
  821. def test_client_state(self):
  822. from kazoo.protocol.states import KeeperState
  823. assert self.client.client_state == KeeperState.CONNECTED
  824. def test_update_host_list(self):
  825. from kazoo.client import KazooClient
  826. from kazoo.protocol.states import KeeperState
  827. hosts = self.cluster[0].address
  828. # create a client with only one server in its list
  829. client = KazooClient(hosts=hosts)
  830. client.start()
  831. # try to change the chroot, not currently allowed
  832. with pytest.raises(ConfigurationError):
  833. client.set_hosts(hosts + "/new_chroot")
  834. # grow the cluster to 3
  835. client.set_hosts(self.servers)
  836. # shut down the first host
  837. try:
  838. self.cluster[0].stop()
  839. time.sleep(5)
  840. assert client.client_state == KeeperState.CONNECTED
  841. finally:
  842. self.cluster[0].run()
  843. # utility for test_request_queuing*
  844. def _make_request_queuing_client(self):
  845. from kazoo.client import KazooClient
  846. server = self.cluster[0]
  847. handler = self._makeOne()
  848. # create a client with only one server in its list, and
  849. # infinite retries
  850. client = KazooClient(
  851. hosts=server.address + self.client.chroot,
  852. handler=handler,
  853. connection_retry=dict(
  854. max_tries=-1,
  855. delay=0.1,
  856. backoff=1,
  857. max_jitter=0.0,
  858. sleep_func=handler.sleep_func,
  859. ),
  860. )
  861. return client, server
  862. # utility for test_request_queuing*
  863. def _request_queuing_common(self, client, server, path, expire_session):
  864. ev_suspended = client.handler.event_object()
  865. ev_connected = client.handler.event_object()
  866. def listener(state):
  867. if state == KazooState.SUSPENDED:
  868. ev_suspended.set()
  869. elif state == KazooState.CONNECTED:
  870. ev_connected.set()
  871. client.add_listener(listener)
  872. # wait for the client to connect
  873. client.start()
  874. try:
  875. # force the client to suspend
  876. server.stop()
  877. ev_suspended.wait(5)
  878. assert ev_suspended.is_set()
  879. ev_connected.clear()
  880. # submit a request, expecting it to be queued
  881. result = client.create_async(path)
  882. assert len(client._queue) != 0
  883. assert result.ready() is False
  884. assert client.state == KazooState.SUSPENDED
  885. # optionally cause a SessionExpiredError to occur by
  886. # mangling the first byte of the session password.
  887. if expire_session:
  888. b0 = b"\x00"
  889. if client._session_passwd[0] == 0:
  890. b0 = b"\xff"
  891. client._session_passwd = b0 + client._session_passwd[1:]
  892. finally:
  893. server.run()
  894. # wait for the client to reconnect (either with a recovered
  895. # session, or with a new one if expire_session was set)
  896. ev_connected.wait(5)
  897. assert ev_connected.is_set()
  898. return result
  899. def test_request_queuing_session_recovered(self):
  900. path = "/" + uuid.uuid4().hex
  901. client, server = self._make_request_queuing_client()
  902. try:
  903. result = self._request_queuing_common(
  904. client=client, server=server, path=path, expire_session=False
  905. )
  906. assert result.get() == path
  907. assert client.exists(path) is not None
  908. finally:
  909. client.stop()
  910. def test_request_queuing_session_expired(self):
  911. path = "/" + uuid.uuid4().hex
  912. client, server = self._make_request_queuing_client()
  913. try:
  914. result = self._request_queuing_common(
  915. client=client, server=server, path=path, expire_session=True
  916. )
  917. assert len(client._queue) == 0
  918. with pytest.raises(SessionExpiredError):
  919. result.get()
  920. finally:
  921. client.stop()
  922. class TestSSLClient(KazooTestCase):
  923. def setUp(self):
  924. if CI_ZK_VERSION and CI_ZK_VERSION < (3, 5):
  925. pytest.skip("Must use Zookeeper 3.5 or above")
  926. ssl_path = tempfile.mkdtemp()
  927. key_path = os.path.join(ssl_path, "key.pem")
  928. cert_path = os.path.join(ssl_path, "cert.pem")
  929. cacert_path = os.path.join(ssl_path, "cacert.pem")
  930. with open(key_path, "wb") as key_file:
  931. key_file.write(
  932. self.cluster.get_ssl_client_configuration()["client_key"]
  933. )
  934. with open(cert_path, "wb") as cert_file:
  935. cert_file.write(
  936. self.cluster.get_ssl_client_configuration()["client_cert"]
  937. )
  938. with open(cacert_path, "wb") as cacert_file:
  939. cacert_file.write(
  940. self.cluster.get_ssl_client_configuration()["ca_cert"]
  941. )
  942. self.setup_zookeeper(
  943. use_ssl=True, keyfile=key_path, certfile=cert_path, ca=cacert_path
  944. )
  945. def test_create(self):
  946. client = self.client
  947. path = client.create("/1")
  948. assert path == "/1"
  949. assert client.exists("/1")
  950. dummy_dict = {
  951. "aversion": 1,
  952. "ctime": 0,
  953. "cversion": 1,
  954. "czxid": 110,
  955. "dataLength": 1,
  956. "ephemeralOwner": "ben",
  957. "mtime": 1,
  958. "mzxid": 1,
  959. "numChildren": 0,
  960. "pzxid": 1,
  961. "version": 1,
  962. }
  963. class TestClientTransactions(KazooTestCase):
  964. def setUp(self):
  965. KazooTestCase.setUp(self)
  966. skip = False
  967. if CI_ZK_VERSION and CI_ZK_VERSION < (3, 4):
  968. skip = True
  969. elif CI_ZK_VERSION and CI_ZK_VERSION >= (3, 4):
  970. skip = False
  971. else:
  972. ver = self.client.server_version()
  973. if ver[1] < 4:
  974. skip = True
  975. if skip:
  976. pytest.skip("Must use Zookeeper 3.4 or above")
  977. def test_basic_create(self):
  978. t = self.client.transaction()
  979. t.create("/freddy")
  980. t.create("/fred", ephemeral=True)
  981. t.create("/smith", sequence=True)
  982. results = t.commit()
  983. assert len(results) == 3
  984. assert results[0] == "/freddy"
  985. assert results[2].startswith("/smith0") is True
  986. def test_bad_creates(self):
  987. args_list = [
  988. (True,),
  989. ("/smith", 0),
  990. ("/smith", b"", "bleh"),
  991. ("/smith", b"", None, "fred"),
  992. ("/smith", b"", None, True, "fred"),
  993. ]
  994. for args in args_list:
  995. with pytest.raises(TypeError):
  996. t = self.client.transaction()
  997. t.create(*args)
  998. def test_default_acl(self):
  999. from kazoo.security import make_digest_acl
  1000. username = uuid.uuid4().hex
  1001. password = uuid.uuid4().hex
  1002. digest_auth = "%s:%s" % (username, password)
  1003. acl = make_digest_acl(username, password, all=True)
  1004. self.client.add_auth("digest", digest_auth)
  1005. self.client.default_acl = (acl,)
  1006. t = self.client.transaction()
  1007. t.create("/freddy")
  1008. results = t.commit()
  1009. assert results[0] == "/freddy"
  1010. def test_basic_delete(self):
  1011. self.client.create("/fred")
  1012. t = self.client.transaction()
  1013. t.delete("/fred")
  1014. results = t.commit()
  1015. assert results[0] is True
  1016. def test_bad_deletes(self):
  1017. args_list = [
  1018. (True,),
  1019. ("/smith", "woops"),
  1020. ]
  1021. for args in args_list:
  1022. with pytest.raises(TypeError):
  1023. t = self.client.transaction()
  1024. t.delete(*args)
  1025. def test_set(self):
  1026. self.client.create("/fred", b"01")
  1027. t = self.client.transaction()
  1028. t.set_data("/fred", b"oops")
  1029. t.commit()
  1030. res = self.client.get("/fred")
  1031. assert res[0] == b"oops"
  1032. def test_bad_sets(self):
  1033. args_list = [(42, 52), ("/smith", False), ("/smith", b"", "oops")]
  1034. for args in args_list:
  1035. with pytest.raises(TypeError):
  1036. t = self.client.transaction()
  1037. t.set_data(*args)
  1038. def test_check(self):
  1039. self.client.create("/fred")
  1040. version = self.client.get("/fred")[1].version
  1041. t = self.client.transaction()
  1042. t.check("/fred", version)
  1043. t.create("/blah")
  1044. results = t.commit()
  1045. assert results[0] is True
  1046. assert results[1] == "/blah"
  1047. def test_bad_checks(self):
  1048. args_list = [(42, 52), ("/smith", "oops")]
  1049. for args in args_list:
  1050. with pytest.raises(TypeError):
  1051. t = self.client.transaction()
  1052. t.check(*args)
  1053. def test_bad_transaction(self):
  1054. from kazoo.exceptions import RolledBackError, NoNodeError
  1055. t = self.client.transaction()
  1056. t.create("/fred")
  1057. t.delete("/smith")
  1058. results = t.commit()
  1059. assert results[0].__class__ == RolledBackError
  1060. assert results[1].__class__ == NoNodeError
  1061. def test_bad_commit(self):
  1062. t = self.client.transaction()
  1063. t.committed = True
  1064. with pytest.raises(ValueError):
  1065. t.commit()
  1066. def test_bad_context(self):
  1067. with pytest.raises(TypeError):
  1068. with self.client.transaction() as t:
  1069. t.check(4232)
  1070. def test_context(self):
  1071. with self.client.transaction() as t:
  1072. t.create("/smith", b"32")
  1073. assert self.client.get("/smith")[0] == b"32"
  1074. class TestSessionCallbacks(unittest.TestCase):
  1075. def test_session_callback_states(self):
  1076. from kazoo.protocol.states import KazooState, KeeperState
  1077. from kazoo.client import KazooClient
  1078. client = KazooClient()
  1079. client._handle = 1
  1080. client._live.set()
  1081. result = client._session_callback(KeeperState.CONNECTED)
  1082. assert result is None
  1083. # Now with stopped
  1084. client._stopped.set()
  1085. result = client._session_callback(KeeperState.CONNECTED)
  1086. assert result is None
  1087. # Test several state transitions
  1088. client._stopped.clear()
  1089. client.start_async = lambda: True
  1090. client._session_callback(KeeperState.CONNECTED)
  1091. assert client.state == KazooState.CONNECTED
  1092. client._session_callback(KeeperState.AUTH_FAILED)
  1093. assert client.state == KazooState.LOST
  1094. client._handle = 1
  1095. client._session_callback(-250)
  1096. assert client.state == KazooState.SUSPENDED
  1097. class TestCallbacks(KazooTestCase):
  1098. def test_async_result_callbacks_are_always_called(self):
  1099. # create a callback object
  1100. callback_mock = Mock()
  1101. # simulate waiting for a response
  1102. async_result = self.client.handler.async_result()
  1103. async_result.rawlink(callback_mock)
  1104. # begin the procedure to stop the client
  1105. self.client.stop()
  1106. # the response has just been received;
  1107. # this should be on another thread,
  1108. # simultaneously with the stop procedure
  1109. async_result.set_exception(
  1110. Exception("Anything that throws an exception")
  1111. )
  1112. # with the fix the callback should be called
  1113. assert callback_mock.call_count > 0
  1114. class TestNonChrootClient(KazooTestCase):
  1115. def test_create(self):
  1116. client = self._get_nonchroot_client()
  1117. assert client.chroot == ""
  1118. client.start()
  1119. node = uuid.uuid4().hex
  1120. path = client.create(node, ephemeral=True)
  1121. client.delete(path)
  1122. client.stop()
  1123. def test_unchroot(self):
  1124. client = self._get_nonchroot_client()
  1125. client.chroot = "/a"
  1126. # Unchroot'ing the chroot path should return "/"
  1127. assert client.unchroot("/a") == "/"
  1128. assert client.unchroot("/a/b") == "/b"
  1129. assert client.unchroot("/b/c") == "/b/c"
  1130. class TestReconfig(KazooTestCase):
  1131. def setUp(self):
  1132. KazooTestCase.setUp(self)
  1133. if CI_ZK_VERSION:
  1134. version = CI_ZK_VERSION
  1135. else:
  1136. version = self.client.server_version()
  1137. if not version or version < (3, 5):
  1138. pytest.skip("Must use Zookeeper 3.5 or above")
  1139. def test_no_super_auth(self):
  1140. with pytest.raises(NoAuthError):
  1141. self.client.reconfig(
  1142. joining="server.999=0.0.0.0:1234:2345:observer;3456",
  1143. leaving=None,
  1144. new_members=None,
  1145. )
  1146. def test_add_remove_observer(self):
  1147. def free_sock_port():
  1148. s = socket.socket()
  1149. s.bind(("", 0))
  1150. return s, s.getsockname()[1]
  1151. username = "super"
  1152. password = "test"
  1153. digest_auth = "%s:%s" % (username, password)
  1154. client = self._get_client(auth_data=[("digest", digest_auth)])
  1155. client.start()
  1156. # get ports for election, zab and client endpoints. we need to use
  1157. # ports for which we'd immediately get a RST upon connect(); otherwise
  1158. # the cluster could crash if it gets a SocketTimeoutException:
  1159. # https://issues.apache.org/jira/browse/ZOOKEEPER-2202
  1160. s1, port1 = free_sock_port()
  1161. s2, port2 = free_sock_port()
  1162. s3, port3 = free_sock_port()
  1163. joining = "server.100=0.0.0.0:%d:%d:observer;0.0.0.0:%d" % (
  1164. port1,
  1165. port2,
  1166. port3,
  1167. )
  1168. data, _ = client.reconfig(
  1169. joining=joining, leaving=None, new_members=None
  1170. )
  1171. assert joining.encode("utf8") in data
  1172. data, _ = client.reconfig(
  1173. joining=None, leaving="100", new_members=None
  1174. )
  1175. assert joining.encode("utf8") not in data
  1176. # try to add it again, but a config number in the future
  1177. curver = int(data.decode().split("\n")[-1].split("=")[1], base=16)
  1178. with pytest.raises(BadVersionError):
  1179. self.client.reconfig(
  1180. joining=joining,
  1181. leaving=None,
  1182. new_members=None,
  1183. from_config=curver + 1,
  1184. )
  1185. def test_bad_input(self):
  1186. with pytest.raises(BadArgumentsError):
  1187. self.client.reconfig(
  1188. joining="some thing", leaving=None, new_members=None
  1189. )