图片解析应用
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

187 lines
5.5 KiB

  1. """A eventlet based handler."""
  2. from __future__ import absolute_import
  3. import atexit
  4. import contextlib
  5. import logging
  6. import eventlet
  7. from eventlet.green import socket as green_socket
  8. from eventlet.green import time as green_time
  9. from eventlet.green import threading as green_threading
  10. from eventlet.green import selectors as green_selectors
  11. from eventlet import queue as green_queue
  12. from kazoo.handlers import utils
  13. from kazoo.handlers.utils import selector_select
  14. LOG = logging.getLogger(__name__)
  15. # sentinel objects
  16. _STOP = object()
  17. @contextlib.contextmanager
  18. def _yield_before_after():
  19. # Yield to any other co-routines...
  20. #
  21. # See: http://eventlet.net/doc/modules/greenthread.html
  22. # for how this zero sleep is really a cooperative yield to other potential
  23. # co-routines...
  24. eventlet.sleep(0)
  25. try:
  26. yield
  27. finally:
  28. eventlet.sleep(0)
  29. class TimeoutError(Exception):
  30. pass
  31. class AsyncResult(utils.AsyncResult):
  32. """A one-time event that stores a value or an exception"""
  33. def __init__(self, handler):
  34. super(AsyncResult, self).__init__(
  35. handler, green_threading.Condition, TimeoutError
  36. )
  37. class SequentialEventletHandler(object):
  38. """Eventlet handler for sequentially executing callbacks.
  39. This handler executes callbacks in a sequential manner. A queue is
  40. created for each of the callback events, so that each type of event
  41. has its callback type run sequentially. These are split into two
  42. queues, one for watch events and one for async result completion
  43. callbacks.
  44. Each queue type has a greenthread worker that pulls the callback event
  45. off the queue and runs it in the order the client sees it.
  46. This split helps ensure that watch callbacks won't block session
  47. re-establishment should the connection be lost during a Zookeeper
  48. client call.
  49. Watch and completion callbacks should avoid blocking behavior as
  50. the next callback of that type won't be run until it completes. If
  51. you need to block, spawn a new greenthread and return immediately so
  52. callbacks can proceed.
  53. .. note::
  54. Completion callbacks can block to wait on Zookeeper calls, but
  55. no other completion callbacks will execute until the callback
  56. returns.
  57. """
  58. name = "sequential_eventlet_handler"
  59. queue_impl = green_queue.LightQueue
  60. queue_empty = green_queue.Empty
  61. def __init__(self):
  62. """Create a :class:`SequentialEventletHandler` instance"""
  63. self.callback_queue = self.queue_impl()
  64. self.completion_queue = self.queue_impl()
  65. self._workers = []
  66. self._started = False
  67. @staticmethod
  68. def sleep_func(wait):
  69. green_time.sleep(wait)
  70. @property
  71. def running(self):
  72. return self._started
  73. timeout_exception = TimeoutError
  74. def _process_completion_queue(self):
  75. while True:
  76. cb = self.completion_queue.get()
  77. if cb is _STOP:
  78. break
  79. try:
  80. with _yield_before_after():
  81. cb()
  82. except Exception:
  83. LOG.warning(
  84. "Exception in worker completion queue greenlet",
  85. exc_info=True,
  86. )
  87. finally:
  88. del cb # release before possible idle
  89. def _process_callback_queue(self):
  90. while True:
  91. cb = self.callback_queue.get()
  92. if cb is _STOP:
  93. break
  94. try:
  95. with _yield_before_after():
  96. cb()
  97. except Exception:
  98. LOG.warning(
  99. "Exception in worker callback queue greenlet",
  100. exc_info=True,
  101. )
  102. finally:
  103. del cb # release before possible idle
  104. def start(self):
  105. if not self._started:
  106. # Spawn our worker threads, we have
  107. # - A callback worker for watch events to be called
  108. # - A completion worker for completion events to be called
  109. w = eventlet.spawn(self._process_completion_queue)
  110. self._workers.append((w, self.completion_queue))
  111. w = eventlet.spawn(self._process_callback_queue)
  112. self._workers.append((w, self.callback_queue))
  113. self._started = True
  114. atexit.register(self.stop)
  115. def stop(self):
  116. while self._workers:
  117. w, q = self._workers.pop()
  118. q.put(_STOP)
  119. w.wait()
  120. self._started = False
  121. atexit.unregister(self.stop)
  122. def socket(self, *args, **kwargs):
  123. return utils.create_tcp_socket(green_socket)
  124. def create_socket_pair(self):
  125. return utils.create_socket_pair(green_socket)
  126. def event_object(self):
  127. return green_threading.Event()
  128. def lock_object(self):
  129. return green_threading.Lock()
  130. def rlock_object(self):
  131. return green_threading.RLock()
  132. def create_connection(self, *args, **kwargs):
  133. return utils.create_tcp_connection(green_socket, *args, **kwargs)
  134. def select(self, *args, **kwargs):
  135. with _yield_before_after():
  136. return selector_select(
  137. *args, selectors_module=green_selectors, **kwargs
  138. )
  139. def async_result(self):
  140. return AsyncResult(self)
  141. def spawn(self, func, *args, **kwargs):
  142. t = green_threading.Thread(target=func, args=args, kwargs=kwargs)
  143. t.daemon = True
  144. t.start()
  145. return t
  146. def dispatch_callback(self, callback):
  147. self.callback_queue.put(lambda: callback.func(*callback.args))