图片解析应用
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

143 lines
4.6 KiB

  1. import uuid
  2. import sys
  3. import threading
  4. import pytest
  5. from kazoo.testing import KazooTestCase
  6. from kazoo.tests.util import wait
  7. class UniqueError(Exception):
  8. """Error raised only by test leader function"""
  9. class KazooElectionTests(KazooTestCase):
  10. def setUp(self):
  11. super(KazooElectionTests, self).setUp()
  12. self.path = "/" + uuid.uuid4().hex
  13. self.condition = threading.Condition()
  14. # election contenders set these when elected. The exit event is set by
  15. # the test to make the leader exit.
  16. self.leader_id = None
  17. self.exit_event = None
  18. # tests set this before the event to make the leader raise an error
  19. self.raise_exception = False
  20. # set by a worker thread when an unexpected error is hit.
  21. # better way to do this?
  22. self.thread_exc_info = None
  23. def _spawn_contender(self, contender_id, election):
  24. thread = threading.Thread(
  25. target=self._election_thread, args=(contender_id, election)
  26. )
  27. thread.daemon = True
  28. thread.start()
  29. return thread
  30. def _election_thread(self, contender_id, election):
  31. try:
  32. election.run(self._leader_func, contender_id)
  33. except UniqueError:
  34. if not self.raise_exception:
  35. self.thread_exc_info = sys.exc_info()
  36. except Exception:
  37. self.thread_exc_info = sys.exc_info()
  38. else:
  39. if self.raise_exception:
  40. e = Exception("expected leader func to raise exception")
  41. self.thread_exc_info = (Exception, e, None)
  42. def _leader_func(self, name):
  43. exit_event = threading.Event()
  44. with self.condition:
  45. self.exit_event = exit_event
  46. self.leader_id = name
  47. self.condition.notify_all()
  48. exit_event.wait(45)
  49. if self.raise_exception:
  50. raise UniqueError("expected error in the leader function")
  51. def _check_thread_error(self):
  52. if self.thread_exc_info:
  53. t, o, tb = self.thread_exc_info
  54. raise t(o)
  55. def test_election(self):
  56. elections = {}
  57. threads = {}
  58. for _ in range(3):
  59. contender = "c" + uuid.uuid4().hex
  60. elections[contender] = self.client.Election(self.path, contender)
  61. threads[contender] = self._spawn_contender(
  62. contender, elections[contender]
  63. )
  64. # wait for a leader to be elected
  65. times = 0
  66. with self.condition:
  67. while not self.leader_id:
  68. self.condition.wait(5)
  69. times += 1
  70. if times > 5:
  71. raise Exception(
  72. "Still not a leader: lid: %s", self.leader_id
  73. )
  74. election = self.client.Election(self.path)
  75. # make sure all contenders are in the pool
  76. wait(lambda: len(election.contenders()) == len(elections))
  77. contenders = election.contenders()
  78. assert set(contenders) == set(elections.keys())
  79. # first one in list should be leader
  80. first_leader = contenders[0]
  81. assert first_leader == self.leader_id
  82. # tell second one to cancel election. should never get elected.
  83. elections[contenders[1]].cancel()
  84. # make leader exit. third contender should be elected.
  85. self.exit_event.set()
  86. with self.condition:
  87. while self.leader_id == first_leader:
  88. self.condition.wait(45)
  89. assert self.leader_id == contenders[2]
  90. self._check_thread_error()
  91. # make first contender re-enter the race
  92. threads[first_leader].join()
  93. threads[first_leader] = self._spawn_contender(
  94. first_leader, elections[first_leader]
  95. )
  96. # contender set should now be the current leader plus the first leader
  97. wait(lambda: len(election.contenders()) == 2)
  98. contenders = election.contenders()
  99. assert set(contenders), set([self.leader_id == first_leader])
  100. # make current leader raise an exception. first should be reelected
  101. self.raise_exception = True
  102. self.exit_event.set()
  103. with self.condition:
  104. while self.leader_id != first_leader:
  105. self.condition.wait(45)
  106. assert self.leader_id == first_leader
  107. self._check_thread_error()
  108. self.exit_event.set()
  109. for thread in threads.values():
  110. thread.join()
  111. self._check_thread_error()
  112. def test_bad_func(self):
  113. election = self.client.Election(self.path)
  114. with pytest.raises(ValueError):
  115. election.run("not a callable")