图片解析应用
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

821 lines
25 KiB

  1. import collections
  2. import threading
  3. import unittest
  4. from unittest.mock import MagicMock
  5. import uuid
  6. import pytest
  7. from kazoo.exceptions import CancelledError
  8. from kazoo.exceptions import LockTimeout
  9. from kazoo.exceptions import NoNodeError
  10. from kazoo.recipe.lock import Lock
  11. from kazoo.testing import KazooTestCase
  12. from kazoo.tests import util as test_util
  13. class SleepBarrier(object):
  14. """A crappy spinning barrier."""
  15. def __init__(self, wait_for, sleep_func):
  16. self._wait_for = wait_for
  17. self._arrived = collections.deque()
  18. self._sleep_func = sleep_func
  19. def __enter__(self):
  20. self._arrived.append(threading.current_thread())
  21. return self
  22. def __exit__(self, type, value, traceback):
  23. try:
  24. self._arrived.remove(threading.current_thread())
  25. except ValueError:
  26. pass
  27. def wait(self):
  28. while len(self._arrived) < self._wait_for:
  29. self._sleep_func(0.001)
  30. class KazooLockTests(KazooTestCase):
  31. thread_count = 20
  32. def __init__(self, *args, **kw):
  33. super(KazooLockTests, self).__init__(*args, **kw)
  34. self.threads_made = []
  35. def tearDown(self):
  36. super(KazooLockTests, self).tearDown()
  37. while self.threads_made:
  38. t = self.threads_made.pop()
  39. t.join()
  40. @staticmethod
  41. def make_condition():
  42. return threading.Condition()
  43. @staticmethod
  44. def make_event():
  45. return threading.Event()
  46. def make_thread(self, *args, **kwargs):
  47. t = threading.Thread(*args, **kwargs)
  48. t.daemon = True
  49. self.threads_made.append(t)
  50. return t
  51. @staticmethod
  52. def make_wait():
  53. return test_util.Wait()
  54. def setUp(self):
  55. super(KazooLockTests, self).setUp()
  56. self.lockpath = "/" + uuid.uuid4().hex
  57. self.condition = self.make_condition()
  58. self.released = self.make_event()
  59. self.active_thread = None
  60. self.cancelled_threads = []
  61. def _thread_lock_acquire_til_event(self, name, lock, event):
  62. try:
  63. with lock:
  64. with self.condition:
  65. assert self.active_thread is None
  66. self.active_thread = name
  67. self.condition.notify_all()
  68. event.wait()
  69. with self.condition:
  70. assert self.active_thread == name
  71. self.active_thread = None
  72. self.condition.notify_all()
  73. self.released.set()
  74. except CancelledError:
  75. with self.condition:
  76. self.cancelled_threads.append(name)
  77. self.condition.notify_all()
  78. def test_lock_one(self):
  79. lock_name = uuid.uuid4().hex
  80. lock = self.client.Lock(self.lockpath, lock_name)
  81. event = self.make_event()
  82. thread = self.make_thread(
  83. target=self._thread_lock_acquire_til_event,
  84. args=(lock_name, lock, event),
  85. )
  86. thread.start()
  87. lock2_name = uuid.uuid4().hex
  88. anotherlock = self.client.Lock(self.lockpath, lock2_name)
  89. # wait for any contender to show up on the lock
  90. wait = self.make_wait()
  91. wait(anotherlock.contenders)
  92. assert anotherlock.contenders() == [lock_name]
  93. with self.condition:
  94. while self.active_thread != lock_name:
  95. self.condition.wait()
  96. # release the lock
  97. event.set()
  98. with self.condition:
  99. while self.active_thread:
  100. self.condition.wait()
  101. self.released.wait()
  102. thread.join()
  103. def test_lock(self):
  104. threads = []
  105. names = ["contender" + str(i) for i in range(5)]
  106. contender_bits = {}
  107. for name in names:
  108. ev = self.make_event()
  109. lock = self.client.Lock(self.lockpath, name)
  110. thread = self.make_thread(
  111. target=self._thread_lock_acquire_til_event,
  112. args=(name, lock, ev),
  113. )
  114. contender_bits[name] = (thread, ev)
  115. threads.append(thread)
  116. # acquire the lock ourselves first to make the others line up
  117. lock = self.client.Lock(self.lockpath, "test")
  118. lock.acquire()
  119. for thread in threads:
  120. thread.start()
  121. # wait for everyone to line up on the lock
  122. wait = self.make_wait()
  123. wait(lambda: len(lock.contenders()) == 6)
  124. contenders = lock.contenders()
  125. assert contenders[0] == "test"
  126. contenders = contenders[1:]
  127. remaining = list(contenders)
  128. # release the lock and contenders should claim it in order
  129. lock.release()
  130. for contender in contenders:
  131. thread, event = contender_bits[contender]
  132. with self.condition:
  133. while not self.active_thread:
  134. self.condition.wait()
  135. assert self.active_thread == contender
  136. assert lock.contenders() == remaining
  137. remaining = remaining[1:]
  138. event.set()
  139. with self.condition:
  140. while self.active_thread:
  141. self.condition.wait()
  142. for thread in threads:
  143. thread.join()
  144. def test_lock_reconnect(self):
  145. event = self.make_event()
  146. other_lock = self.client.Lock(self.lockpath, "contender")
  147. thread = self.make_thread(
  148. target=self._thread_lock_acquire_til_event,
  149. args=("contender", other_lock, event),
  150. )
  151. # acquire the lock ourselves first to make the contender line up
  152. lock = self.client.Lock(self.lockpath, "test")
  153. lock.acquire()
  154. thread.start()
  155. # wait for the contender to line up on the lock
  156. wait = self.make_wait()
  157. wait(lambda: len(lock.contenders()) == 2)
  158. assert lock.contenders() == ["test", "contender"]
  159. self.expire_session(self.make_event)
  160. lock.release()
  161. with self.condition:
  162. while not self.active_thread:
  163. self.condition.wait()
  164. assert self.active_thread == "contender"
  165. event.set()
  166. thread.join()
  167. def test_lock_non_blocking(self):
  168. lock_name = uuid.uuid4().hex
  169. lock = self.client.Lock(self.lockpath, lock_name)
  170. event = self.make_event()
  171. thread = self.make_thread(
  172. target=self._thread_lock_acquire_til_event,
  173. args=(lock_name, lock, event),
  174. )
  175. thread.start()
  176. lock1 = self.client.Lock(self.lockpath, lock_name)
  177. # wait for the thread to acquire the lock
  178. with self.condition:
  179. if not self.active_thread:
  180. self.condition.wait(5)
  181. assert not lock1.acquire(blocking=False)
  182. assert lock.contenders() == [lock_name] # just one - itself
  183. event.set()
  184. thread.join()
  185. def test_lock_fail_first_call(self):
  186. event1 = self.make_event()
  187. lock1 = self.client.Lock(self.lockpath, "one")
  188. thread1 = self.make_thread(
  189. target=self._thread_lock_acquire_til_event,
  190. args=("one", lock1, event1),
  191. )
  192. thread1.start()
  193. # wait for this thread to acquire the lock
  194. with self.condition:
  195. if not self.active_thread:
  196. self.condition.wait(5)
  197. assert self.active_thread == "one"
  198. assert lock1.contenders() == ["one"]
  199. event1.set()
  200. thread1.join()
  201. def test_lock_cancel(self):
  202. event1 = self.make_event()
  203. lock1 = self.client.Lock(self.lockpath, "one")
  204. thread1 = self.make_thread(
  205. target=self._thread_lock_acquire_til_event,
  206. args=("one", lock1, event1),
  207. )
  208. thread1.start()
  209. # wait for this thread to acquire the lock
  210. with self.condition:
  211. if not self.active_thread:
  212. self.condition.wait(5)
  213. assert self.active_thread == "one"
  214. client2 = self._get_client()
  215. client2.start()
  216. event2 = self.make_event()
  217. lock2 = client2.Lock(self.lockpath, "two")
  218. thread2 = self.make_thread(
  219. target=self._thread_lock_acquire_til_event,
  220. args=("two", lock2, event2),
  221. )
  222. thread2.start()
  223. # this one should block in acquire. check that it is a contender
  224. wait = self.make_wait()
  225. wait(lambda: len(lock2.contenders()) > 1)
  226. assert lock2.contenders() == ["one", "two"]
  227. lock2.cancel()
  228. with self.condition:
  229. if "two" not in self.cancelled_threads:
  230. self.condition.wait()
  231. assert "two" in self.cancelled_threads
  232. assert lock2.contenders() == ["one"]
  233. thread2.join()
  234. event1.set()
  235. thread1.join()
  236. client2.stop()
  237. def test_lock_no_double_calls(self):
  238. lock1 = self.client.Lock(self.lockpath, "one")
  239. lock1.acquire()
  240. assert lock1.is_acquired is True
  241. assert lock1.acquire(timeout=0.5) is False
  242. assert lock1.is_acquired is True
  243. lock1.release()
  244. assert lock1.is_acquired is False
  245. def test_lock_same_thread_no_block(self):
  246. lock = self.client.Lock(self.lockpath, "one")
  247. gotten = lock.acquire(blocking=False)
  248. assert gotten is True
  249. assert lock.is_acquired is True
  250. gotten = lock.acquire(blocking=False)
  251. assert gotten is False
  252. def test_lock_many_threads_no_block(self):
  253. lock = self.client.Lock(self.lockpath, "one")
  254. attempts = collections.deque()
  255. def _acquire():
  256. attempts.append(int(lock.acquire(blocking=False)))
  257. threads = []
  258. for _i in range(0, self.thread_count):
  259. t = self.make_thread(target=_acquire)
  260. threads.append(t)
  261. t.start()
  262. while threads:
  263. t = threads.pop()
  264. t.join()
  265. assert sum(list(attempts)) == 1
  266. def test_lock_many_threads(self):
  267. sleep_func = self.client.handler.sleep_func
  268. lock = self.client.Lock(self.lockpath, "one")
  269. acquires = collections.deque()
  270. differences = collections.deque()
  271. barrier = SleepBarrier(self.thread_count, sleep_func)
  272. def _acquire():
  273. # Wait until all threads are ready to go...
  274. with barrier as b:
  275. b.wait()
  276. with lock:
  277. # Ensure that no two threads enter here and cause the
  278. # count to differ by more than one, do this by recording
  279. # the count that was captured and examining it post run.
  280. starting_count = len(acquires)
  281. acquires.append(1)
  282. sleep_func(0.01)
  283. end_count = len(acquires)
  284. differences.append(end_count - starting_count)
  285. threads = []
  286. for _i in range(0, self.thread_count):
  287. t = self.make_thread(target=_acquire)
  288. threads.append(t)
  289. t.start()
  290. while threads:
  291. t = threads.pop()
  292. t.join()
  293. assert len(acquires) == self.thread_count
  294. assert list(differences) == [1] * self.thread_count
  295. def test_lock_reacquire(self):
  296. lock = self.client.Lock(self.lockpath, "one")
  297. lock.acquire()
  298. lock.release()
  299. lock.acquire()
  300. lock.release()
  301. def test_lock_ephemeral(self):
  302. client1 = self._get_client()
  303. client1.start()
  304. lock = client1.Lock(self.lockpath, "ephemeral")
  305. lock.acquire(ephemeral=False)
  306. znode = self.lockpath + "/" + lock.node
  307. client1.stop()
  308. try:
  309. self.client.get(znode)
  310. except NoNodeError:
  311. self.fail("NoNodeError raised unexpectedly!")
  312. def test_lock_timeout(self):
  313. timeout = 3
  314. e = self.make_event()
  315. started = self.make_event()
  316. # In the background thread, acquire the lock and wait thrice the time
  317. # that the main thread is going to wait to acquire the lock.
  318. lock1 = self.client.Lock(self.lockpath, "one")
  319. def _thread(lock, event, timeout):
  320. with lock:
  321. started.set()
  322. event.wait(timeout)
  323. if not event.is_set():
  324. # Eventually fail to avoid hanging the tests
  325. self.fail("lock2 never timed out")
  326. t = self.make_thread(target=_thread, args=(lock1, e, timeout * 3))
  327. t.start()
  328. # Start the main thread's kazoo client and try to acquire the lock
  329. # but give up after `timeout` seconds
  330. client2 = self._get_client()
  331. client2.start()
  332. started.wait(5)
  333. assert started.is_set() is True
  334. lock2 = client2.Lock(self.lockpath, "two")
  335. try:
  336. lock2.acquire(timeout=timeout)
  337. except LockTimeout:
  338. # A timeout is the behavior we're expecting, since the background
  339. # thread should still be holding onto the lock
  340. pass
  341. else:
  342. self.fail("Main thread unexpectedly acquired the lock")
  343. finally:
  344. # Cleanup
  345. e.set()
  346. t.join()
  347. client2.stop()
  348. def test_read_lock(self):
  349. # Test that we can obtain a read lock
  350. lock = self.client.ReadLock(self.lockpath, "reader one")
  351. gotten = lock.acquire(blocking=False)
  352. assert gotten is True
  353. assert lock.is_acquired is True
  354. # and that it's still not reentrant.
  355. gotten = lock.acquire(blocking=False)
  356. assert gotten is False
  357. # Test that a second client we can share the same read lock
  358. client2 = self._get_client()
  359. client2.start()
  360. lock2 = client2.ReadLock(self.lockpath, "reader two")
  361. gotten = lock2.acquire(blocking=False)
  362. assert gotten is True
  363. assert lock2.is_acquired is True
  364. gotten = lock2.acquire(blocking=False)
  365. assert gotten is False
  366. # Test that a writer is unable to share it
  367. client3 = self._get_client()
  368. client3.start()
  369. lock3 = client3.WriteLock(self.lockpath, "writer")
  370. gotten = lock3.acquire(blocking=False)
  371. assert gotten is False
  372. def test_write_lock(self):
  373. # Test that we can obtain a write lock
  374. lock = self.client.WriteLock(self.lockpath, "writer")
  375. gotten = lock.acquire(blocking=False)
  376. assert gotten is True
  377. assert lock.is_acquired is True
  378. gotten = lock.acquire(blocking=False)
  379. assert gotten is False
  380. # Test that we are unable to obtain a read lock while the
  381. # write lock is held.
  382. client2 = self._get_client()
  383. client2.start()
  384. lock2 = client2.ReadLock(self.lockpath, "reader")
  385. gotten = lock2.acquire(blocking=False)
  386. assert gotten is False
  387. def test_rw_lock(self):
  388. reader_event = self.make_event()
  389. reader_lock = self.client.ReadLock(self.lockpath, "reader")
  390. reader_thread = self.make_thread(
  391. target=self._thread_lock_acquire_til_event,
  392. args=("reader", reader_lock, reader_event),
  393. )
  394. writer_event = self.make_event()
  395. writer_lock = self.client.WriteLock(self.lockpath, "writer")
  396. writer_thread = self.make_thread(
  397. target=self._thread_lock_acquire_til_event,
  398. args=("writer", writer_lock, writer_event),
  399. )
  400. # acquire a write lock ourselves first to make the others line up
  401. lock = self.client.WriteLock(self.lockpath, "test")
  402. lock.acquire()
  403. reader_thread.start()
  404. writer_thread.start()
  405. # wait for everyone to line up on the lock
  406. wait = self.make_wait()
  407. wait(lambda: len(lock.contenders()) == 3)
  408. contenders = lock.contenders()
  409. assert contenders[0] == "test"
  410. remaining = contenders[1:]
  411. # release the lock and contenders should claim it in order
  412. lock.release()
  413. contender_bits = {
  414. "reader": (reader_thread, reader_event),
  415. "writer": (writer_thread, writer_event),
  416. }
  417. for contender in ("reader", "writer"):
  418. thread, event = contender_bits[contender]
  419. with self.condition:
  420. while not self.active_thread:
  421. self.condition.wait()
  422. assert self.active_thread == contender
  423. assert lock.contenders() == remaining
  424. remaining = remaining[1:]
  425. event.set()
  426. with self.condition:
  427. while self.active_thread:
  428. self.condition.wait()
  429. reader_thread.join()
  430. writer_thread.join()
  431. class TestSemaphore(KazooTestCase):
  432. def __init__(self, *args, **kw):
  433. super(TestSemaphore, self).__init__(*args, **kw)
  434. self.threads_made = []
  435. def tearDown(self):
  436. super(TestSemaphore, self).tearDown()
  437. while self.threads_made:
  438. t = self.threads_made.pop()
  439. t.join()
  440. @staticmethod
  441. def make_condition():
  442. return threading.Condition()
  443. @staticmethod
  444. def make_event():
  445. return threading.Event()
  446. def make_thread(self, *args, **kwargs):
  447. t = threading.Thread(*args, **kwargs)
  448. t.daemon = True
  449. self.threads_made.append(t)
  450. return t
  451. def setUp(self):
  452. super(TestSemaphore, self).setUp()
  453. self.lockpath = "/" + uuid.uuid4().hex
  454. self.condition = self.make_condition()
  455. self.released = self.make_event()
  456. self.active_thread = None
  457. self.cancelled_threads = []
  458. def test_basic(self):
  459. sem1 = self.client.Semaphore(self.lockpath)
  460. sem1.acquire()
  461. sem1.release()
  462. def test_lock_one(self):
  463. sem1 = self.client.Semaphore(self.lockpath, max_leases=1)
  464. sem2 = self.client.Semaphore(self.lockpath, max_leases=1)
  465. started = self.make_event()
  466. event = self.make_event()
  467. sem1.acquire()
  468. def sema_one():
  469. started.set()
  470. with sem2:
  471. event.set()
  472. thread = self.make_thread(target=sema_one, args=())
  473. thread.start()
  474. started.wait(10)
  475. assert event.is_set() is False
  476. sem1.release()
  477. event.wait(10)
  478. assert event.is_set() is True
  479. thread.join()
  480. def test_non_blocking(self):
  481. sem1 = self.client.Semaphore(
  482. self.lockpath, identifier="sem1", max_leases=2
  483. )
  484. sem2 = self.client.Semaphore(
  485. self.lockpath, identifier="sem2", max_leases=2
  486. )
  487. sem3 = self.client.Semaphore(
  488. self.lockpath, identifier="sem3", max_leases=2
  489. )
  490. sem1.acquire()
  491. sem2.acquire()
  492. assert not sem3.acquire(blocking=False)
  493. assert set(sem1.lease_holders()) == set(["sem1", "sem2"])
  494. sem2.release()
  495. # the next line isn't required, but avoids timing issues in tests
  496. sem3.acquire()
  497. assert set(sem1.lease_holders()) == set(["sem1", "sem3"])
  498. sem1.release()
  499. sem3.release()
  500. def test_non_blocking_release(self):
  501. sem1 = self.client.Semaphore(
  502. self.lockpath, identifier="sem1", max_leases=1
  503. )
  504. sem2 = self.client.Semaphore(
  505. self.lockpath, identifier="sem2", max_leases=1
  506. )
  507. sem1.acquire()
  508. sem2.acquire(blocking=False)
  509. # make sure there's no shutdown / cleanup error
  510. sem1.release()
  511. sem2.release()
  512. def test_holders(self):
  513. started = self.make_event()
  514. event = self.make_event()
  515. def sema_one():
  516. with self.client.Semaphore(self.lockpath, "fred", max_leases=1):
  517. started.set()
  518. event.wait()
  519. thread = self.make_thread(target=sema_one, args=())
  520. thread.start()
  521. started.wait()
  522. sem1 = self.client.Semaphore(self.lockpath)
  523. holders = sem1.lease_holders()
  524. assert holders == ["fred"]
  525. event.set()
  526. thread.join()
  527. def test_semaphore_cancel(self):
  528. sem1 = self.client.Semaphore(self.lockpath, "fred", max_leases=1)
  529. sem2 = self.client.Semaphore(self.lockpath, "george", max_leases=1)
  530. sem1.acquire()
  531. started = self.make_event()
  532. event = self.make_event()
  533. def sema_one():
  534. started.set()
  535. try:
  536. with sem2:
  537. started.set()
  538. except CancelledError:
  539. event.set()
  540. thread = self.make_thread(target=sema_one, args=())
  541. thread.start()
  542. started.wait()
  543. assert sem1.lease_holders() == ["fred"]
  544. assert not event.is_set()
  545. sem2.cancel()
  546. event.wait()
  547. assert event.is_set()
  548. thread.join()
  549. def test_multiple_acquire_and_release(self):
  550. sem1 = self.client.Semaphore(self.lockpath, "fred", max_leases=1)
  551. sem1.acquire()
  552. sem1.acquire()
  553. assert sem1.release()
  554. assert not sem1.release()
  555. def test_handle_session_loss(self):
  556. expire_semaphore = self.client.Semaphore(
  557. self.lockpath, "fred", max_leases=1
  558. )
  559. client = self._get_client()
  560. client.start()
  561. lh_semaphore = client.Semaphore(self.lockpath, "george", max_leases=1)
  562. lh_semaphore.acquire()
  563. started = self.make_event()
  564. event = self.make_event()
  565. event2 = self.make_event()
  566. def sema_one():
  567. started.set()
  568. with expire_semaphore:
  569. event.set()
  570. event2.wait()
  571. thread1 = self.make_thread(target=sema_one, args=())
  572. thread1.start()
  573. started.wait()
  574. assert lh_semaphore.lease_holders() == ["george"]
  575. # Fired in a separate thread to make sure we can see the effect
  576. expired = self.make_event()
  577. def expire():
  578. self.expire_session(self.make_event)
  579. expired.set()
  580. thread2 = self.make_thread(target=expire, args=())
  581. thread2.start()
  582. expire_semaphore.wake_event.wait()
  583. expired.wait()
  584. lh_semaphore.release()
  585. client.stop()
  586. event.wait(15)
  587. assert expire_semaphore.lease_holders() == ["fred"]
  588. event2.set()
  589. for t in (thread1, thread2):
  590. t.join()
  591. def test_inconsistent_max_leases(self):
  592. sem1 = self.client.Semaphore(self.lockpath, max_leases=1)
  593. sem2 = self.client.Semaphore(self.lockpath, max_leases=2)
  594. sem1.acquire()
  595. with pytest.raises(ValueError):
  596. sem2.acquire()
  597. def test_inconsistent_max_leases_other_data(self):
  598. sem1 = self.client.Semaphore(self.lockpath, max_leases=1)
  599. sem2 = self.client.Semaphore(self.lockpath, max_leases=2)
  600. self.client.ensure_path(self.lockpath)
  601. self.client.set(self.lockpath, b"a$")
  602. sem1.acquire()
  603. # sem2 thinks it's ok to have two lease holders
  604. assert sem2.acquire(blocking=False)
  605. def test_reacquire(self):
  606. lock = self.client.Semaphore(self.lockpath)
  607. lock.acquire()
  608. lock.release()
  609. lock.acquire()
  610. lock.release()
  611. def test_acquire_after_cancelled(self):
  612. lock = self.client.Semaphore(self.lockpath)
  613. assert lock.acquire() is True
  614. assert lock.release() is True
  615. lock.cancel()
  616. assert lock.cancelled is True
  617. assert lock.acquire() is True
  618. def test_timeout(self):
  619. timeout = 3
  620. e = self.make_event()
  621. started = self.make_event()
  622. # In the background thread, acquire the lock and wait thrice the time
  623. # that the main thread is going to wait to acquire the lock.
  624. sem1 = self.client.Semaphore(self.lockpath, "one")
  625. def _thread(sem, event, timeout):
  626. with sem:
  627. started.set()
  628. event.wait(timeout)
  629. if not event.is_set():
  630. # Eventually fail to avoid hanging the tests
  631. self.fail("sem2 never timed out")
  632. t = self.make_thread(target=_thread, args=(sem1, e, timeout * 3))
  633. t.start()
  634. # Start the main thread's kazoo client and try to acquire the lock
  635. # but give up after `timeout` seconds
  636. client2 = self._get_client()
  637. client2.start()
  638. started.wait(5)
  639. assert started.is_set() is True
  640. sem2 = client2.Semaphore(self.lockpath, "two")
  641. try:
  642. sem2.acquire(timeout=timeout)
  643. except LockTimeout:
  644. # A timeout is the behavior we're expecting, since the background
  645. # thread will still be holding onto the lock
  646. e.set()
  647. finally:
  648. # Cleanup
  649. t.join()
  650. client2.stop()
  651. class TestSequence(unittest.TestCase):
  652. def test_get_predecessor(self):
  653. """Validate selection of predecessors."""
  654. goLock = "_c_8eb60557ba51e0da67eefc47467d3f34-lock-0000000031"
  655. pyLock = "514e5a831836450cb1a56c741e990fd8__lock__0000000032"
  656. children = ["hello", goLock, "world", pyLock]
  657. client = MagicMock()
  658. client.get_children.return_value = children
  659. lock = Lock(client, "test")
  660. assert lock._get_predecessor(pyLock) is None
  661. def test_get_predecessor_go(self):
  662. """Test selection of predecessor when instructed to consider go-zk
  663. locks.
  664. """
  665. goLock = "_c_8eb60557ba51e0da67eefc47467d3f34-lock-0000000031"
  666. pyLock = "514e5a831836450cb1a56c741e990fd8__lock__0000000032"
  667. children = ["hello", goLock, "world", pyLock]
  668. client = MagicMock()
  669. client.get_children.return_value = children
  670. lock = Lock(client, "test", extra_lock_patterns=["-lock-"])
  671. assert lock._get_predecessor(pyLock) == goLock