You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
|
|
import threading import unittest from unittest.mock import Mock
import pytest
class TestThreadingHandler(unittest.TestCase): def _makeOne(self, *args): from kazoo.handlers.threading import SequentialThreadingHandler
return SequentialThreadingHandler(*args)
def _getAsync(self, *args): from kazoo.handlers.threading import AsyncResult
return AsyncResult
def test_proper_threading(self): h = self._makeOne() h.start() # In Python 3.3 _Event is gone, before Event is function event_class = getattr(threading, "_Event", threading.Event) assert isinstance(h.event_object(), event_class)
def test_matching_async(self): h = self._makeOne() h.start() async_result = self._getAsync() assert isinstance(h.async_result(), async_result)
def test_exception_raising(self): h = self._makeOne()
with pytest.raises(h.timeout_exception): raise h.timeout_exception("This is a timeout")
def test_double_start_stop(self): h = self._makeOne() h.start() assert h._running is True h.start() h.stop() h.stop() assert h._running is False
def test_huge_file_descriptor(self): try: import resource except ImportError: self.skipTest("resource module unavailable on this platform") import socket from kazoo.handlers.utils import create_tcp_socket
try: resource.setrlimit(resource.RLIMIT_NOFILE, (4096, 4096)) except (ValueError, resource.error): self.skipTest("couldnt raise fd limit high enough") fd = 0 socks = [] while fd < 4000: sock = create_tcp_socket(socket) fd = sock.fileno() socks.append(sock) h = self._makeOne() h.start() h.select(socks, [], [], 0) h.stop() for sock in socks: sock.close()
class TestThreadingAsync(unittest.TestCase): def _makeOne(self, *args): from kazoo.handlers.threading import AsyncResult
return AsyncResult(*args)
def _makeHandler(self): from kazoo.handlers.threading import SequentialThreadingHandler
return SequentialThreadingHandler()
def test_ready(self): mock_handler = Mock() async_result = self._makeOne(mock_handler)
assert async_result.ready() is False async_result.set("val") assert async_result.ready() is True assert async_result.successful() is True assert async_result.exception is None
def test_callback_queued(self): mock_handler = Mock() mock_handler.completion_queue = Mock() async_result = self._makeOne(mock_handler)
async_result.rawlink(lambda a: a) async_result.set("val")
assert mock_handler.completion_queue.put.called
def test_set_exception(self): mock_handler = Mock() mock_handler.completion_queue = Mock() async_result = self._makeOne(mock_handler) async_result.rawlink(lambda a: a) async_result.set_exception(ImportError("Error occured"))
assert isinstance(async_result.exception, ImportError) assert mock_handler.completion_queue.put.called
def test_get_wait_while_setting(self): mock_handler = Mock() async_result = self._makeOne(mock_handler)
lst = [] bv = threading.Event() cv = threading.Event()
def wait_for_val(): bv.set() val = async_result.get() lst.append(val) cv.set()
th = threading.Thread(target=wait_for_val) th.start() bv.wait()
async_result.set("fred") cv.wait() assert lst == ["fred"] th.join()
def test_get_with_nowait(self): mock_handler = Mock() async_result = self._makeOne(mock_handler) timeout = self._makeHandler().timeout_exception
with pytest.raises(timeout): async_result.get(block=False)
with pytest.raises(timeout): async_result.get_nowait()
def test_get_with_exception(self): mock_handler = Mock() async_result = self._makeOne(mock_handler)
lst = [] bv = threading.Event() cv = threading.Event()
def wait_for_val(): bv.set() try: val = async_result.get() except ImportError: lst.append("oops") else: lst.append(val) cv.set()
th = threading.Thread(target=wait_for_val) th.start() bv.wait()
async_result.set_exception(ImportError) cv.wait() assert lst == ["oops"] th.join()
def test_wait(self): mock_handler = Mock() async_result = self._makeOne(mock_handler)
lst = [] bv = threading.Event() cv = threading.Event()
def wait_for_val(): bv.set() try: val = async_result.wait(10) except ImportError: lst.append("oops") else: lst.append(val) cv.set()
th = threading.Thread(target=wait_for_val) th.start() bv.wait(10)
async_result.set("fred") cv.wait(15) assert lst == [True] th.join()
def test_wait_race(self): """Test that there is no race condition in `IAsyncResult.wait()`.
Guards against the reappearance of: https://github.com/python-zk/kazoo/issues/485 """
mock_handler = Mock() async_result = self._makeOne(mock_handler)
async_result.set("immediate")
cv = threading.Event()
def wait_for_val(): # NB: should not sleep async_result.wait(20) cv.set()
th = threading.Thread(target=wait_for_val) th.daemon = True th.start()
# if the wait() didn't sleep (correctly), cv will be set quickly # if it did sleep, the cv will not be set yet and this will timeout cv.wait(10) assert cv.is_set() is True th.join()
def test_set_before_wait(self): mock_handler = Mock() async_result = self._makeOne(mock_handler)
lst = [] cv = threading.Event() async_result.set("fred")
def wait_for_val(): val = async_result.get() lst.append(val) cv.set()
th = threading.Thread(target=wait_for_val) th.start() cv.wait() assert lst == ["fred"] th.join()
def test_set_exc_before_wait(self): mock_handler = Mock() async_result = self._makeOne(mock_handler)
lst = [] cv = threading.Event() async_result.set_exception(ImportError)
def wait_for_val(): try: val = async_result.get() except ImportError: lst.append("ooops") else: lst.append(val) cv.set()
th = threading.Thread(target=wait_for_val) th.start() cv.wait() assert lst == ["ooops"] th.join()
def test_linkage(self): mock_handler = Mock() async_result = self._makeOne(mock_handler) cv = threading.Event()
lst = []
def add_on(): lst.append(True)
def wait_for_val(): async_result.get() cv.set()
th = threading.Thread(target=wait_for_val) th.start()
async_result.rawlink(add_on) async_result.set(b"fred") assert mock_handler.completion_queue.put.called async_result.unlink(add_on) cv.wait() assert async_result.value == b"fred" th.join()
def test_linkage_not_ready(self): mock_handler = Mock() async_result = self._makeOne(mock_handler)
lst = []
def add_on(): lst.append(True)
async_result.set("fred") assert not mock_handler.completion_queue.called async_result.rawlink(add_on) assert mock_handler.completion_queue.put.called
def test_link_and_unlink(self): mock_handler = Mock() async_result = self._makeOne(mock_handler)
lst = []
def add_on(): lst.append(True)
async_result.rawlink(add_on) assert not mock_handler.completion_queue.put.called async_result.unlink(add_on) async_result.set("fred") assert not mock_handler.completion_queue.put.called
def test_captured_exception(self): from kazoo.handlers.utils import capture_exceptions
mock_handler = Mock() async_result = self._makeOne(mock_handler)
@capture_exceptions(async_result) def exceptional_function(): return 1 / 0
exceptional_function()
with pytest.raises(ZeroDivisionError): async_result.get()
def test_no_capture_exceptions(self): from kazoo.handlers.utils import capture_exceptions
mock_handler = Mock() async_result = self._makeOne(mock_handler)
lst = []
def add_on(): lst.append(True)
async_result.rawlink(add_on)
@capture_exceptions(async_result) def regular_function(): return True
regular_function()
assert not mock_handler.completion_queue.put.called
def test_wraps(self): from kazoo.handlers.utils import wrap
mock_handler = Mock() async_result = self._makeOne(mock_handler)
lst = []
def add_on(result): lst.append(result.get())
async_result.rawlink(add_on)
@wrap(async_result) def regular_function(): return "hello"
assert regular_function() == "hello" assert mock_handler.completion_queue.put.called assert async_result.get() == "hello"
def test_multiple_callbacks(self): mockback1 = Mock(name="mockback1") mockback2 = Mock(name="mockback2") handler = self._makeHandler() handler.start()
async_result = self._makeOne(handler) async_result.rawlink(mockback1) async_result.rawlink(mockback2) async_result.set("howdy") async_result.wait() handler.stop()
mockback2.assert_called_once_with(async_result) mockback1.assert_called_once_with(async_result)
|