图片解析应用
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

195 lines
5.6 KiB

  1. import os
  2. import subprocess
  3. import time
  4. import pytest
  5. from kazoo.testing import KazooTestHarness
  6. from kazoo.exceptions import (
  7. AuthFailedError,
  8. NoAuthError,
  9. )
  10. from kazoo.tests.util import CI_ZK_VERSION
  11. class TestLegacySASLDigestAuthentication(KazooTestHarness):
  12. def setUp(self):
  13. try:
  14. import puresasl # NOQA
  15. except ImportError:
  16. pytest.skip("PureSASL not available.")
  17. os.environ["ZOOKEEPER_JAAS_AUTH"] = "digest"
  18. self.setup_zookeeper()
  19. if CI_ZK_VERSION:
  20. version = CI_ZK_VERSION
  21. else:
  22. version = self.client.server_version()
  23. if not version or version < (3, 4):
  24. pytest.skip("Must use Zookeeper 3.4 or above")
  25. def tearDown(self):
  26. self.teardown_zookeeper()
  27. os.environ.pop("ZOOKEEPER_JAAS_AUTH", None)
  28. def test_connect_sasl_auth(self):
  29. from kazoo.security import make_acl
  30. username = "jaasuser"
  31. password = "jaas_password"
  32. acl = make_acl("sasl", credential=username, all=True)
  33. sasl_auth = "%s:%s" % (username, password)
  34. client = self._get_client(auth_data=[("sasl", sasl_auth)])
  35. client.start()
  36. try:
  37. client.create("/1", acl=(acl,))
  38. # give ZK a chance to copy data to other node
  39. time.sleep(0.1)
  40. with pytest.raises(NoAuthError):
  41. self.client.get("/1")
  42. finally:
  43. client.delete("/1")
  44. client.stop()
  45. client.close()
  46. def test_invalid_sasl_auth(self):
  47. client = self._get_client(auth_data=[("sasl", "baduser:badpassword")])
  48. with pytest.raises(AuthFailedError):
  49. client.start()
  50. class TestSASLDigestAuthentication(KazooTestHarness):
  51. def setUp(self):
  52. try:
  53. import puresasl # NOQA
  54. except ImportError:
  55. pytest.skip("PureSASL not available.")
  56. os.environ["ZOOKEEPER_JAAS_AUTH"] = "digest"
  57. self.setup_zookeeper()
  58. if CI_ZK_VERSION:
  59. version = CI_ZK_VERSION
  60. else:
  61. version = self.client.server_version()
  62. if not version or version < (3, 4):
  63. pytest.skip("Must use Zookeeper 3.4 or above")
  64. def tearDown(self):
  65. self.teardown_zookeeper()
  66. os.environ.pop("ZOOKEEPER_JAAS_AUTH", None)
  67. def test_connect_sasl_auth(self):
  68. from kazoo.security import make_acl
  69. username = "jaasuser"
  70. password = "jaas_password"
  71. acl = make_acl("sasl", credential=username, all=True)
  72. client = self._get_client(
  73. sasl_options={
  74. "mechanism": "DIGEST-MD5",
  75. "username": username,
  76. "password": password,
  77. }
  78. )
  79. client.start()
  80. try:
  81. client.create("/1", acl=(acl,))
  82. # give ZK a chance to copy data to other node
  83. time.sleep(0.1)
  84. with pytest.raises(NoAuthError):
  85. self.client.get("/1")
  86. finally:
  87. client.delete("/1")
  88. client.stop()
  89. client.close()
  90. def test_invalid_sasl_auth(self):
  91. client = self._get_client(
  92. sasl_options={
  93. "mechanism": "DIGEST-MD5",
  94. "username": "baduser",
  95. "password": "badpassword",
  96. }
  97. )
  98. with pytest.raises(AuthFailedError):
  99. client.start()
  100. class TestSASLGSSAPIAuthentication(KazooTestHarness):
  101. def setUp(self):
  102. try:
  103. import puresasl # NOQA
  104. except ImportError:
  105. pytest.skip("PureSASL not available.")
  106. try:
  107. import kerberos # NOQA
  108. except ImportError:
  109. pytest.skip("Kerberos support not available.")
  110. if not os.environ.get("KRB5_TEST_ENV"):
  111. pytest.skip("Test Kerberos environ not setup.")
  112. os.environ["ZOOKEEPER_JAAS_AUTH"] = "gssapi"
  113. self.setup_zookeeper()
  114. if CI_ZK_VERSION:
  115. version = CI_ZK_VERSION
  116. else:
  117. version = self.client.server_version()
  118. if not version or version < (3, 4):
  119. pytest.skip("Must use Zookeeper 3.4 or above")
  120. def tearDown(self):
  121. self.teardown_zookeeper()
  122. os.environ.pop("ZOOKEEPER_JAAS_AUTH", None)
  123. def test_connect_gssapi_auth(self):
  124. from kazoo.security import make_acl
  125. # Ensure we have a client ticket
  126. subprocess.check_call(
  127. [
  128. "kinit",
  129. "-kt",
  130. os.path.expandvars("${KRB5_TEST_ENV}/client.keytab"),
  131. "client",
  132. ]
  133. )
  134. acl = make_acl("sasl", credential="client@KAZOOTEST.ORG", all=True)
  135. client = self._get_client(sasl_options={"mechanism": "GSSAPI"})
  136. client.start()
  137. try:
  138. client.create("/1", acl=(acl,))
  139. # give ZK a chance to copy data to other node
  140. time.sleep(0.1)
  141. with pytest.raises(NoAuthError):
  142. self.client.get("/1")
  143. finally:
  144. client.delete("/1")
  145. client.stop()
  146. client.close()
  147. def test_invalid_gssapi_auth(self):
  148. # Request a post-datated ticket, so that it is currently invalid.
  149. subprocess.check_call(
  150. [
  151. "kinit",
  152. "-kt",
  153. os.path.expandvars("${KRB5_TEST_ENV}/client.keytab"),
  154. "-s",
  155. "30min",
  156. "client",
  157. ]
  158. )
  159. client = self._get_client(sasl_options={"mechanism": "GSSAPI"})
  160. with pytest.raises(AuthFailedError):
  161. client.start()