图片解析应用
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

443 lines
14 KiB

  1. """Zookeeper Partitioner Implementation
  2. :Maintainer: None
  3. :Status: Unknown
  4. :class:`SetPartitioner` implements a partitioning scheme using
  5. Zookeeper for dividing up resources amongst members of a party.
  6. This is useful when there is a set of resources that should only be
  7. accessed by a single process at a time that multiple processes
  8. across a cluster might want to divide up.
  9. Example Use-Case
  10. ----------------
  11. - Multiple workers across a cluster need to divide up a list of queues
  12. so that no two workers own the same queue.
  13. """
  14. from functools import partial
  15. import logging
  16. import os
  17. import socket
  18. from kazoo.exceptions import KazooException, LockTimeout
  19. from kazoo.protocol.states import KazooState
  20. from kazoo.recipe.watchers import PatientChildrenWatch
  21. log = logging.getLogger(__name__)
  22. class PartitionState(object):
  23. """High level partition state values
  24. .. attribute:: ALLOCATING
  25. The set needs to be partitioned, and may require an existing
  26. partition set to be released before acquiring a new partition
  27. of the set.
  28. .. attribute:: ACQUIRED
  29. The set has been partitioned and acquired.
  30. .. attribute:: RELEASE
  31. The set needs to be repartitioned, and the current partitions
  32. must be released before a new allocation can be made.
  33. .. attribute:: FAILURE
  34. The set partition has failed. This occurs when the maximum
  35. time to partition the set is exceeded or the Zookeeper session
  36. is lost. The partitioner is unusable after this state and must
  37. be recreated.
  38. """
  39. ALLOCATING = "ALLOCATING"
  40. ACQUIRED = "ACQUIRED"
  41. RELEASE = "RELEASE"
  42. FAILURE = "FAILURE"
  43. class SetPartitioner(object):
  44. """Partitions a set amongst members of a party
  45. This class will partition a set amongst members of a party such
  46. that each member will be given zero or more items of the set and
  47. each set item will be given to a single member. When new members
  48. enter or leave the party, the set will be re-partitioned amongst
  49. the members.
  50. When the :class:`SetPartitioner` enters the
  51. :attr:`~PartitionState.FAILURE` state, it is unrecoverable
  52. and a new :class:`SetPartitioner` should be created.
  53. Example:
  54. .. code-block:: python
  55. from kazoo.client import KazooClient
  56. client = KazooClient()
  57. client.start()
  58. qp = client.SetPartitioner(
  59. path='/work_queues', set=('queue-1', 'queue-2', 'queue-3'))
  60. while 1:
  61. if qp.failed:
  62. raise Exception("Lost or unable to acquire partition")
  63. elif qp.release:
  64. qp.release_set()
  65. elif qp.acquired:
  66. for partition in qp:
  67. # Do something with each partition
  68. elif qp.allocating:
  69. qp.wait_for_acquire()
  70. **State Transitions**
  71. When created, the :class:`SetPartitioner` enters the
  72. :attr:`PartitionState.ALLOCATING` state.
  73. :attr:`~PartitionState.ALLOCATING` ->
  74. :attr:`~PartitionState.ACQUIRED`
  75. Set was partitioned successfully, the partition list assigned
  76. is accessible via list/iter methods or calling list() on the
  77. :class:`SetPartitioner` instance.
  78. :attr:`~PartitionState.ALLOCATING` ->
  79. :attr:`~PartitionState.FAILURE`
  80. Allocating the set failed either due to a Zookeeper session
  81. expiration, or failure to acquire the items of the set within
  82. the timeout period.
  83. :attr:`~PartitionState.ACQUIRED` ->
  84. :attr:`~PartitionState.RELEASE`
  85. The members of the party have changed, and the set needs to be
  86. repartitioned. :meth:`SetPartitioner.release` should be called
  87. as soon as possible.
  88. :attr:`~PartitionState.ACQUIRED` ->
  89. :attr:`~PartitionState.FAILURE`
  90. The current partition was lost due to a Zookeeper session
  91. expiration.
  92. :attr:`~PartitionState.RELEASE` ->
  93. :attr:`~PartitionState.ALLOCATING`
  94. The current partition was released and is being re-allocated.
  95. """
  96. def __init__(
  97. self,
  98. client,
  99. path,
  100. set,
  101. partition_func=None,
  102. identifier=None,
  103. time_boundary=30,
  104. max_reaction_time=1,
  105. state_change_event=None,
  106. ):
  107. """Create a :class:`~SetPartitioner` instance
  108. :param client: A :class:`~kazoo.client.KazooClient` instance.
  109. :param path: The partition path to use.
  110. :param set: The set of items to partition.
  111. :param partition_func: A function to use to decide how to
  112. partition the set.
  113. :param identifier: An identifier to use for this member of the
  114. party when participating. Defaults to the
  115. hostname + process id.
  116. :param time_boundary: How long the party members must be stable
  117. before allocation can complete.
  118. :param max_reaction_time: Maximum reaction time for party members
  119. change.
  120. :param state_change_event: An optional Event object that will be set
  121. on every state change.
  122. """
  123. # Used to differentiate two states with the same names in time
  124. self.state_id = 0
  125. self.state = PartitionState.ALLOCATING
  126. self.state_change_event = (
  127. state_change_event or client.handler.event_object()
  128. )
  129. self._client = client
  130. self._path = path
  131. self._set = set
  132. self._partition_set = []
  133. self._partition_func = partition_func or self._partitioner
  134. self._identifier = identifier or "%s-%s" % (
  135. socket.getfqdn(),
  136. os.getpid(),
  137. )
  138. self._locks = []
  139. self._lock_path = "/".join([path, "locks"])
  140. self._party_path = "/".join([path, "party"])
  141. self._time_boundary = time_boundary
  142. self._max_reaction_time = max_reaction_time
  143. self._acquire_event = client.handler.event_object()
  144. # Create basic path nodes
  145. client.ensure_path(path)
  146. client.ensure_path(self._lock_path)
  147. client.ensure_path(self._party_path)
  148. # Join the party
  149. self._party = client.ShallowParty(
  150. self._party_path, identifier=self._identifier
  151. )
  152. self._party.join()
  153. self._state_change = client.handler.rlock_object()
  154. client.add_listener(self._establish_sessionwatch)
  155. # Now watch the party and set the callback on the async result
  156. # so we know when we're ready
  157. self._child_watching(self._allocate_transition, client_handler=True)
  158. def __iter__(self):
  159. """Return the partitions in this partition set"""
  160. for partition in self._partition_set:
  161. yield partition
  162. @property
  163. def failed(self):
  164. """Corresponds to the :attr:`PartitionState.FAILURE` state"""
  165. return self.state == PartitionState.FAILURE
  166. @property
  167. def release(self):
  168. """Corresponds to the :attr:`PartitionState.RELEASE` state"""
  169. return self.state == PartitionState.RELEASE
  170. @property
  171. def allocating(self):
  172. """Corresponds to the :attr:`PartitionState.ALLOCATING`
  173. state"""
  174. return self.state == PartitionState.ALLOCATING
  175. @property
  176. def acquired(self):
  177. """Corresponds to the :attr:`PartitionState.ACQUIRED` state"""
  178. return self.state == PartitionState.ACQUIRED
  179. def wait_for_acquire(self, timeout=30):
  180. """Wait for the set to be partitioned and acquired
  181. :param timeout: How long to wait before returning.
  182. :type timeout: int
  183. """
  184. self._acquire_event.wait(timeout)
  185. def release_set(self):
  186. """Call to release the set
  187. This method begins the step of allocating once the set has
  188. been released.
  189. """
  190. self._release_locks()
  191. if self._locks: # pragma: nocover
  192. # This shouldn't happen, it means we couldn't release our
  193. # locks, abort
  194. self._fail_out()
  195. return
  196. else:
  197. with self._state_change:
  198. if self.failed:
  199. return
  200. self._set_state(PartitionState.ALLOCATING)
  201. self._child_watching(self._allocate_transition, client_handler=True)
  202. def finish(self):
  203. """Call to release the set and leave the party"""
  204. self._release_locks()
  205. self._fail_out()
  206. def _fail_out(self):
  207. with self._state_change:
  208. self._set_state(PartitionState.FAILURE)
  209. if self._party.participating:
  210. try:
  211. self._party.leave()
  212. except KazooException: # pragma: nocover
  213. pass
  214. def _allocate_transition(self, result):
  215. """Called when in allocating mode, and the children settled"""
  216. # Did we get an exception waiting for children to settle?
  217. if result.exception: # pragma: nocover
  218. self._fail_out()
  219. return
  220. children, async_result = result.get()
  221. children_changed = self._client.handler.event_object()
  222. def updated(result):
  223. with self._state_change:
  224. children_changed.set()
  225. if self.acquired:
  226. self._set_state(PartitionState.RELEASE)
  227. with self._state_change:
  228. # We can lose connection during processing the event
  229. if not self.allocating:
  230. return
  231. # Remember the state ID to check later for race conditions
  232. state_id = self.state_id
  233. # updated() will be called when children change
  234. async_result.rawlink(updated)
  235. # Check whether the state has changed during the lock acquisition
  236. # and abort the process if so.
  237. def abort_if_needed():
  238. if self.state_id == state_id:
  239. if children_changed.is_set():
  240. # The party has changed. Repartitioning...
  241. self._abort_lock_acquisition()
  242. return True
  243. else:
  244. return False
  245. else:
  246. if self.allocating or self.acquired:
  247. # The connection was lost and user initiated a new
  248. # allocation process. Abort it to eliminate race
  249. # conditions with locks.
  250. with self._state_change:
  251. self._set_state(PartitionState.RELEASE)
  252. return True
  253. # Split up the set
  254. partition_set = self._partition_func(
  255. self._identifier, list(self._party), self._set
  256. )
  257. # Proceed to acquire locks for the working set as needed
  258. for member in partition_set:
  259. lock = self._client.Lock(self._lock_path + "/" + str(member))
  260. while True:
  261. try:
  262. # We mustn't lock without timeout because in that case we
  263. # can get a deadlock if the party state will change during
  264. # lock acquisition.
  265. lock.acquire(timeout=self._max_reaction_time)
  266. except LockTimeout:
  267. if abort_if_needed():
  268. return
  269. except KazooException:
  270. return self.finish()
  271. else:
  272. break
  273. self._locks.append(lock)
  274. if abort_if_needed():
  275. return
  276. # All locks acquired. Time for state transition.
  277. with self._state_change:
  278. if self.state_id == state_id and not children_changed.is_set():
  279. self._partition_set = partition_set
  280. self._set_state(PartitionState.ACQUIRED)
  281. self._acquire_event.set()
  282. return
  283. if not abort_if_needed():
  284. # This mustn't happen. Means a logical error.
  285. self._fail_out()
  286. def _release_locks(self):
  287. """Attempt to completely remove all the locks"""
  288. self._acquire_event.clear()
  289. for lock in self._locks[:]:
  290. try:
  291. lock.release()
  292. except KazooException: # pragma: nocover
  293. # We proceed to remove as many as possible, and leave
  294. # the ones we couldn't remove
  295. pass
  296. else:
  297. self._locks.remove(lock)
  298. def _abort_lock_acquisition(self):
  299. """Called during lock acquisition if a party change occurs"""
  300. self._release_locks()
  301. if self._locks:
  302. # This shouldn't happen, it means we couldn't release our
  303. # locks, abort
  304. self._fail_out()
  305. return
  306. self._child_watching(self._allocate_transition, client_handler=True)
  307. def _child_watching(self, func=None, client_handler=False):
  308. """Called when children are being watched to stabilize
  309. This actually returns immediately, child watcher spins up a
  310. new thread/greenlet and waits for it to stabilize before
  311. any callbacks might run.
  312. :param client_handler: If True, deliver the result using the
  313. client's event handler.
  314. """
  315. watcher = PatientChildrenWatch(
  316. self._client, self._party_path, self._time_boundary
  317. )
  318. asy = watcher.start()
  319. if func is not None:
  320. # We spin up the function in a separate thread/greenlet
  321. # to ensure that the rawlink's it might use won't be
  322. # blocked
  323. if client_handler:
  324. func = partial(self._client.handler.spawn, func)
  325. asy.rawlink(func)
  326. return asy
  327. def _establish_sessionwatch(self, state):
  328. """Register ourself to listen for session events, we shut down
  329. if we become lost"""
  330. with self._state_change:
  331. if self.failed:
  332. pass
  333. elif state == KazooState.LOST:
  334. self._client.handler.spawn(self._fail_out)
  335. elif not self.release:
  336. self._set_state(PartitionState.RELEASE)
  337. return state == KazooState.LOST
  338. def _partitioner(self, identifier, members, partitions):
  339. # Ensure consistent order of partitions/members
  340. all_partitions = sorted(partitions)
  341. workers = sorted(members)
  342. i = workers.index(identifier)
  343. # Now return the partition list starting at our location and
  344. # skipping the other workers
  345. return all_partitions[i :: len(workers)]
  346. def _set_state(self, state):
  347. self.state = state
  348. self.state_id += 1
  349. self.state_change_event.set()