图片解析应用
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

174 lines
5.1 KiB

  1. """A gevent based handler."""
  2. from __future__ import absolute_import
  3. import atexit
  4. import logging
  5. import gevent
  6. from gevent import socket
  7. import gevent.event
  8. import gevent.queue
  9. import gevent.select
  10. import gevent.thread
  11. import gevent.selectors
  12. from kazoo.handlers.utils import selector_select
  13. from gevent.lock import Semaphore, RLock
  14. from kazoo.handlers import utils
  15. _using_libevent = gevent.__version__.startswith("0.")
  16. log = logging.getLogger(__name__)
  17. _STOP = object()
  18. AsyncResult = gevent.event.AsyncResult
  19. class SequentialGeventHandler(object):
  20. """Gevent handler for sequentially executing callbacks.
  21. This handler executes callbacks in a sequential manner. A queue is
  22. created for each of the callback events, so that each type of event
  23. has its callback type run sequentially.
  24. Each queue type has a greenlet worker that pulls the callback event
  25. off the queue and runs it in the order the client sees it.
  26. This split helps ensure that watch callbacks won't block session
  27. re-establishment should the connection be lost during a Zookeeper
  28. client call.
  29. Watch callbacks should avoid blocking behavior as the next callback
  30. of that type won't be run until it completes. If you need to block,
  31. spawn a new greenlet and return immediately so callbacks can
  32. proceed.
  33. """
  34. name = "sequential_gevent_handler"
  35. queue_impl = gevent.queue.Queue
  36. queue_empty = gevent.queue.Empty
  37. sleep_func = staticmethod(gevent.sleep)
  38. def __init__(self):
  39. """Create a :class:`SequentialGeventHandler` instance"""
  40. self.callback_queue = self.queue_impl()
  41. self._running = False
  42. self._async = None
  43. self._state_change = Semaphore()
  44. self._workers = []
  45. @property
  46. def running(self):
  47. return self._running
  48. class timeout_exception(gevent.Timeout):
  49. def __init__(self, msg):
  50. gevent.Timeout.__init__(self, exception=msg)
  51. def _create_greenlet_worker(self, queue):
  52. def greenlet_worker():
  53. while True:
  54. try:
  55. func = queue.get()
  56. try:
  57. if func is _STOP:
  58. break
  59. func()
  60. except Exception as exc:
  61. log.warning("Exception in worker greenlet")
  62. log.exception(exc)
  63. finally:
  64. del func # release before possible idle
  65. except self.queue_empty:
  66. continue
  67. return gevent.spawn(greenlet_worker)
  68. def start(self):
  69. """Start the greenlet workers."""
  70. with self._state_change:
  71. if self._running:
  72. return
  73. self._running = True
  74. # Spawn our worker greenlets, we have
  75. # - A callback worker for watch events to be called
  76. for queue in (self.callback_queue,):
  77. w = self._create_greenlet_worker(queue)
  78. self._workers.append(w)
  79. atexit.register(self.stop)
  80. def stop(self):
  81. """Stop the greenlet workers and empty all queues."""
  82. with self._state_change:
  83. if not self._running:
  84. return
  85. self._running = False
  86. for queue in (self.callback_queue,):
  87. queue.put(_STOP)
  88. while self._workers:
  89. worker = self._workers.pop()
  90. worker.join()
  91. # Clear the queues
  92. self.callback_queue = self.queue_impl() # pragma: nocover
  93. atexit.unregister(self.stop)
  94. def select(self, *args, **kwargs):
  95. return selector_select(
  96. *args, selectors_module=gevent.selectors, **kwargs
  97. )
  98. def socket(self, *args, **kwargs):
  99. return utils.create_tcp_socket(socket)
  100. def create_connection(self, *args, **kwargs):
  101. return utils.create_tcp_connection(socket, *args, **kwargs)
  102. def create_socket_pair(self):
  103. return utils.create_socket_pair(socket)
  104. def event_object(self):
  105. """Create an appropriate Event object"""
  106. return gevent.event.Event()
  107. def lock_object(self):
  108. """Create an appropriate Lock object"""
  109. return gevent.thread.allocate_lock()
  110. def rlock_object(self):
  111. """Create an appropriate RLock object"""
  112. return RLock()
  113. def async_result(self):
  114. """Create a :class:`AsyncResult` instance
  115. The :class:`AsyncResult` instance will have its completion
  116. callbacks executed in the thread the
  117. :class:`SequentialGeventHandler` is created in (which should be
  118. the gevent/main thread).
  119. """
  120. return AsyncResult()
  121. def spawn(self, func, *args, **kwargs):
  122. """Spawn a function to run asynchronously"""
  123. return gevent.spawn(func, *args, **kwargs)
  124. def dispatch_callback(self, callback):
  125. """Dispatch to the callback object
  126. The callback is put on separate queues to run depending on the
  127. type as documented for the :class:`SequentialGeventHandler`.
  128. """
  129. self.callback_queue.put(lambda: callback.func(*callback.args))