图片解析应用
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

364 lines
12 KiB

  1. """Zookeeper based queue implementations.
  2. :Maintainer: None
  3. :Status: Possibly Buggy
  4. .. note::
  5. This queue was reported to cause memory leaks over long running periods.
  6. See: https://github.com/python-zk/kazoo/issues/175
  7. """
  8. import uuid
  9. from kazoo.exceptions import NoNodeError, NodeExistsError
  10. from kazoo.protocol.states import EventType
  11. from kazoo.retry import ForceRetryError
  12. class BaseQueue(object):
  13. """A common base class for queue implementations."""
  14. def __init__(self, client, path):
  15. """
  16. :param client: A :class:`~kazoo.client.KazooClient` instance.
  17. :param path: The queue path to use in ZooKeeper.
  18. """
  19. self.client = client
  20. self.path = path
  21. self._entries_path = path
  22. self.structure_paths = (self.path,)
  23. self.ensured_path = False
  24. def _check_put_arguments(self, value, priority=100):
  25. if not isinstance(value, bytes):
  26. raise TypeError("value must be a byte string")
  27. if not isinstance(priority, int):
  28. raise TypeError("priority must be an int")
  29. elif priority < 0 or priority > 999:
  30. raise ValueError("priority must be between 0 and 999")
  31. def _ensure_paths(self):
  32. if not self.ensured_path:
  33. # make sure our parent / internal structure nodes exists
  34. for path in self.structure_paths:
  35. self.client.ensure_path(path)
  36. self.ensured_path = True
  37. def __len__(self):
  38. self._ensure_paths()
  39. _, stat = self.client.retry(self.client.get, self._entries_path)
  40. return stat.children_count
  41. class Queue(BaseQueue):
  42. """A distributed queue with optional priority support.
  43. This queue does not offer reliable consumption. An entry is removed
  44. from the queue prior to being processed. So if an error occurs, the
  45. consumer has to re-queue the item or it will be lost.
  46. """
  47. prefix = "entry-"
  48. def __init__(self, client, path):
  49. """
  50. :param client: A :class:`~kazoo.client.KazooClient` instance.
  51. :param path: The queue path to use in ZooKeeper.
  52. """
  53. super(Queue, self).__init__(client, path)
  54. self._children = []
  55. def __len__(self):
  56. """Return queue size."""
  57. return super(Queue, self).__len__()
  58. def get(self):
  59. """
  60. Get item data and remove an item from the queue.
  61. :returns: Item data or None.
  62. :rtype: bytes
  63. """
  64. self._ensure_paths()
  65. return self.client.retry(self._inner_get)
  66. def _inner_get(self):
  67. if not self._children:
  68. self._children = self.client.retry(
  69. self.client.get_children, self.path
  70. )
  71. self._children = sorted(self._children)
  72. if not self._children:
  73. return None
  74. name = self._children[0]
  75. try:
  76. data, stat = self.client.get(self.path + "/" + name)
  77. self.client.delete(self.path + "/" + name)
  78. except NoNodeError: # pragma: nocover
  79. # the first node has vanished in the meantime, try to
  80. # get another one
  81. self._children = []
  82. raise ForceRetryError()
  83. self._children.pop(0)
  84. return data
  85. def put(self, value, priority=100):
  86. """Put an item into the queue.
  87. :param value: Byte string to put into the queue.
  88. :param priority:
  89. An optional priority as an integer with at most 3 digits.
  90. Lower values signify higher priority.
  91. """
  92. self._check_put_arguments(value, priority)
  93. self._ensure_paths()
  94. path = "{path}/{prefix}{priority:03d}-".format(
  95. path=self.path, prefix=self.prefix, priority=priority
  96. )
  97. self.client.create(path, value, sequence=True)
  98. class LockingQueue(BaseQueue):
  99. """A distributed queue with priority and locking support.
  100. Upon retrieving an entry from the queue, the entry gets locked with an
  101. ephemeral node (instead of deleted). If an error occurs, this lock gets
  102. released so that others could retake the entry. This adds a little penalty
  103. as compared to :class:`Queue` implementation.
  104. The user should call the :meth:`LockingQueue.get` method first to lock and
  105. retrieve the next entry. When finished processing the entry, a user should
  106. call the :meth:`LockingQueue.consume` method that will remove the entry
  107. from the queue.
  108. This queue will not track connection status with ZooKeeper. If a node locks
  109. an element, then loses connection with ZooKeeper and later reconnects, the
  110. lock will probably be removed by Zookeeper in the meantime, but a node
  111. would still think that it holds a lock. The user should check the
  112. connection status with Zookeeper or call :meth:`LockingQueue.holds_lock`
  113. method that will check if a node still holds the lock.
  114. .. note::
  115. :class:`LockingQueue` requires ZooKeeper 3.4 or above, since it is
  116. using transactions.
  117. """
  118. lock = "/taken"
  119. entries = "/entries"
  120. entry = "entry"
  121. def __init__(self, client, path):
  122. """
  123. :param client: A :class:`~kazoo.client.KazooClient` instance.
  124. :param path: The queue path to use in ZooKeeper.
  125. """
  126. super(LockingQueue, self).__init__(client, path)
  127. self.id = uuid.uuid4().hex.encode()
  128. self.processing_element = None
  129. self._lock_path = self.path + self.lock
  130. self._entries_path = self.path + self.entries
  131. self.structure_paths = (self._lock_path, self._entries_path)
  132. def __len__(self):
  133. """Returns the current length of the queue.
  134. :returns: queue size (includes locked entries count).
  135. """
  136. return super(LockingQueue, self).__len__()
  137. def put(self, value, priority=100):
  138. """Put an entry into the queue.
  139. :param value: Byte string to put into the queue.
  140. :param priority:
  141. An optional priority as an integer with at most 3 digits.
  142. Lower values signify higher priority.
  143. """
  144. self._check_put_arguments(value, priority)
  145. self._ensure_paths()
  146. self.client.create(
  147. "{path}/{prefix}-{priority:03d}-".format(
  148. path=self._entries_path, prefix=self.entry, priority=priority
  149. ),
  150. value,
  151. sequence=True,
  152. )
  153. def put_all(self, values, priority=100):
  154. """Put several entries into the queue. The action only succeeds
  155. if all entries where put into the queue.
  156. :param values: A list of values to put into the queue.
  157. :param priority:
  158. An optional priority as an integer with at most 3 digits.
  159. Lower values signify higher priority.
  160. """
  161. if not isinstance(values, list):
  162. raise TypeError("values must be a list of byte strings")
  163. if not isinstance(priority, int):
  164. raise TypeError("priority must be an int")
  165. elif priority < 0 or priority > 999:
  166. raise ValueError("priority must be between 0 and 999")
  167. self._ensure_paths()
  168. with self.client.transaction() as transaction:
  169. for value in values:
  170. if not isinstance(value, bytes):
  171. raise TypeError("value must be a byte string")
  172. transaction.create(
  173. "{path}/{prefix}-{priority:03d}-".format(
  174. path=self._entries_path,
  175. prefix=self.entry,
  176. priority=priority,
  177. ),
  178. value,
  179. sequence=True,
  180. )
  181. def get(self, timeout=None):
  182. """Locks and gets an entry from the queue. If a previously got entry
  183. was not consumed, this method will return that entry.
  184. :param timeout:
  185. Maximum waiting time in seconds. If None then it will wait
  186. until an entry appears in the queue.
  187. :returns: A locked entry value or None if the timeout was reached.
  188. :rtype: bytes
  189. """
  190. self._ensure_paths()
  191. if self.processing_element is not None:
  192. return self.processing_element[1]
  193. else:
  194. return self._inner_get(timeout)
  195. def holds_lock(self):
  196. """Checks if a node still holds the lock.
  197. :returns: True if a node still holds the lock, False otherwise.
  198. :rtype: bool
  199. """
  200. if self.processing_element is None:
  201. return False
  202. lock_id, _ = self.processing_element
  203. lock_path = "{path}/{id}".format(path=self._lock_path, id=lock_id)
  204. self.client.sync(lock_path)
  205. value, stat = self.client.retry(self.client.get, lock_path)
  206. return value == self.id
  207. def consume(self):
  208. """Removes a currently processing entry from the queue.
  209. :returns: True if element was removed successfully, False otherwise.
  210. :rtype: bool
  211. """
  212. if self.processing_element is not None and self.holds_lock():
  213. id_, value = self.processing_element
  214. with self.client.transaction() as transaction:
  215. transaction.delete(
  216. "{path}/{id}".format(path=self._entries_path, id=id_)
  217. )
  218. transaction.delete(
  219. "{path}/{id}".format(path=self._lock_path, id=id_)
  220. )
  221. self.processing_element = None
  222. return True
  223. else:
  224. return False
  225. def release(self):
  226. """Removes the lock from currently processed item without consuming it.
  227. :returns: True if the lock was removed successfully, False otherwise.
  228. :rtype: bool
  229. """
  230. if self.processing_element is not None and self.holds_lock():
  231. id_, value = self.processing_element
  232. with self.client.transaction() as transaction:
  233. transaction.delete(
  234. "{path}/{id}".format(path=self._lock_path, id=id_)
  235. )
  236. self.processing_element = None
  237. return True
  238. else:
  239. return False
  240. def _inner_get(self, timeout):
  241. flag = self.client.handler.event_object()
  242. lock = self.client.handler.lock_object()
  243. canceled = False
  244. value = []
  245. def check_for_updates(event):
  246. if event is not None and event.type != EventType.CHILD:
  247. return
  248. with lock:
  249. if canceled or flag.is_set():
  250. return
  251. values = self.client.retry(
  252. self.client.get_children,
  253. self._entries_path,
  254. check_for_updates,
  255. )
  256. taken = self.client.retry(
  257. self.client.get_children,
  258. self._lock_path,
  259. check_for_updates,
  260. )
  261. available = self._filter_locked(values, taken)
  262. if len(available) > 0:
  263. ret = self._take(available[0])
  264. if ret is not None:
  265. # By this time, no one took the task
  266. value.append(ret)
  267. flag.set()
  268. check_for_updates(None)
  269. retVal = None
  270. flag.wait(timeout)
  271. with lock:
  272. canceled = True
  273. if len(value) > 0:
  274. # We successfully locked an entry
  275. self.processing_element = value[0]
  276. retVal = value[0][1]
  277. return retVal
  278. def _filter_locked(self, values, taken):
  279. taken = set(taken)
  280. available = sorted(values)
  281. return (
  282. available
  283. if len(taken) == 0
  284. else [x for x in available if x not in taken]
  285. )
  286. def _take(self, id_):
  287. try:
  288. self.client.create(
  289. "{path}/{id}".format(path=self._lock_path, id=id_),
  290. self.id,
  291. ephemeral=True,
  292. )
  293. except NodeExistsError:
  294. # Item is already locked
  295. return None
  296. try:
  297. value, stat = self.client.retry(
  298. self.client.get,
  299. "{path}/{id}".format(path=self._entries_path, id=id_),
  300. )
  301. except NoNodeError:
  302. # Item is already consumed
  303. self.client.delete(
  304. "{path}/{id}".format(path=self._lock_path, id=id_)
  305. )
  306. return None
  307. return (id_, value)