You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
|
|
import uuid import sys import threading
import pytest
from kazoo.testing import KazooTestCase from kazoo.tests.util import wait
class UniqueError(Exception): """Error raised only by test leader function"""
class KazooElectionTests(KazooTestCase): def setUp(self): super(KazooElectionTests, self).setUp() self.path = "/" + uuid.uuid4().hex
self.condition = threading.Condition()
# election contenders set these when elected. The exit event is set by # the test to make the leader exit. self.leader_id = None self.exit_event = None
# tests set this before the event to make the leader raise an error self.raise_exception = False
# set by a worker thread when an unexpected error is hit. # better way to do this? self.thread_exc_info = None
def _spawn_contender(self, contender_id, election): thread = threading.Thread( target=self._election_thread, args=(contender_id, election) ) thread.daemon = True thread.start() return thread
def _election_thread(self, contender_id, election): try: election.run(self._leader_func, contender_id) except UniqueError: if not self.raise_exception: self.thread_exc_info = sys.exc_info() except Exception: self.thread_exc_info = sys.exc_info() else: if self.raise_exception: e = Exception("expected leader func to raise exception") self.thread_exc_info = (Exception, e, None)
def _leader_func(self, name): exit_event = threading.Event() with self.condition: self.exit_event = exit_event self.leader_id = name self.condition.notify_all()
exit_event.wait(45) if self.raise_exception: raise UniqueError("expected error in the leader function")
def _check_thread_error(self): if self.thread_exc_info: t, o, tb = self.thread_exc_info raise t(o)
def test_election(self): elections = {} threads = {} for _ in range(3): contender = "c" + uuid.uuid4().hex elections[contender] = self.client.Election(self.path, contender) threads[contender] = self._spawn_contender( contender, elections[contender] )
# wait for a leader to be elected times = 0 with self.condition: while not self.leader_id: self.condition.wait(5) times += 1 if times > 5: raise Exception( "Still not a leader: lid: %s", self.leader_id )
election = self.client.Election(self.path)
# make sure all contenders are in the pool wait(lambda: len(election.contenders()) == len(elections)) contenders = election.contenders()
assert set(contenders) == set(elections.keys())
# first one in list should be leader first_leader = contenders[0] assert first_leader == self.leader_id
# tell second one to cancel election. should never get elected. elections[contenders[1]].cancel()
# make leader exit. third contender should be elected. self.exit_event.set() with self.condition: while self.leader_id == first_leader: self.condition.wait(45) assert self.leader_id == contenders[2] self._check_thread_error()
# make first contender re-enter the race threads[first_leader].join() threads[first_leader] = self._spawn_contender( first_leader, elections[first_leader] )
# contender set should now be the current leader plus the first leader wait(lambda: len(election.contenders()) == 2) contenders = election.contenders() assert set(contenders), set([self.leader_id == first_leader])
# make current leader raise an exception. first should be reelected self.raise_exception = True self.exit_event.set() with self.condition: while self.leader_id != first_leader: self.condition.wait(45) assert self.leader_id == first_leader self._check_thread_error()
self.exit_event.set() for thread in threads.values(): thread.join() self._check_thread_error()
def test_bad_func(self): election = self.client.Election(self.path) with pytest.raises(ValueError): election.run("not a callable")
|