图片解析应用
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

299 lines
9.4 KiB

  1. """Kazoo testing harnesses"""
  2. import atexit
  3. import logging
  4. import os
  5. import uuid
  6. import unittest
  7. from kazoo.client import KazooClient
  8. from kazoo.exceptions import KazooException
  9. from kazoo.protocol.connection import _CONNECTION_DROP, _SESSION_EXPIRED
  10. from kazoo.protocol.states import KazooState
  11. from kazoo.testing.common import ZookeeperCluster
  12. log = logging.getLogger(__name__)
  13. CLUSTER = None
  14. CLUSTER_CONF = None
  15. CLUSTER_DEFAULTS = {
  16. "ZOOKEEPER_PORT_OFFSET": 20000,
  17. "ZOOKEEPER_CLUSTER_SIZE": 3,
  18. "ZOOKEEPER_OBSERVER_START_ID": -1,
  19. "ZOOKEEPER_LOCAL_SESSION_RO": "false",
  20. }
  21. MAX_INIT_TRIES = 5
  22. def get_global_cluster():
  23. global CLUSTER, CLUSTER_CONF
  24. cluster_conf = {
  25. k: os.environ.get(k, CLUSTER_DEFAULTS.get(k))
  26. for k in [
  27. "ZOOKEEPER_PATH",
  28. "ZOOKEEPER_CLASSPATH",
  29. "ZOOKEEPER_PORT_OFFSET",
  30. "ZOOKEEPER_CLUSTER_SIZE",
  31. "ZOOKEEPER_VERSION",
  32. "ZOOKEEPER_OBSERVER_START_ID",
  33. "ZOOKEEPER_JAAS_AUTH",
  34. "ZOOKEEPER_LOCAL_SESSION_RO",
  35. ]
  36. }
  37. if CLUSTER is not None:
  38. if CLUSTER_CONF == cluster_conf:
  39. return CLUSTER
  40. else:
  41. log.info("Config change detected. Reconfiguring cluster...")
  42. CLUSTER.terminate()
  43. CLUSTER = None
  44. # Create a new cluster
  45. ZK_HOME = cluster_conf.get("ZOOKEEPER_PATH")
  46. ZK_CLASSPATH = cluster_conf.get("ZOOKEEPER_CLASSPATH")
  47. ZK_PORT_OFFSET = int(cluster_conf.get("ZOOKEEPER_PORT_OFFSET"))
  48. ZK_CLUSTER_SIZE = int(cluster_conf.get("ZOOKEEPER_CLUSTER_SIZE"))
  49. ZK_VERSION = cluster_conf.get("ZOOKEEPER_VERSION")
  50. if "-" in ZK_VERSION:
  51. # Ignore pre-release markers like -alpha
  52. ZK_VERSION = ZK_VERSION.split("-")[0]
  53. ZK_VERSION = tuple([int(n) for n in ZK_VERSION.split(".")])
  54. ZK_OBSERVER_START_ID = int(cluster_conf.get("ZOOKEEPER_OBSERVER_START_ID"))
  55. assert ZK_HOME or ZK_CLASSPATH or ZK_VERSION, (
  56. "Either ZOOKEEPER_PATH or ZOOKEEPER_CLASSPATH or "
  57. "ZOOKEEPER_VERSION environment variable must be defined.\n"
  58. "For deb package installations this is /usr/share/java"
  59. )
  60. if ZK_VERSION >= (3, 5):
  61. ZOOKEEPER_LOCAL_SESSION_RO = cluster_conf.get(
  62. "ZOOKEEPER_LOCAL_SESSION_RO"
  63. )
  64. additional_configuration_entries = [
  65. "4lw.commands.whitelist=*",
  66. "reconfigEnabled=true",
  67. # required to avoid session validation error
  68. # in read only test
  69. "localSessionsEnabled=" + ZOOKEEPER_LOCAL_SESSION_RO,
  70. "localSessionsUpgradingEnabled=" + ZOOKEEPER_LOCAL_SESSION_RO,
  71. ]
  72. # If defined, this sets the superuser password to "test"
  73. additional_java_system_properties = [
  74. "-Dzookeeper.DigestAuthenticationProvider.superDigest="
  75. "super:D/InIHSb7yEEbrWz8b9l71RjZJU="
  76. ]
  77. else:
  78. additional_configuration_entries = []
  79. additional_java_system_properties = []
  80. ZOOKEEPER_JAAS_AUTH = cluster_conf.get("ZOOKEEPER_JAAS_AUTH")
  81. if ZOOKEEPER_JAAS_AUTH == "digest":
  82. jaas_config = """
  83. Server {
  84. org.apache.zookeeper.server.auth.DigestLoginModule required
  85. user_super="super_secret"
  86. user_jaasuser="jaas_password";
  87. };"""
  88. elif ZOOKEEPER_JAAS_AUTH == "gssapi":
  89. # Configure Zookeeper to use our test KDC.
  90. additional_java_system_properties += [
  91. "-Djava.security.krb5.conf=%s"
  92. % os.path.expandvars("${KRB5_CONFIG}"),
  93. "-Dsun.security.krb5.debug=true",
  94. ]
  95. jaas_config = """
  96. Server {
  97. com.sun.security.auth.module.Krb5LoginModule required
  98. debug=true
  99. isInitiator=false
  100. useKeyTab=true
  101. keyTab="%s"
  102. storeKey=true
  103. useTicketCache=false
  104. principal="zookeeper/127.0.0.1@KAZOOTEST.ORG";
  105. };""" % os.path.expandvars(
  106. "${KRB5_TEST_ENV}/server.keytab"
  107. )
  108. else:
  109. jaas_config = None
  110. CLUSTER = ZookeeperCluster(
  111. install_path=ZK_HOME,
  112. classpath=ZK_CLASSPATH,
  113. port_offset=ZK_PORT_OFFSET,
  114. size=ZK_CLUSTER_SIZE,
  115. observer_start_id=ZK_OBSERVER_START_ID,
  116. configuration_entries=additional_configuration_entries,
  117. java_system_properties=additional_java_system_properties,
  118. jaas_config=jaas_config,
  119. )
  120. CLUSTER_CONF = cluster_conf
  121. atexit.register(lambda cluster: cluster.terminate(), CLUSTER)
  122. return CLUSTER
  123. class KazooTestHarness(unittest.TestCase):
  124. """Harness for testing code that uses Kazoo
  125. This object can be used directly or as a mixin. It supports starting
  126. and stopping a complete ZooKeeper cluster locally and provides an
  127. API for simulating errors and expiring sessions.
  128. Example::
  129. class MyTestCase(KazooTestHarness):
  130. def setUp(self):
  131. self.setup_zookeeper()
  132. # additional test setup
  133. def tearDown(self):
  134. self.teardown_zookeeper()
  135. def test_something(self):
  136. something_that_needs_a_kazoo_client(self.client)
  137. def test_something_else(self):
  138. something_that_needs_zk_servers(self.servers)
  139. """
  140. DEFAULT_CLIENT_TIMEOUT = 15
  141. def __init__(self, *args, **kw):
  142. super(KazooTestHarness, self).__init__(*args, **kw)
  143. self.client = None
  144. self._clients = []
  145. @property
  146. def cluster(self):
  147. return get_global_cluster()
  148. def log(self, level, msg, *args, **kwargs):
  149. log.log(level, msg, *args, **kwargs)
  150. @property
  151. def servers(self):
  152. return ",".join([s.address for s in self.cluster])
  153. @property
  154. def secure_servers(self):
  155. return ",".join([s.secure_address for s in self.cluster])
  156. def _get_nonchroot_client(self):
  157. c = KazooClient(self.servers)
  158. self._clients.append(c)
  159. return c
  160. def _get_client(self, **client_options):
  161. if "timeout" not in client_options:
  162. client_options["timeout"] = self.DEFAULT_CLIENT_TIMEOUT
  163. c = KazooClient(self.hosts, **client_options)
  164. self._clients.append(c)
  165. return c
  166. def lose_connection(self, event_factory):
  167. """Force client to lose connection with server"""
  168. self.__break_connection(
  169. _CONNECTION_DROP, KazooState.SUSPENDED, event_factory
  170. )
  171. def expire_session(self, event_factory):
  172. """Force ZK to expire a client session"""
  173. self.__break_connection(
  174. _SESSION_EXPIRED, KazooState.LOST, event_factory
  175. )
  176. def setup_zookeeper(self, **client_options):
  177. """Create a ZK cluster and chrooted :class:`KazooClient`
  178. The cluster will only be created on the first invocation and won't be
  179. fully torn down until exit.
  180. """
  181. do_start = False
  182. for s in self.cluster:
  183. if not s.running:
  184. do_start = True
  185. if do_start:
  186. self.cluster.start()
  187. namespace = "/kazootests" + uuid.uuid4().hex
  188. self.hosts = self.servers + namespace
  189. tries = 0
  190. while True:
  191. try:
  192. client_cluster_health = self._get_client()
  193. client_cluster_health.start()
  194. client_cluster_health.ensure_path("/")
  195. client_cluster_health.stop()
  196. self.log(logging.INFO, "cluster looks ready to go")
  197. break
  198. except Exception:
  199. tries += 1
  200. if tries >= MAX_INIT_TRIES:
  201. raise
  202. if tries > 0 and tries % 2 == 0:
  203. self.log(
  204. logging.WARNING,
  205. "nuking current cluster and making another one",
  206. )
  207. self.cluster.terminate()
  208. self.cluster.start()
  209. continue
  210. if client_options.get("use_ssl"):
  211. self.hosts = self.secure_servers + namespace
  212. else:
  213. self.hosts = self.servers + namespace
  214. self.client = self._get_client(**client_options)
  215. self.client.start()
  216. self.client.ensure_path("/")
  217. def teardown_zookeeper(self):
  218. """Reset and cleanup the zookeeper cluster that was started."""
  219. while self._clients:
  220. c = self._clients.pop()
  221. try:
  222. c.stop()
  223. except KazooException:
  224. log.exception("Failed stopping client %s", c)
  225. finally:
  226. c.close()
  227. self.client = None
  228. def __break_connection(self, break_event, expected_state, event_factory):
  229. """Break ZooKeeper connection using the specified event."""
  230. lost = event_factory()
  231. safe = event_factory()
  232. def watch_loss(state):
  233. if state == expected_state:
  234. lost.set()
  235. elif lost.is_set() and state == KazooState.CONNECTED:
  236. safe.set()
  237. return True
  238. self.client.add_listener(watch_loss)
  239. self.client._call(break_event, None)
  240. lost.wait(5)
  241. if not lost.is_set():
  242. raise Exception("Failed to get notified of broken connection.")
  243. safe.wait(15)
  244. if not safe.is_set():
  245. raise Exception("Failed to see client reconnect.")
  246. self.client.retry(self.client.get_async, "/")
  247. class KazooTestCase(KazooTestHarness):
  248. def setUp(self):
  249. self.setup_zookeeper()
  250. def tearDown(self):
  251. self.teardown_zookeeper()
  252. @classmethod
  253. def tearDownClass(cls):
  254. cluster = get_global_cluster()
  255. if cluster is not None:
  256. cluster.terminate()