import collections
import threading
import unittest
from unittest.mock import MagicMock
import uuid

import pytest

from kazoo.exceptions import CancelledError
from kazoo.exceptions import LockTimeout
from kazoo.exceptions import NoNodeError
from kazoo.recipe.lock import Lock
from kazoo.testing import KazooTestCase
from kazoo.tests import util as test_util


class SleepBarrier(object):
    """A crappy spinning barrier."""

    def __init__(self, wait_for, sleep_func):
        self._wait_for = wait_for
        self._arrived = collections.deque()
        self._sleep_func = sleep_func

    def __enter__(self):
        self._arrived.append(threading.current_thread())
        return self

    def __exit__(self, type, value, traceback):
        try:
            self._arrived.remove(threading.current_thread())
        except ValueError:
            pass

    def wait(self):
        while len(self._arrived) < self._wait_for:
            self._sleep_func(0.001)


class KazooLockTests(KazooTestCase):
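    """Tests for the Lock, ReadLock and WriteLock recipes."""
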
    thread_count = 20

    def __init__(self, *args, **kw):
        super(KazooLockTests, self).__init__(*args, **kw)
        self.threads_made = []

    def tearDown(self):
        super(KazooLockTests, self).tearDown()
        while self.threads_made:
            t = self.threads_made.pop()
            t.join()

    @staticmethod
    def make_condition():
        return threading.Condition()

    @staticmethod
    def make_event():
        return threading.Event()

    def make_thread(self, *args, **kwargs):
        t = threading.Thread(*args, **kwargs)
        t.daemon = True
        self.threads_made.append(t)
        return t

    @staticmethod
    def make_wait():
        return test_util.Wait()

    def setUp(self):
        super(KazooLockTests, self).setUp()
        self.lockpath = "/" + uuid.uuid4().hex
        self.condition = self.make_condition()
        self.released = self.make_event()
        self.active_thread = None
        self.cancelled_threads = []

    def _thread_lock_acquire_til_event(self, name, lock, event):
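        """Acquire ``lock``, record ``name`` as the active thread, and hold
        the lock until ``event`` is set; if the acquire is cancelled, append
        ``name`` to ``self.cancelled_threads`` instead.
        """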
        try:
            with lock:
                with self.condition:
                    assert self.active_thread is None
                    self.active_thread = name
                    self.condition.notify_all()

                event.wait()

                with self.condition:
                    assert self.active_thread == name
                    self.active_thread = None
                    self.condition.notify_all()
            self.released.set()
        except CancelledError:
            with self.condition:
                self.cancelled_threads.append(name)
                self.condition.notify_all()

    def test_lock_one(self):
        lock_name = uuid.uuid4().hex
        lock = self.client.Lock(self.lockpath, lock_name)
        event = self.make_event()
        thread = self.make_thread(
            target=self._thread_lock_acquire_til_event,
            args=(lock_name, lock, event),
        )
        thread.start()

        lock2_name = uuid.uuid4().hex
        anotherlock = self.client.Lock(self.lockpath, lock2_name)

        # wait for any contender to show up on the lock
        wait = self.make_wait()
        wait(anotherlock.contenders)
        assert anotherlock.contenders() == [lock_name]

        with self.condition:
            while self.active_thread != lock_name:
                self.condition.wait()

        # release the lock
        event.set()

        with self.condition:
            while self.active_thread:
                self.condition.wait()
        self.released.wait()
        thread.join()

    def test_lock(self):
        threads = []
        names = ["contender" + str(i) for i in range(5)]

        contender_bits = {}

        for name in names:
            ev = self.make_event()
            lock = self.client.Lock(self.lockpath, name)
            thread = self.make_thread(
                target=self._thread_lock_acquire_til_event,
                args=(name, lock, ev),
            )
            contender_bits[name] = (thread, ev)
            threads.append(thread)

        # acquire the lock ourselves first to make the others line up
        lock = self.client.Lock(self.lockpath, "test")
        lock.acquire()

        for thread in threads:
            thread.start()

        # wait for everyone to line up on the lock
        wait = self.make_wait()
        wait(lambda: len(lock.contenders()) == 6)
        contenders = lock.contenders()

        assert contenders[0] == "test"
        contenders = contenders[1:]
        remaining = list(contenders)

        # release the lock and contenders should claim it in order
        lock.release()

        for contender in contenders:
            thread, event = contender_bits[contender]

            with self.condition:
                while not self.active_thread:
                    self.condition.wait()
                assert self.active_thread == contender

            assert lock.contenders() == remaining
            remaining = remaining[1:]

            event.set()

            with self.condition:
                while self.active_thread:
                    self.condition.wait()
        for thread in threads:
            thread.join()

    def test_lock_reconnect(self):
        event = self.make_event()
        other_lock = self.client.Lock(self.lockpath, "contender")
        thread = self.make_thread(
            target=self._thread_lock_acquire_til_event,
            args=("contender", other_lock, event),
        )

        # acquire the lock ourselves first to make the contender line up
        lock = self.client.Lock(self.lockpath, "test")
        lock.acquire()

        thread.start()
        # wait for the contender to line up on the lock
        wait = self.make_wait()
        wait(lambda: len(lock.contenders()) == 2)
        assert lock.contenders() == ["test", "contender"]

        self.expire_session(self.make_event)

        lock.release()

        with self.condition:
            while not self.active_thread:
                self.condition.wait()
            assert self.active_thread == "contender"

        event.set()
        thread.join()

    def test_lock_non_blocking(self):
        lock_name = uuid.uuid4().hex
        lock = self.client.Lock(self.lockpath, lock_name)
        event = self.make_event()

        thread = self.make_thread(
            target=self._thread_lock_acquire_til_event,
            args=(lock_name, lock, event),
        )
        thread.start()

        lock1 = self.client.Lock(self.lockpath, lock_name)

        # wait for the thread to acquire the lock
        with self.condition:
            if not self.active_thread:
                self.condition.wait(5)

        assert not lock1.acquire(blocking=False)
        assert lock.contenders() == [lock_name]  # just one - itself

        event.set()
        thread.join()

    def test_lock_fail_first_call(self):
        event1 = self.make_event()
        lock1 = self.client.Lock(self.lockpath, "one")
        thread1 = self.make_thread(
            target=self._thread_lock_acquire_til_event,
            args=("one", lock1, event1),
        )
        thread1.start()

        # wait for this thread to acquire the lock
        with self.condition:
            if not self.active_thread:
                self.condition.wait(5)
            assert self.active_thread == "one"
        assert lock1.contenders() == ["one"]
        event1.set()
        thread1.join()

    def test_lock_cancel(self):
        event1 = self.make_event()
        lock1 = self.client.Lock(self.lockpath, "one")
        thread1 = self.make_thread(
            target=self._thread_lock_acquire_til_event,
            args=("one", lock1, event1),
        )
        thread1.start()

        # wait for this thread to acquire the lock
        with self.condition:
            if not self.active_thread:
                self.condition.wait(5)
            assert self.active_thread == "one"

        client2 = self._get_client()
        client2.start()
        event2 = self.make_event()
        lock2 = client2.Lock(self.lockpath, "two")
        thread2 = self.make_thread(
            target=self._thread_lock_acquire_til_event,
            args=("two", lock2, event2),
        )
        thread2.start()

        # this one should block in acquire. check that it is a contender
        wait = self.make_wait()
        wait(lambda: len(lock2.contenders()) > 1)
        assert lock2.contenders() == ["one", "two"]

        lock2.cancel()
        with self.condition:
            if "two" not in self.cancelled_threads:
                self.condition.wait()
                assert "two" in self.cancelled_threads

        assert lock2.contenders() == ["one"]

        thread2.join()
        event1.set()
        thread1.join()
        client2.stop()

    def test_lock_no_double_calls(self):
        lock1 = self.client.Lock(self.lockpath, "one")
        lock1.acquire()
        assert lock1.is_acquired is True
        assert lock1.acquire(timeout=0.5) is False
        assert lock1.is_acquired is True
        lock1.release()
        assert lock1.is_acquired is False

    def test_lock_same_thread_no_block(self):
        lock = self.client.Lock(self.lockpath, "one")
        gotten = lock.acquire(blocking=False)
        assert gotten is True
        assert lock.is_acquired is True
        gotten = lock.acquire(blocking=False)
        assert gotten is False

    def test_lock_many_threads_no_block(self):
        lock = self.client.Lock(self.lockpath, "one")
        attempts = collections.deque()

        def _acquire():
            attempts.append(int(lock.acquire(blocking=False)))

        threads = []
        for _i in range(0, self.thread_count):
            t = self.make_thread(target=_acquire)
            threads.append(t)
            t.start()

        while threads:
            t = threads.pop()
            t.join()

        assert sum(list(attempts)) == 1

    def test_lock_many_threads(self):
        sleep_func = self.client.handler.sleep_func
        lock = self.client.Lock(self.lockpath, "one")
        acquires = collections.deque()
        differences = collections.deque()
        barrier = SleepBarrier(self.thread_count, sleep_func)

        def _acquire():
            # Wait until all threads are ready to go...
            with barrier as b:
                b.wait()
                with lock:
                    # Ensure that no two threads enter here and cause the
                    # count to differ by more than one, do this by recording
                    # the count that was captured and examining it post run.
                    starting_count = len(acquires)
                    acquires.append(1)
                    sleep_func(0.01)
                    end_count = len(acquires)
                    differences.append(end_count - starting_count)

        threads = []
        for _i in range(0, self.thread_count):
            t = self.make_thread(target=_acquire)
            threads.append(t)
            t.start()

        while threads:
            t = threads.pop()
            t.join()

        assert len(acquires) == self.thread_count
        assert list(differences) == [1] * self.thread_count

    def test_lock_reacquire(self):
        lock = self.client.Lock(self.lockpath, "one")
        lock.acquire()
        lock.release()
        lock.acquire()
        lock.release()

    def test_lock_ephemeral(self):
        client1 = self._get_client()
        client1.start()
        lock = client1.Lock(self.lockpath, "ephemeral")
        lock.acquire(ephemeral=False)
        znode = self.lockpath + "/" + lock.node
        client1.stop()
        try:
            self.client.get(znode)
        except NoNodeError:
            self.fail("NoNodeError raised unexpectedly!")

    def test_lock_timeout(self):
        timeout = 3
        e = self.make_event()
        started = self.make_event()

        # In the background thread, acquire the lock and wait thrice the time
        # that the main thread is going to wait to acquire the lock.
        lock1 = self.client.Lock(self.lockpath, "one")

        def _thread(lock, event, timeout):
            with lock:
                started.set()
                event.wait(timeout)
                if not event.is_set():
                    # Eventually fail to avoid hanging the tests
                    self.fail("lock2 never timed out")

        t = self.make_thread(target=_thread, args=(lock1, e, timeout * 3))
        t.start()

        # Start the main thread's kazoo client and try to acquire the lock
        # but give up after `timeout` seconds
        client2 = self._get_client()
        client2.start()
        started.wait(5)
        assert started.is_set() is True
        lock2 = client2.Lock(self.lockpath, "two")
        try:
            lock2.acquire(timeout=timeout)
        except LockTimeout:
            # A timeout is the behavior we're expecting, since the background
            # thread should still be holding onto the lock
            pass
        else:
            self.fail("Main thread unexpectedly acquired the lock")
        finally:
            # Cleanup
            e.set()
            t.join()
            client2.stop()

    def test_read_lock(self):
        # Test that we can obtain a read lock
        lock = self.client.ReadLock(self.lockpath, "reader one")
        gotten = lock.acquire(blocking=False)
        assert gotten is True
        assert lock.is_acquired is True
        # and that it's still not reentrant.
        gotten = lock.acquire(blocking=False)
        assert gotten is False
        # Test that a second client can share the same read lock
        client2 = self._get_client()
        client2.start()
        lock2 = client2.ReadLock(self.lockpath, "reader two")
        gotten = lock2.acquire(blocking=False)
        assert gotten is True
        assert lock2.is_acquired is True
        gotten = lock2.acquire(blocking=False)
        assert gotten is False
        # Test that a writer is unable to share it
        client3 = self._get_client()
        client3.start()
        lock3 = client3.WriteLock(self.lockpath, "writer")
        gotten = lock3.acquire(blocking=False)
        assert gotten is False

    def test_write_lock(self):
        # Test that we can obtain a write lock
        lock = self.client.WriteLock(self.lockpath, "writer")
        gotten = lock.acquire(blocking=False)
        assert gotten is True
        assert lock.is_acquired is True
        gotten = lock.acquire(blocking=False)
        assert gotten is False

        # Test that we are unable to obtain a read lock while the
        # write lock is held.
        client2 = self._get_client()
        client2.start()
        lock2 = client2.ReadLock(self.lockpath, "reader")
        gotten = lock2.acquire(blocking=False)
        assert gotten is False

    def test_rw_lock(self):
        reader_event = self.make_event()
        reader_lock = self.client.ReadLock(self.lockpath, "reader")
        reader_thread = self.make_thread(
            target=self._thread_lock_acquire_til_event,
            args=("reader", reader_lock, reader_event),
        )

        writer_event = self.make_event()
        writer_lock = self.client.WriteLock(self.lockpath, "writer")
        writer_thread = self.make_thread(
            target=self._thread_lock_acquire_til_event,
            args=("writer", writer_lock, writer_event),
        )

        # acquire a write lock ourselves first to make the others line up
        lock = self.client.WriteLock(self.lockpath, "test")
        lock.acquire()

        reader_thread.start()
        writer_thread.start()

        # wait for everyone to line up on the lock
        wait = self.make_wait()
        wait(lambda: len(lock.contenders()) == 3)
        contenders = lock.contenders()

        assert contenders[0] == "test"
        remaining = contenders[1:]

        # release the lock and contenders should claim it in order
        lock.release()

        contender_bits = {
            "reader": (reader_thread, reader_event),
            "writer": (writer_thread, writer_event),
        }

        for contender in ("reader", "writer"):
            thread, event = contender_bits[contender]

            with self.condition:
                while not self.active_thread:
                    self.condition.wait()
                assert self.active_thread == contender

            assert lock.contenders() == remaining
            remaining = remaining[1:]

            event.set()

            with self.condition:
                while self.active_thread:
                    self.condition.wait()

        reader_thread.join()
        writer_thread.join()


class TestSemaphore(KazooTestCase):
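    """Tests for the Semaphore recipe."""
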
    def __init__(self, *args, **kw):
        super(TestSemaphore, self).__init__(*args, **kw)
        self.threads_made = []

    def tearDown(self):
        super(TestSemaphore, self).tearDown()
        while self.threads_made:
            t = self.threads_made.pop()
            t.join()

    @staticmethod
    def make_condition():
        return threading.Condition()

    @staticmethod
    def make_event():
        return threading.Event()

    def make_thread(self, *args, **kwargs):
        t = threading.Thread(*args, **kwargs)
        t.daemon = True
        self.threads_made.append(t)
        return t

    def setUp(self):
        super(TestSemaphore, self).setUp()
        self.lockpath = "/" + uuid.uuid4().hex
        self.condition = self.make_condition()
        self.released = self.make_event()
        self.active_thread = None
        self.cancelled_threads = []

    def test_basic(self):
        sem1 = self.client.Semaphore(self.lockpath)
        sem1.acquire()
        sem1.release()

    def test_lock_one(self):
        sem1 = self.client.Semaphore(self.lockpath, max_leases=1)
        sem2 = self.client.Semaphore(self.lockpath, max_leases=1)
        started = self.make_event()
        event = self.make_event()

        sem1.acquire()

        def sema_one():
            started.set()
            with sem2:
                event.set()

        thread = self.make_thread(target=sema_one, args=())
        thread.start()
        started.wait(10)

        assert event.is_set() is False

        sem1.release()
        event.wait(10)
        assert event.is_set() is True
        thread.join()

    def test_non_blocking(self):
        sem1 = self.client.Semaphore(
            self.lockpath, identifier="sem1", max_leases=2
        )
        sem2 = self.client.Semaphore(
            self.lockpath, identifier="sem2", max_leases=2
        )
        sem3 = self.client.Semaphore(
            self.lockpath, identifier="sem3", max_leases=2
        )

        sem1.acquire()
        sem2.acquire()
        assert not sem3.acquire(blocking=False)
        assert set(sem1.lease_holders()) == set(["sem1", "sem2"])
        sem2.release()
        # the next line isn't required, but avoids timing issues in tests
        sem3.acquire()
        assert set(sem1.lease_holders()) == set(["sem1", "sem3"])
        sem1.release()
        sem3.release()

    def test_non_blocking_release(self):
        sem1 = self.client.Semaphore(
            self.lockpath, identifier="sem1", max_leases=1
        )
        sem2 = self.client.Semaphore(
            self.lockpath, identifier="sem2", max_leases=1
        )
        sem1.acquire()
        sem2.acquire(blocking=False)

        # make sure there's no shutdown / cleanup error
        sem1.release()
        sem2.release()

    def test_holders(self):
        started = self.make_event()
        event = self.make_event()

        def sema_one():
            with self.client.Semaphore(self.lockpath, "fred", max_leases=1):
                started.set()
                event.wait()

        thread = self.make_thread(target=sema_one, args=())
        thread.start()
        started.wait()
        sem1 = self.client.Semaphore(self.lockpath)
        holders = sem1.lease_holders()
        assert holders == ["fred"]
        event.set()
        thread.join()

    def test_semaphore_cancel(self):
        sem1 = self.client.Semaphore(self.lockpath, "fred", max_leases=1)
        sem2 = self.client.Semaphore(self.lockpath, "george", max_leases=1)
        sem1.acquire()
        started = self.make_event()
        event = self.make_event()

        def sema_one():
            started.set()
            try:
                with sem2:
                    started.set()
            except CancelledError:
                event.set()

        thread = self.make_thread(target=sema_one, args=())
        thread.start()
        started.wait()
        assert sem1.lease_holders() == ["fred"]
        assert not event.is_set()
        sem2.cancel()
        event.wait()
        assert event.is_set()
        thread.join()

    def test_multiple_acquire_and_release(self):
        sem1 = self.client.Semaphore(self.lockpath, "fred", max_leases=1)
        sem1.acquire()
        sem1.acquire()

        assert sem1.release()
        assert not sem1.release()

    def test_handle_session_loss(self):
        expire_semaphore = self.client.Semaphore(
            self.lockpath, "fred", max_leases=1
        )

        client = self._get_client()
        client.start()
        lh_semaphore = client.Semaphore(self.lockpath, "george", max_leases=1)
        lh_semaphore.acquire()

        started = self.make_event()
        event = self.make_event()
        event2 = self.make_event()

        def sema_one():
            started.set()
            with expire_semaphore:
                event.set()
                event2.wait()

        thread1 = self.make_thread(target=sema_one, args=())
        thread1.start()

        started.wait()
        assert lh_semaphore.lease_holders() == ["george"]

        # Fired in a separate thread to make sure we can see the effect
        expired = self.make_event()

        def expire():
            self.expire_session(self.make_event)
            expired.set()

        thread2 = self.make_thread(target=expire, args=())
        thread2.start()
        expire_semaphore.wake_event.wait()
        expired.wait()

        lh_semaphore.release()
        client.stop()

        event.wait(15)
        assert expire_semaphore.lease_holders() == ["fred"]
        event2.set()

        for t in (thread1, thread2):
            t.join()

    def test_inconsistent_max_leases(self):
        sem1 = self.client.Semaphore(self.lockpath, max_leases=1)
        sem2 = self.client.Semaphore(self.lockpath, max_leases=2)

        sem1.acquire()
        with pytest.raises(ValueError):
            sem2.acquire()

    def test_inconsistent_max_leases_other_data(self):
        sem1 = self.client.Semaphore(self.lockpath, max_leases=1)
        sem2 = self.client.Semaphore(self.lockpath, max_leases=2)

        self.client.ensure_path(self.lockpath)
        self.client.set(self.lockpath, b"a$")

        sem1.acquire()
        # sem2 thinks it's ok to have two lease holders
        assert sem2.acquire(blocking=False)

    def test_reacquire(self):
        lock = self.client.Semaphore(self.lockpath)
        lock.acquire()
        lock.release()
        lock.acquire()
        lock.release()

    def test_acquire_after_cancelled(self):
        lock = self.client.Semaphore(self.lockpath)
        assert lock.acquire() is True
        assert lock.release() is True
        lock.cancel()
        assert lock.cancelled is True
        assert lock.acquire() is True

    def test_timeout(self):
        timeout = 3
        e = self.make_event()
        started = self.make_event()

        # In the background thread, acquire the lock and wait thrice the time
        # that the main thread is going to wait to acquire the lock.
        sem1 = self.client.Semaphore(self.lockpath, "one")

        def _thread(sem, event, timeout):
            with sem:
                started.set()
                event.wait(timeout)
                if not event.is_set():
                    # Eventually fail to avoid hanging the tests
                    self.fail("sem2 never timed out")

        t = self.make_thread(target=_thread, args=(sem1, e, timeout * 3))
        t.start()

        # Start the main thread's kazoo client and try to acquire the lock
        # but give up after `timeout` seconds
        client2 = self._get_client()
        client2.start()
        started.wait(5)
        assert started.is_set() is True
        sem2 = client2.Semaphore(self.lockpath, "two")
        try:
            sem2.acquire(timeout=timeout)
        except LockTimeout:
            # A timeout is the behavior we're expecting, since the background
            # thread will still be holding onto the lock
            e.set()
        finally:
            # Cleanup
            t.join()
            client2.stop()


class TestSequence(unittest.TestCase):
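    """Unit tests for Lock._get_predecessor using a mocked client."""
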
    def test_get_predecessor(self):
        """Validate selection of predecessors."""
        goLock = "_c_8eb60557ba51e0da67eefc47467d3f34-lock-0000000031"
        pyLock = "514e5a831836450cb1a56c741e990fd8__lock__0000000032"
        children = ["hello", goLock, "world", pyLock]
        client = MagicMock()
        client.get_children.return_value = children
        lock = Lock(client, "test")
        assert lock._get_predecessor(pyLock) is None

    def test_get_predecessor_go(self):
        """Test selection of predecessor when instructed to consider go-zk
        locks.
        """
        goLock = "_c_8eb60557ba51e0da67eefc47467d3f34-lock-0000000031"
        pyLock = "514e5a831836450cb1a56c741e990fd8__lock__0000000032"
        children = ["hello", goLock, "world", pyLock]
        client = MagicMock()
        client.get_children.return_value = children
        lock = Lock(client, "test", extra_lock_patterns=["-lock-"])
        assert lock._get_predecessor(pyLock) == goLock