图片解析应用
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

723 lines
23 KiB

  1. """Zookeeper Locking Implementations
  2. :Maintainer: Ben Bangert <ben@groovie.org>
  3. :Status: Production
  4. Error Handling
  5. ==============
  6. It's highly recommended to add a state listener with
  7. :meth:`~KazooClient.add_listener` and watch for
  8. :attr:`~KazooState.LOST` and :attr:`~KazooState.SUSPENDED` state
  9. changes and react appropriately. In the event that a
  10. :attr:`~KazooState.LOST` state occurs, it is certain that the lock
  11. and/or the lease has been lost.
  12. """
  13. import re
  14. import time
  15. import uuid
  16. from kazoo.exceptions import (
  17. CancelledError,
  18. KazooException,
  19. LockTimeout,
  20. NoNodeError,
  21. )
  22. from kazoo.protocol.states import KazooState
  23. from kazoo.retry import (
  24. ForceRetryError,
  25. KazooRetry,
  26. RetryFailedError,
  27. )
  28. class _Watch(object):
  29. def __init__(self, duration=None):
  30. self.duration = duration
  31. self.started_at = None
  32. def start(self):
  33. self.started_at = time.monotonic()
  34. def leftover(self):
  35. if self.duration is None:
  36. return None
  37. else:
  38. elapsed = time.monotonic() - self.started_at
  39. return max(0, self.duration - elapsed)
  40. class Lock(object):
  41. """Kazoo Lock
  42. Example usage with a :class:`~kazoo.client.KazooClient` instance:
  43. .. code-block:: python
  44. zk = KazooClient()
  45. zk.start()
  46. lock = zk.Lock("/lockpath", "my-identifier")
  47. with lock: # blocks waiting for lock acquisition
  48. # do something with the lock
  49. Note: This lock is not *re-entrant*. Repeated calls after already
  50. acquired will block.
  51. This is an exclusive lock. For a read/write lock, see :class:`WriteLock`
  52. and :class:`ReadLock`.
  53. """
  54. # Node name, after the contender UUID, before the sequence
  55. # number. Involved in read/write locks.
  56. _NODE_NAME = "__lock__"
  57. # Node names which exclude this contender when present at a lower
  58. # sequence number. Involved in read/write locks.
  59. _EXCLUDE_NAMES = ["__lock__"]
  60. def __init__(self, client, path, identifier=None, extra_lock_patterns=()):
  61. """Create a Kazoo lock.
  62. :param client: A :class:`~kazoo.client.KazooClient` instance.
  63. :param path: The lock path to use.
  64. :param identifier: Name to use for this lock contender. This can be
  65. useful for querying to see who the current lock
  66. contenders are.
  67. :param extra_lock_patterns: Strings that will be used to
  68. identify other znode in the path
  69. that should be considered contenders
  70. for this lock.
  71. Use this for cross-implementation
  72. compatibility.
  73. .. versionadded:: 2.7.1
  74. The extra_lock_patterns option.
  75. """
  76. self.client = client
  77. self.path = path
  78. self._exclude_names = set(
  79. self._EXCLUDE_NAMES + list(extra_lock_patterns)
  80. )
  81. self._contenders_re = re.compile(
  82. r"(?:{patterns})(-?\d{{10}})$".format(
  83. patterns="|".join(self._exclude_names)
  84. )
  85. )
  86. # some data is written to the node. this can be queried via
  87. # contenders() to see who is contending for the lock
  88. self.data = str(identifier or "").encode("utf-8")
  89. self.node = None
  90. self.wake_event = client.handler.event_object()
  91. # props to Netflix Curator for this trick. It is possible for our
  92. # create request to succeed on the server, but for a failure to
  93. # prevent us from getting back the full path name. We prefix our
  94. # lock name with a uuid and can check for its presence on retry.
  95. self.prefix = uuid.uuid4().hex + self._NODE_NAME
  96. self.create_path = self.path + "/" + self.prefix
  97. self.create_tried = False
  98. self.is_acquired = False
  99. self.assured_path = False
  100. self.cancelled = False
  101. self._retry = KazooRetry(
  102. max_tries=None, sleep_func=client.handler.sleep_func
  103. )
  104. self._acquire_method_lock = client.handler.lock_object()
  105. def _ensure_path(self):
  106. self.client.ensure_path(self.path)
  107. self.assured_path = True
  108. def cancel(self):
  109. """Cancel a pending lock acquire."""
  110. self.cancelled = True
  111. self.wake_event.set()
  112. def acquire(self, blocking=True, timeout=None, ephemeral=True):
  113. """
  114. Acquire the lock. By defaults blocks and waits forever.
  115. :param blocking: Block until lock is obtained or return immediately.
  116. :type blocking: bool
  117. :param timeout: Don't wait forever to acquire the lock.
  118. :type timeout: float or None
  119. :param ephemeral: Don't use ephemeral znode for the lock.
  120. :type ephemeral: bool
  121. :returns: Was the lock acquired?
  122. :rtype: bool
  123. :raises: :exc:`~kazoo.exceptions.LockTimeout` if the lock
  124. wasn't acquired within `timeout` seconds.
  125. .. warning::
  126. When :attr:`ephemeral` is set to False session expiration
  127. will not release the lock and must be handled separately.
  128. .. versionadded:: 1.1
  129. The timeout option.
  130. .. versionadded:: 2.4.1
  131. The ephemeral option.
  132. """
  133. retry = self._retry.copy()
  134. retry.deadline = timeout
  135. # Ensure we are locked so that we avoid multiple threads in
  136. # this acquistion routine at the same time...
  137. method_locked = self._acquire_method_lock.acquire(
  138. blocking=blocking, timeout=timeout if timeout is not None else -1
  139. )
  140. if not method_locked:
  141. return False
  142. already_acquired = self.is_acquired
  143. try:
  144. gotten = False
  145. try:
  146. gotten = retry(
  147. self._inner_acquire,
  148. blocking=blocking,
  149. timeout=timeout,
  150. ephemeral=ephemeral,
  151. )
  152. except RetryFailedError:
  153. pass
  154. except KazooException:
  155. # if we did ultimately fail, attempt to clean up
  156. if not already_acquired:
  157. self._best_effort_cleanup()
  158. self.cancelled = False
  159. raise
  160. if gotten:
  161. self.is_acquired = gotten
  162. if not gotten and not already_acquired:
  163. self._best_effort_cleanup()
  164. return gotten
  165. finally:
  166. self._acquire_method_lock.release()
  167. def _watch_session(self, state):
  168. self.wake_event.set()
  169. return True
  170. def _inner_acquire(self, blocking, timeout, ephemeral=True):
  171. # wait until it's our chance to get it..
  172. if self.is_acquired:
  173. if not blocking:
  174. return False
  175. raise ForceRetryError()
  176. # make sure our election parent node exists
  177. if not self.assured_path:
  178. self._ensure_path()
  179. node = None
  180. if self.create_tried:
  181. node = self._find_node()
  182. else:
  183. self.create_tried = True
  184. if not node:
  185. node = self.client.create(
  186. self.create_path, self.data, ephemeral=ephemeral, sequence=True
  187. )
  188. # strip off path to node
  189. node = node[len(self.path) + 1 :]
  190. self.node = node
  191. while True:
  192. self.wake_event.clear()
  193. # bail out with an exception if cancellation has been requested
  194. if self.cancelled:
  195. raise CancelledError()
  196. predecessor = self._get_predecessor(node)
  197. if predecessor is None:
  198. return True
  199. if not blocking:
  200. return False
  201. # otherwise we are in the mix. watch predecessor and bide our time
  202. predecessor = self.path + "/" + predecessor
  203. self.client.add_listener(self._watch_session)
  204. try:
  205. self.client.get(predecessor, self._watch_predecessor)
  206. except NoNodeError:
  207. pass # predecessor has already been deleted
  208. else:
  209. self.wake_event.wait(timeout)
  210. if not self.wake_event.is_set():
  211. raise LockTimeout(
  212. "Failed to acquire lock on %s after %s seconds"
  213. % (self.path, timeout)
  214. )
  215. finally:
  216. self.client.remove_listener(self._watch_session)
  217. def _watch_predecessor(self, event):
  218. self.wake_event.set()
  219. def _get_predecessor(self, node):
  220. """returns `node`'s predecessor or None
  221. Note: This handle the case where the current lock is not a contender
  222. (e.g. rlock), this and also edge cases where the lock's ephemeral node
  223. is gone.
  224. """
  225. node_sequence = node[len(self.prefix) :]
  226. children = self.client.get_children(self.path)
  227. found_self = False
  228. # Filter out the contenders using the computed regex
  229. contender_matches = []
  230. for child in children:
  231. match = self._contenders_re.search(child)
  232. if match is not None:
  233. contender_sequence = match.group(1)
  234. # Only consider contenders with a smaller sequence number.
  235. # A contender with a smaller sequence number has a higher
  236. # priority.
  237. if contender_sequence < node_sequence:
  238. contender_matches.append(match)
  239. if child == node:
  240. # Remember the node's match object so we can short circuit
  241. # below.
  242. found_self = match
  243. if found_self is False: # pragma: nocover
  244. # somehow we aren't in the childrens -- probably we are
  245. # recovering from a session failure and our ephemeral
  246. # node was removed.
  247. raise ForceRetryError()
  248. if not contender_matches:
  249. return None
  250. # Sort the contenders using the sequence number extracted by the regex
  251. # and return the original string of the predecessor.
  252. sorted_matches = sorted(contender_matches, key=lambda m: m.groups())
  253. return sorted_matches[-1].string
  254. def _find_node(self):
  255. children = self.client.get_children(self.path)
  256. for child in children:
  257. if child.startswith(self.prefix):
  258. return child
  259. return None
  260. def _delete_node(self, node):
  261. self.client.delete(self.path + "/" + node)
  262. def _best_effort_cleanup(self):
  263. try:
  264. node = self.node or self._find_node()
  265. if node:
  266. self._delete_node(node)
  267. except KazooException: # pragma: nocover
  268. pass
  269. def release(self):
  270. """Release the lock immediately."""
  271. return self.client.retry(self._inner_release)
  272. def _inner_release(self):
  273. if not self.is_acquired:
  274. return False
  275. try:
  276. self._delete_node(self.node)
  277. except NoNodeError: # pragma: nocover
  278. pass
  279. self.is_acquired = False
  280. self.node = None
  281. return True
  282. def contenders(self):
  283. """Return an ordered list of the current contenders for the
  284. lock.
  285. .. note::
  286. If the contenders did not set an identifier, it will appear
  287. as a blank string.
  288. """
  289. # make sure our election parent node exists
  290. if not self.assured_path:
  291. self._ensure_path()
  292. children = self.client.get_children(self.path)
  293. # We want all contenders, including self (this is especially important
  294. # for r/w locks). This is similar to the logic of `_get_predecessor`
  295. # except we include our own pattern.
  296. all_contenders_re = re.compile(
  297. r"(?:{patterns})(-?\d{{10}})$".format(
  298. patterns="|".join(self._exclude_names | {self._NODE_NAME})
  299. )
  300. )
  301. # Filter out the contenders using the computed regex
  302. contender_matches = []
  303. for child in children:
  304. match = all_contenders_re.search(child)
  305. if match is not None:
  306. contender_matches.append(match)
  307. # Sort the contenders using the sequence number extracted by the regex,
  308. # then extract the original string.
  309. contender_nodes = [
  310. match.string
  311. for match in sorted(contender_matches, key=lambda m: m.groups())
  312. ]
  313. # Retrieve all the contender nodes data (preserving order).
  314. contenders = []
  315. for node in contender_nodes:
  316. try:
  317. data, stat = self.client.get(self.path + "/" + node)
  318. if data is not None:
  319. contenders.append(data.decode("utf-8"))
  320. except NoNodeError: # pragma: nocover
  321. pass
  322. return contenders
  323. def __enter__(self):
  324. self.acquire()
  325. def __exit__(self, exc_type, exc_value, traceback):
  326. self.release()
  327. class WriteLock(Lock):
  328. """Kazoo Write Lock
  329. Example usage with a :class:`~kazoo.client.KazooClient` instance:
  330. .. code-block:: python
  331. zk = KazooClient()
  332. zk.start()
  333. lock = zk.WriteLock("/lockpath", "my-identifier")
  334. with lock: # blocks waiting for lock acquisition
  335. # do something with the lock
  336. The lock path passed to WriteLock and ReadLock must match for them to
  337. communicate. The write lock can not be acquired if it is held by
  338. any readers or writers.
  339. Note: This lock is not *re-entrant*. Repeated calls after already
  340. acquired will block.
  341. This is the write-side of a shared lock. See :class:`Lock` for a
  342. standard exclusive lock and :class:`ReadLock` for the read-side of a
  343. shared lock.
  344. """
  345. _NODE_NAME = "__lock__"
  346. _EXCLUDE_NAMES = ["__lock__", "__rlock__"]
  347. class ReadLock(Lock):
  348. """Kazoo Read Lock
  349. Example usage with a :class:`~kazoo.client.KazooClient` instance:
  350. .. code-block:: python
  351. zk = KazooClient()
  352. zk.start()
  353. lock = zk.ReadLock("/lockpath", "my-identifier")
  354. with lock: # blocks waiting for outstanding writers
  355. # do something with the lock
  356. The lock path passed to WriteLock and ReadLock must match for them to
  357. communicate. The read lock blocks if it is held by any writers,
  358. but multiple readers may hold the lock.
  359. Note: This lock is not *re-entrant*. Repeated calls after already
  360. acquired will block.
  361. This is the read-side of a shared lock. See :class:`Lock` for a
  362. standard exclusive lock and :class:`WriteLock` for the write-side of a
  363. shared lock.
  364. """
  365. _NODE_NAME = "__rlock__"
  366. _EXCLUDE_NAMES = ["__lock__"]
  367. class Semaphore(object):
  368. """A Zookeeper-based Semaphore
  369. This synchronization primitive operates in the same manner as the
  370. Python threading version only uses the concept of leases to
  371. indicate how many available leases are available for the lock
  372. rather than counting.
  373. Note: This lock is not meant to be *re-entrant*.
  374. Example:
  375. .. code-block:: python
  376. zk = KazooClient()
  377. semaphore = zk.Semaphore("/leasepath", "my-identifier")
  378. with semaphore: # blocks waiting for lock acquisition
  379. # do something with the semaphore
  380. .. warning::
  381. This class stores the allowed max_leases as the data on the
  382. top-level semaphore node. The stored value is checked once
  383. against the max_leases of each instance. This check is
  384. performed when acquire is called the first time. The semaphore
  385. node needs to be deleted to change the allowed leases.
  386. .. versionadded:: 0.6
  387. The Semaphore class.
  388. .. versionadded:: 1.1
  389. The max_leases check.
  390. """
  391. def __init__(self, client, path, identifier=None, max_leases=1):
  392. """Create a Kazoo Lock
  393. :param client: A :class:`~kazoo.client.KazooClient` instance.
  394. :param path: The semaphore path to use.
  395. :param identifier: Name to use for this lock contender. This
  396. can be useful for querying to see who the
  397. current lock contenders are.
  398. :param max_leases: The maximum amount of leases available for
  399. the semaphore.
  400. """
  401. # Implementation notes about how excessive thundering herd
  402. # and watches are avoided
  403. # - A node (lease pool) holds children for each lease in use
  404. # - A lock is acquired for a process attempting to acquire a
  405. # lease. If a lease is available, the ephemeral node is
  406. # created in the lease pool and the lock is released.
  407. # - Only the lock holder watches for children changes in the
  408. # lease pool
  409. self.client = client
  410. self.path = path
  411. # some data is written to the node. this can be queried via
  412. # contenders() to see who is contending for the lock
  413. self.data = str(identifier or "").encode("utf-8")
  414. self.max_leases = max_leases
  415. self.wake_event = client.handler.event_object()
  416. self.create_path = self.path + "/" + uuid.uuid4().hex
  417. self.lock_path = path + "-" + "__lock__"
  418. self.is_acquired = False
  419. self.assured_path = False
  420. self.cancelled = False
  421. self._session_expired = False
  422. def _ensure_path(self):
  423. result = self.client.ensure_path(self.path)
  424. self.assured_path = True
  425. if result is True:
  426. # node did already exist
  427. data, _ = self.client.get(self.path)
  428. try:
  429. leases = int(data.decode("utf-8"))
  430. except (ValueError, TypeError):
  431. # ignore non-numeric data, maybe the node data is used
  432. # for other purposes
  433. pass
  434. else:
  435. if leases != self.max_leases:
  436. raise ValueError(
  437. "Inconsistent max leases: %s, expected: %s"
  438. % (leases, self.max_leases)
  439. )
  440. else:
  441. self.client.set(self.path, str(self.max_leases).encode("utf-8"))
  442. def cancel(self):
  443. """Cancel a pending semaphore acquire."""
  444. self.cancelled = True
  445. self.wake_event.set()
  446. def acquire(self, blocking=True, timeout=None):
  447. """Acquire the semaphore. By defaults blocks and waits forever.
  448. :param blocking: Block until semaphore is obtained or
  449. return immediately.
  450. :type blocking: bool
  451. :param timeout: Don't wait forever to acquire the semaphore.
  452. :type timeout: float or None
  453. :returns: Was the semaphore acquired?
  454. :rtype: bool
  455. :raises:
  456. ValueError if the max_leases value doesn't match the
  457. stored value.
  458. :exc:`~kazoo.exceptions.LockTimeout` if the semaphore
  459. wasn't acquired within `timeout` seconds.
  460. .. versionadded:: 1.1
  461. The blocking, timeout arguments and the max_leases check.
  462. """
  463. # If the semaphore had previously been canceled, make sure to
  464. # reset that state.
  465. self.cancelled = False
  466. try:
  467. self.is_acquired = self.client.retry(
  468. self._inner_acquire, blocking=blocking, timeout=timeout
  469. )
  470. except KazooException:
  471. # if we did ultimately fail, attempt to clean up
  472. self._best_effort_cleanup()
  473. self.cancelled = False
  474. raise
  475. return self.is_acquired
  476. def _inner_acquire(self, blocking, timeout=None):
  477. """Inner loop that runs from the top anytime a command hits a
  478. retryable Zookeeper exception."""
  479. self._session_expired = False
  480. self.client.add_listener(self._watch_session)
  481. if not self.assured_path:
  482. self._ensure_path()
  483. # Do we already have a lease?
  484. if self.client.exists(self.create_path):
  485. return True
  486. w = _Watch(duration=timeout)
  487. w.start()
  488. lock = self.client.Lock(self.lock_path, self.data)
  489. try:
  490. gotten = lock.acquire(blocking=blocking, timeout=w.leftover())
  491. if not gotten:
  492. return False
  493. while True:
  494. self.wake_event.clear()
  495. # Attempt to grab our lease...
  496. if self._get_lease():
  497. return True
  498. if blocking:
  499. # If blocking, wait until self._watch_lease_change() is
  500. # called before returning
  501. self.wake_event.wait(w.leftover())
  502. if not self.wake_event.is_set():
  503. raise LockTimeout(
  504. "Failed to acquire semaphore on %s"
  505. " after %s seconds" % (self.path, timeout)
  506. )
  507. else:
  508. return False
  509. finally:
  510. lock.release()
  511. def _watch_lease_change(self, event):
  512. self.wake_event.set()
  513. def _get_lease(self, data=None):
  514. # Make sure the session is still valid
  515. if self._session_expired:
  516. raise ForceRetryError("Retry on session loss at top")
  517. # Make sure that the request hasn't been canceled
  518. if self.cancelled:
  519. raise CancelledError("Semaphore cancelled")
  520. # Get a list of the current potential lock holders. If they change,
  521. # notify our wake_event object. This is used to unblock a blocking
  522. # self._inner_acquire call.
  523. children = self.client.get_children(
  524. self.path, self._watch_lease_change
  525. )
  526. # If there are leases available, acquire one
  527. if len(children) < self.max_leases:
  528. self.client.create(self.create_path, self.data, ephemeral=True)
  529. # Check if our acquisition was successful or not. Update our state.
  530. if self.client.exists(self.create_path):
  531. self.is_acquired = True
  532. else:
  533. self.is_acquired = False
  534. # Return current state
  535. return self.is_acquired
  536. def _watch_session(self, state):
  537. if state == KazooState.LOST:
  538. self._session_expired = True
  539. self.wake_event.set()
  540. # Return true to de-register
  541. return True
  542. def _best_effort_cleanup(self):
  543. try:
  544. self.client.delete(self.create_path)
  545. except KazooException: # pragma: nocover
  546. pass
  547. def release(self):
  548. """Release the lease immediately."""
  549. return self.client.retry(self._inner_release)
  550. def _inner_release(self):
  551. if not self.is_acquired:
  552. return False
  553. try:
  554. self.client.delete(self.create_path)
  555. except NoNodeError: # pragma: nocover
  556. pass
  557. self.is_acquired = False
  558. return True
  559. def lease_holders(self):
  560. """Return an unordered list of the current lease holders.
  561. .. note::
  562. If the lease holder did not set an identifier, it will
  563. appear as a blank string.
  564. """
  565. if not self.client.exists(self.path):
  566. return []
  567. children = self.client.get_children(self.path)
  568. lease_holders = []
  569. for child in children:
  570. try:
  571. data, stat = self.client.get(self.path + "/" + child)
  572. lease_holders.append(data.decode("utf-8"))
  573. except NoNodeError: # pragma: nocover
  574. pass
  575. return lease_holders
  576. def __enter__(self):
  577. self.acquire()
  578. def __exit__(self, exc_type, exc_value, traceback):
  579. self.release()