You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
544 lines
14 KiB
544 lines
14 KiB
import time
|
|
import threading
|
|
import uuid
|
|
|
|
import pytest
|
|
|
|
from kazoo.exceptions import KazooException
|
|
from kazoo.protocol.states import EventType
|
|
from kazoo.testing import KazooTestCase
|
|
|
|
|
|
class KazooDataWatcherTests(KazooTestCase):
|
|
def setUp(self):
|
|
super(KazooDataWatcherTests, self).setUp()
|
|
self.path = "/" + uuid.uuid4().hex
|
|
self.client.ensure_path(self.path)
|
|
|
|
def test_data_watcher(self):
|
|
update = threading.Event()
|
|
data = [True]
|
|
|
|
# Make it a non-existent path
|
|
self.path += "f"
|
|
|
|
@self.client.DataWatch(self.path)
|
|
def changed(d, stat):
|
|
data.pop()
|
|
data.append(d)
|
|
update.set()
|
|
|
|
update.wait(10)
|
|
assert data == [None]
|
|
update.clear()
|
|
|
|
self.client.create(self.path, b"fred")
|
|
update.wait(10)
|
|
assert data[0] == b"fred"
|
|
update.clear()
|
|
|
|
def test_data_watcher_once(self):
|
|
update = threading.Event()
|
|
data = [True]
|
|
|
|
# Make it a non-existent path
|
|
self.path += "f"
|
|
|
|
dwatcher = self.client.DataWatch(self.path)
|
|
|
|
@dwatcher
|
|
def changed(d, stat):
|
|
data.pop()
|
|
data.append(d)
|
|
update.set()
|
|
|
|
update.wait(10)
|
|
assert data == [None]
|
|
update.clear()
|
|
|
|
with pytest.raises(KazooException):
|
|
|
|
@dwatcher
|
|
def func(d, stat):
|
|
data.pop()
|
|
|
|
def test_data_watcher_with_event(self):
|
|
# Test that the data watcher gets passed the event, if it
|
|
# accepts three arguments
|
|
update = threading.Event()
|
|
data = [True]
|
|
|
|
# Make it a non-existent path
|
|
self.path += "f"
|
|
|
|
@self.client.DataWatch(self.path)
|
|
def changed(d, stat, event):
|
|
data.pop()
|
|
data.append(event)
|
|
update.set()
|
|
|
|
update.wait(10)
|
|
assert data == [None]
|
|
update.clear()
|
|
|
|
self.client.create(self.path, b"fred")
|
|
update.wait(10)
|
|
assert data[0].type == EventType.CREATED
|
|
update.clear()
|
|
|
|
def test_func_style_data_watch(self):
|
|
update = threading.Event()
|
|
data = [True]
|
|
|
|
# Make it a non-existent path
|
|
path = self.path + "f"
|
|
|
|
def changed(d, stat):
|
|
data.pop()
|
|
data.append(d)
|
|
update.set()
|
|
|
|
self.client.DataWatch(path, changed)
|
|
|
|
update.wait(10)
|
|
assert data == [None]
|
|
update.clear()
|
|
|
|
self.client.create(path, b"fred")
|
|
update.wait(10)
|
|
assert data[0] == b"fred"
|
|
update.clear()
|
|
|
|
def test_datawatch_across_session_expire(self):
|
|
update = threading.Event()
|
|
data = [True]
|
|
|
|
@self.client.DataWatch(self.path)
|
|
def changed(d, stat):
|
|
data.pop()
|
|
data.append(d)
|
|
update.set()
|
|
|
|
update.wait(10)
|
|
assert data == [b""]
|
|
update.clear()
|
|
|
|
self.expire_session(threading.Event)
|
|
self.client.retry(self.client.set, self.path, b"fred")
|
|
update.wait(25)
|
|
assert data[0] == b"fred"
|
|
|
|
def test_func_stops(self):
|
|
update = threading.Event()
|
|
data = [True]
|
|
|
|
self.path += "f"
|
|
|
|
fail_through = []
|
|
|
|
@self.client.DataWatch(self.path)
|
|
def changed(d, stat):
|
|
data.pop()
|
|
data.append(d)
|
|
update.set()
|
|
if fail_through:
|
|
return False
|
|
|
|
update.wait(10)
|
|
assert data == [None]
|
|
update.clear()
|
|
|
|
fail_through.append(True)
|
|
self.client.create(self.path, b"fred")
|
|
update.wait(10)
|
|
assert data[0] == b"fred"
|
|
update.clear()
|
|
|
|
self.client.set(self.path, b"asdfasdf")
|
|
update.wait(0.2)
|
|
assert data[0] == b"fred"
|
|
|
|
d, stat = self.client.get(self.path)
|
|
assert d == b"asdfasdf"
|
|
|
|
def test_no_such_node(self):
|
|
args = []
|
|
|
|
@self.client.DataWatch("/some/path")
|
|
def changed(d, stat):
|
|
args.extend([d, stat])
|
|
|
|
assert args == [None, None]
|
|
|
|
def test_no_such_node_for_children_watch(self):
|
|
args = []
|
|
path = self.path + "/test_no_such_node_for_children_watch"
|
|
update = threading.Event()
|
|
|
|
def changed(children):
|
|
args.append(children)
|
|
update.set()
|
|
|
|
# watch a node which does not exist
|
|
children_watch = self.client.ChildrenWatch(path, changed)
|
|
assert update.is_set() is False
|
|
assert children_watch._stopped is True
|
|
assert args == []
|
|
|
|
# watch a node which exists
|
|
self.client.create(path, b"")
|
|
children_watch = self.client.ChildrenWatch(path, changed)
|
|
update.wait(3)
|
|
assert args == [[]]
|
|
update.clear()
|
|
|
|
# watch changes
|
|
self.client.create(path + "/fred", b"")
|
|
update.wait(3)
|
|
assert args == [[], ["fred"]]
|
|
update.clear()
|
|
|
|
# delete children
|
|
self.client.delete(path + "/fred")
|
|
update.wait(3)
|
|
assert args == [[], ["fred"], []]
|
|
update.clear()
|
|
|
|
# delete watching
|
|
self.client.delete(path)
|
|
|
|
# a hack for waiting the watcher stop
|
|
for retry in range(5):
|
|
if children_watch._stopped:
|
|
break
|
|
children_watch._run_lock.acquire()
|
|
children_watch._run_lock.release()
|
|
time.sleep(retry / 10.0)
|
|
|
|
assert update.is_set() is False
|
|
assert children_watch._stopped is True
|
|
|
|
def test_watcher_evaluating_to_false(self):
|
|
class WeirdWatcher(list):
|
|
def __call__(self, *args):
|
|
self.called = True
|
|
|
|
watcher = WeirdWatcher()
|
|
self.client.DataWatch(self.path, watcher)
|
|
self.client.set(self.path, b"mwahaha")
|
|
assert watcher.called is True
|
|
|
|
def test_watcher_repeat_delete(self):
|
|
a = []
|
|
ev = threading.Event()
|
|
|
|
self.client.delete(self.path)
|
|
|
|
@self.client.DataWatch(self.path)
|
|
def changed(val, stat):
|
|
a.append(val)
|
|
ev.set()
|
|
|
|
assert a == [None]
|
|
ev.wait(10)
|
|
ev.clear()
|
|
self.client.create(self.path, b"blah")
|
|
ev.wait(10)
|
|
assert ev.is_set() is True
|
|
ev.clear()
|
|
assert a == [None, b"blah"]
|
|
self.client.delete(self.path)
|
|
ev.wait(10)
|
|
assert ev.is_set() is True
|
|
ev.clear()
|
|
assert a == [None, b"blah", None]
|
|
self.client.create(self.path, b"blah")
|
|
ev.wait(10)
|
|
assert ev.is_set() is True
|
|
ev.clear()
|
|
assert a == [None, b"blah", None, b"blah"]
|
|
|
|
def test_watcher_with_closing(self):
|
|
a = []
|
|
ev = threading.Event()
|
|
|
|
self.client.delete(self.path)
|
|
|
|
@self.client.DataWatch(self.path)
|
|
def changed(val, stat):
|
|
a.append(val)
|
|
ev.set()
|
|
|
|
assert a == [None]
|
|
|
|
b = False
|
|
try:
|
|
self.client.stop()
|
|
except: # noqa
|
|
b = True
|
|
assert b is False
|
|
|
|
|
|
class KazooChildrenWatcherTests(KazooTestCase):
|
|
def setUp(self):
|
|
super(KazooChildrenWatcherTests, self).setUp()
|
|
self.path = "/" + uuid.uuid4().hex
|
|
self.client.ensure_path(self.path)
|
|
|
|
def test_child_watcher(self):
|
|
update = threading.Event()
|
|
all_children = ["fred"]
|
|
|
|
@self.client.ChildrenWatch(self.path)
|
|
def changed(children):
|
|
while all_children:
|
|
all_children.pop()
|
|
all_children.extend(children)
|
|
update.set()
|
|
|
|
update.wait(10)
|
|
assert all_children == []
|
|
update.clear()
|
|
|
|
self.client.create(self.path + "/" + "smith")
|
|
update.wait(10)
|
|
assert all_children == ["smith"]
|
|
update.clear()
|
|
|
|
self.client.create(self.path + "/" + "george")
|
|
update.wait(10)
|
|
assert sorted(all_children) == ["george", "smith"]
|
|
|
|
def test_child_watcher_once(self):
|
|
update = threading.Event()
|
|
all_children = ["fred"]
|
|
|
|
cwatch = self.client.ChildrenWatch(self.path)
|
|
|
|
@cwatch
|
|
def changed(children):
|
|
while all_children:
|
|
all_children.pop()
|
|
all_children.extend(children)
|
|
update.set()
|
|
|
|
update.wait(10)
|
|
assert all_children == []
|
|
update.clear()
|
|
|
|
with pytest.raises(KazooException):
|
|
|
|
@cwatch
|
|
def changed_again(children):
|
|
update.set()
|
|
|
|
def test_child_watcher_with_event(self):
|
|
update = threading.Event()
|
|
events = [True]
|
|
|
|
@self.client.ChildrenWatch(self.path, send_event=True)
|
|
def changed(children, event):
|
|
events.pop()
|
|
events.append(event)
|
|
update.set()
|
|
|
|
update.wait(10)
|
|
assert events == [None]
|
|
update.clear()
|
|
|
|
self.client.create(self.path + "/" + "smith")
|
|
update.wait(10)
|
|
assert events[0].type == EventType.CHILD
|
|
update.clear()
|
|
|
|
def test_func_style_child_watcher(self):
|
|
update = threading.Event()
|
|
all_children = ["fred"]
|
|
|
|
def changed(children):
|
|
while all_children:
|
|
all_children.pop()
|
|
all_children.extend(children)
|
|
update.set()
|
|
|
|
self.client.ChildrenWatch(self.path, changed)
|
|
|
|
update.wait(10)
|
|
assert all_children == []
|
|
update.clear()
|
|
|
|
self.client.create(self.path + "/" + "smith")
|
|
update.wait(10)
|
|
assert all_children == ["smith"]
|
|
update.clear()
|
|
|
|
self.client.create(self.path + "/" + "george")
|
|
update.wait(10)
|
|
assert sorted(all_children) == ["george", "smith"]
|
|
|
|
def test_func_stops(self):
|
|
update = threading.Event()
|
|
all_children = ["fred"]
|
|
|
|
fail_through = []
|
|
|
|
@self.client.ChildrenWatch(self.path)
|
|
def changed(children):
|
|
while all_children:
|
|
all_children.pop()
|
|
all_children.extend(children)
|
|
update.set()
|
|
if fail_through:
|
|
return False
|
|
|
|
update.wait(10)
|
|
assert all_children == []
|
|
update.clear()
|
|
|
|
fail_through.append(True)
|
|
self.client.create(self.path + "/" + "smith")
|
|
update.wait(10)
|
|
assert all_children == ["smith"]
|
|
update.clear()
|
|
|
|
self.client.create(self.path + "/" + "george")
|
|
update.wait(0.5)
|
|
assert all_children == ["smith"]
|
|
|
|
def test_child_watcher_remove_session_watcher(self):
|
|
update = threading.Event()
|
|
all_children = ["fred"]
|
|
|
|
fail_through = []
|
|
|
|
def changed(children):
|
|
while all_children:
|
|
all_children.pop()
|
|
all_children.extend(children)
|
|
update.set()
|
|
if fail_through:
|
|
return False
|
|
|
|
children_watch = self.client.ChildrenWatch(self.path, changed)
|
|
session_watcher = children_watch._session_watcher
|
|
|
|
update.wait(10)
|
|
assert session_watcher in self.client.state_listeners
|
|
assert all_children == []
|
|
update.clear()
|
|
|
|
fail_through.append(True)
|
|
self.client.create(self.path + "/" + "smith")
|
|
update.wait(10)
|
|
assert session_watcher not in self.client.state_listeners
|
|
assert all_children == ["smith"]
|
|
update.clear()
|
|
|
|
self.client.create(self.path + "/" + "george")
|
|
update.wait(10)
|
|
assert session_watcher not in self.client.state_listeners
|
|
assert all_children == ["smith"]
|
|
|
|
def test_child_watch_session_loss(self):
|
|
update = threading.Event()
|
|
all_children = ["fred"]
|
|
|
|
@self.client.ChildrenWatch(self.path)
|
|
def changed(children):
|
|
while all_children:
|
|
all_children.pop()
|
|
all_children.extend(children)
|
|
update.set()
|
|
|
|
update.wait(10)
|
|
assert all_children == []
|
|
update.clear()
|
|
|
|
self.client.create(self.path + "/" + "smith")
|
|
update.wait(10)
|
|
assert all_children == ["smith"]
|
|
update.clear()
|
|
self.expire_session(threading.Event)
|
|
|
|
self.client.retry(self.client.create, self.path + "/" + "george")
|
|
update.wait(20)
|
|
assert sorted(all_children) == ["george", "smith"]
|
|
|
|
def test_child_stop_on_session_loss(self):
|
|
update = threading.Event()
|
|
all_children = ["fred"]
|
|
|
|
@self.client.ChildrenWatch(self.path, allow_session_lost=False)
|
|
def changed(children):
|
|
while all_children:
|
|
all_children.pop()
|
|
all_children.extend(children)
|
|
update.set()
|
|
|
|
update.wait(10)
|
|
assert all_children == []
|
|
update.clear()
|
|
|
|
self.client.create(self.path + "/" + "smith")
|
|
update.wait(10)
|
|
assert all_children == ["smith"]
|
|
update.clear()
|
|
self.expire_session(threading.Event)
|
|
|
|
self.client.retry(self.client.create, self.path + "/" + "george")
|
|
update.wait(4)
|
|
assert update.is_set() is False
|
|
assert all_children == ["smith"]
|
|
|
|
children = self.client.get_children(self.path)
|
|
assert sorted(children) == ["george", "smith"]
|
|
|
|
|
|
class KazooPatientChildrenWatcherTests(KazooTestCase):
|
|
def setUp(self):
|
|
super(KazooPatientChildrenWatcherTests, self).setUp()
|
|
self.path = "/" + uuid.uuid4().hex
|
|
|
|
def _makeOne(self, *args, **kwargs):
|
|
from kazoo.recipe.watchers import PatientChildrenWatch
|
|
|
|
return PatientChildrenWatch(*args, **kwargs)
|
|
|
|
def test_watch(self):
|
|
self.client.ensure_path(self.path)
|
|
watcher = self._makeOne(self.client, self.path, 0.1)
|
|
result = watcher.start()
|
|
children, asy = result.get()
|
|
assert len(children) == 0
|
|
assert asy.ready() is False
|
|
|
|
self.client.create(self.path + "/" + "fred")
|
|
asy.get(timeout=1)
|
|
assert asy.ready() is True
|
|
|
|
def test_exception(self):
|
|
from kazoo.exceptions import NoNodeError
|
|
|
|
watcher = self._makeOne(self.client, self.path, 0.1)
|
|
result = watcher.start()
|
|
|
|
with pytest.raises(NoNodeError):
|
|
result.get()
|
|
|
|
def test_watch_iterations(self):
|
|
self.client.ensure_path(self.path)
|
|
watcher = self._makeOne(self.client, self.path, 0.5)
|
|
result = watcher.start()
|
|
assert result.ready() is False
|
|
|
|
time.sleep(0.08)
|
|
self.client.create(self.path + "/" + uuid.uuid4().hex)
|
|
assert result.ready() is False
|
|
time.sleep(0.08)
|
|
assert result.ready() is False
|
|
self.client.create(self.path + "/" + uuid.uuid4().hex)
|
|
time.sleep(0.08)
|
|
assert result.ready() is False
|
|
|
|
children, asy = result.get()
|
|
assert len(children) == 2
|