图片解析应用
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

221 lines
6.3 KiB

  1. """Zookeeper Barriers
  2. :Maintainer: None
  3. :Status: Unknown
  4. """
  5. import os
  6. import socket
  7. import uuid
  8. from kazoo.exceptions import KazooException, NoNodeError, NodeExistsError
  9. from kazoo.protocol.states import EventType
  10. class Barrier(object):
  11. """Kazoo Barrier
  12. Implements a barrier to block processing of a set of nodes until
  13. a condition is met at which point the nodes will be allowed to
  14. proceed. The barrier is in place if its node exists.
  15. .. warning::
  16. The :meth:`wait` function does not handle connection loss and
  17. may raise :exc:`~kazoo.exceptions.ConnectionLossException` if
  18. the connection is lost while waiting.
  19. """
  20. def __init__(self, client, path):
  21. """Create a Kazoo Barrier
  22. :param client: A :class:`~kazoo.client.KazooClient` instance.
  23. :param path: The barrier path to use.
  24. """
  25. self.client = client
  26. self.path = path
  27. def create(self):
  28. """Establish the barrier if it doesn't exist already"""
  29. self.client.retry(self.client.ensure_path, self.path)
  30. def remove(self):
  31. """Remove the barrier
  32. :returns: Whether the barrier actually needed to be removed.
  33. :rtype: bool
  34. """
  35. try:
  36. self.client.retry(self.client.delete, self.path)
  37. return True
  38. except NoNodeError:
  39. return False
  40. def wait(self, timeout=None):
  41. """Wait on the barrier to be cleared
  42. :returns: True if the barrier has been cleared, otherwise
  43. False.
  44. :rtype: bool
  45. """
  46. cleared = self.client.handler.event_object()
  47. def wait_for_clear(event):
  48. if event.type == EventType.DELETED:
  49. cleared.set()
  50. exists = self.client.exists(self.path, watch=wait_for_clear)
  51. if not exists:
  52. return True
  53. cleared.wait(timeout)
  54. return cleared.is_set()
  55. class DoubleBarrier(object):
  56. """Kazoo Double Barrier
  57. Double barriers are used to synchronize the beginning and end of
  58. a distributed task. The barrier blocks when entering it until all
  59. the members have joined, and blocks when leaving until all the
  60. members have left.
  61. .. note::
  62. You should register a listener for session loss as the process
  63. will no longer be part of the barrier once the session is
  64. gone. Connection losses will be retried with the default retry
  65. policy.
  66. """
  67. def __init__(self, client, path, num_clients, identifier=None):
  68. """Create a Double Barrier
  69. :param client: A :class:`~kazoo.client.KazooClient` instance.
  70. :param path: The barrier path to use.
  71. :param num_clients: How many clients must enter the barrier to
  72. proceed.
  73. :type num_clients: int
  74. :param identifier: An identifier to use for this member of the
  75. barrier when participating. Defaults to the
  76. hostname + process id.
  77. """
  78. self.client = client
  79. self.path = path
  80. self.num_clients = num_clients
  81. self._identifier = identifier or "%s-%s" % (
  82. socket.getfqdn(),
  83. os.getpid(),
  84. )
  85. self.participating = False
  86. self.assured_path = False
  87. self.node_name = uuid.uuid4().hex
  88. self.create_path = self.path + "/" + self.node_name
  89. def enter(self):
  90. """Enter the barrier, blocks until all nodes have entered"""
  91. try:
  92. self.client.retry(self._inner_enter)
  93. self.participating = True
  94. except KazooException:
  95. # We failed to enter, best effort cleanup
  96. self._best_effort_cleanup()
  97. self.participating = False
  98. def _inner_enter(self):
  99. # make sure our barrier parent node exists
  100. if not self.assured_path:
  101. self.client.ensure_path(self.path)
  102. self.assured_path = True
  103. ready = self.client.handler.event_object()
  104. try:
  105. self.client.create(
  106. self.create_path,
  107. self._identifier.encode("utf-8"),
  108. ephemeral=True,
  109. )
  110. except NodeExistsError:
  111. pass
  112. def created(event):
  113. if event.type == EventType.CREATED:
  114. ready.set()
  115. self.client.exists(self.path + "/" + "ready", watch=created)
  116. children = self.client.get_children(self.path)
  117. if len(children) < self.num_clients:
  118. ready.wait()
  119. else:
  120. self.client.ensure_path(self.path + "/ready")
  121. return True
  122. def leave(self):
  123. """Leave the barrier, blocks until all nodes have left"""
  124. try:
  125. self.client.retry(self._inner_leave)
  126. except KazooException: # pragma: nocover
  127. # Failed to cleanly leave
  128. self._best_effort_cleanup()
  129. self.participating = False
  130. def _inner_leave(self):
  131. # Delete the ready node if its around
  132. try:
  133. self.client.delete(self.path + "/ready")
  134. except NoNodeError:
  135. pass
  136. while True:
  137. children = self.client.get_children(self.path)
  138. if not children:
  139. return True
  140. if len(children) == 1 and children[0] == self.node_name:
  141. self.client.delete(self.create_path)
  142. return True
  143. children.sort()
  144. ready = self.client.handler.event_object()
  145. def deleted(event):
  146. if event.type == EventType.DELETED:
  147. ready.set()
  148. if self.node_name == children[0]:
  149. # We're first, wait on the highest to leave
  150. if not self.client.exists(
  151. self.path + "/" + children[-1], watch=deleted
  152. ):
  153. continue
  154. ready.wait()
  155. continue
  156. # Delete our node
  157. self.client.delete(self.create_path)
  158. # Wait on the first
  159. if not self.client.exists(
  160. self.path + "/" + children[0], watch=deleted
  161. ):
  162. continue
  163. # Wait for the lowest to be deleted
  164. ready.wait()
  165. def _best_effort_cleanup(self):
  166. try:
  167. self.client.retry(self.client.delete, self.create_path)
  168. except NoNodeError:
  169. pass