图片解析应用
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

397 lines
10 KiB

import threading
import unittest
from unittest.mock import Mock
import pytest
class TestThreadingHandler(unittest.TestCase):
    """Tests for ``kazoo.handlers.threading.SequentialThreadingHandler``."""

    def _makeOne(self, *args):
        """Return a new ``SequentialThreadingHandler`` built from ``args``."""
        from kazoo.handlers.threading import SequentialThreadingHandler

        return SequentialThreadingHandler(*args)

    def _getAsync(self, *args):
        """Return the ``AsyncResult`` class itself (``args`` are unused)."""
        from kazoo.handlers.threading import AsyncResult

        return AsyncResult

    def test_proper_threading(self):
        """``event_object()`` returns an instance of the threading event."""
        h = self._makeOne()
        h.start()
        # Make sure the started handler is stopped even if the assertion
        # below fails, so it does not outlive this test.
        self.addCleanup(h.stop)
        # ``threading._Event`` no longer exists as of Python 3.3; on older
        # versions ``threading.Event`` was a factory function rather than a
        # class, so prefer ``_Event`` when it is available.
        event_class = getattr(threading, "_Event", threading.Event)
        assert isinstance(h.event_object(), event_class)

    def test_matching_async(self):
        """``async_result()`` returns an instance of ``AsyncResult``."""
        h = self._makeOne()
        h.start()
        # Stop the handler regardless of the test outcome.
        self.addCleanup(h.stop)
        async_result = self._getAsync()
        assert isinstance(h.async_result(), async_result)

    def test_exception_raising(self):
        """The handler's ``timeout_exception`` can be raised and caught."""
        h = self._makeOne()
        with pytest.raises(h.timeout_exception):
            raise h.timeout_exception("This is a timeout")

    def test_double_start_stop(self):
        """Calling ``start()`` or ``stop()`` twice keeps ``_running`` sane."""
        h = self._makeOne()
        h.start()
        assert h._running is True
        h.start()
        h.stop()
        h.stop()
        assert h._running is False

    def test_huge_file_descriptor(self):
        """``select()`` works with sockets whose fd number reaches 4000.

        Skipped when the ``resource`` module is unavailable or when the
        open-file limit cannot be set to 4096.
        """
        try:
            import resource
        except ImportError:
            self.skipTest("resource module unavailable on this platform")
        import socket

        from kazoo.handlers.utils import create_tcp_socket

        try:
            resource.setrlimit(resource.RLIMIT_NOFILE, (4096, 4096))
        except (ValueError, resource.error):
            self.skipTest("couldnt raise fd limit high enough")

        fd = 0
        socks = []
        try:
            # Keep opening sockets until one of them gets a descriptor
            # number of at least 4000.
            while fd < 4000:
                sock = create_tcp_socket(socket)
                # Track the socket first so it is always closed below.
                socks.append(sock)
                fd = sock.fileno()
            h = self._makeOne()
            h.start()
            try:
                h.select(socks, [], [], 0)
            finally:
                h.stop()
        finally:
            # Release every descriptor even when socket creation or
            # ``select()`` raised part-way through.
            for sock in socks:
                sock.close()
class TestThreadingAsync(unittest.TestCase):
    """Tests for ``kazoo.handlers.threading.AsyncResult``.

    Most tests pass a ``Mock`` as the handler, so callbacks are only ever
    observed as calls to ``mock_handler.completion_queue.put``.
    """

    def _makeOne(self, *args):
        """Return a new ``AsyncResult`` built from ``args``."""
        from kazoo.handlers.threading import AsyncResult

        return AsyncResult(*args)

    def _makeHandler(self):
        """Return a new, unstarted ``SequentialThreadingHandler``."""
        from kazoo.handlers.threading import SequentialThreadingHandler

        return SequentialThreadingHandler()

    def test_ready(self):
        """A result becomes ready and successful once a value is set."""
        mock_handler = Mock()
        async_result = self._makeOne(mock_handler)

        assert async_result.ready() is False
        async_result.set("val")
        assert async_result.ready() is True
        assert async_result.successful() is True
        assert async_result.exception is None

    def test_callback_queued(self):
        """Setting a value with a linked callback uses the queue."""
        mock_handler = Mock()
        mock_handler.completion_queue = Mock()
        async_result = self._makeOne(mock_handler)

        async_result.rawlink(lambda a: a)
        async_result.set("val")

        assert mock_handler.completion_queue.put.called

    def test_set_exception(self):
        """``set_exception`` stores the exception and uses the queue."""
        mock_handler = Mock()
        mock_handler.completion_queue = Mock()
        async_result = self._makeOne(mock_handler)
        async_result.rawlink(lambda a: a)
        async_result.set_exception(ImportError("Error occured"))

        assert isinstance(async_result.exception, ImportError)
        assert mock_handler.completion_queue.put.called

    def test_get_wait_while_setting(self):
        """``get()`` in another thread returns the value set afterwards."""
        mock_handler = Mock()
        async_result = self._makeOne(mock_handler)

        lst = []
        bv = threading.Event()  # set once the worker thread is running
        cv = threading.Event()  # set once the worker thread has its value

        def wait_for_val():
            bv.set()
            val = async_result.get()
            lst.append(val)
            cv.set()

        th = threading.Thread(target=wait_for_val)
        th.start()
        bv.wait()

        async_result.set("fred")
        cv.wait()
        assert lst == ["fred"]
        th.join()

    def test_get_with_nowait(self):
        """Non-blocking gets on an unset result raise the timeout error."""
        mock_handler = Mock()
        async_result = self._makeOne(mock_handler)
        timeout = self._makeHandler().timeout_exception

        with pytest.raises(timeout):
            async_result.get(block=False)

        with pytest.raises(timeout):
            async_result.get_nowait()

    def test_get_with_exception(self):
        """``get()`` in another thread raises the exception set later."""
        mock_handler = Mock()
        async_result = self._makeOne(mock_handler)

        lst = []
        bv = threading.Event()  # set once the worker thread is running
        cv = threading.Event()  # set once the worker thread is finished

        def wait_for_val():
            bv.set()
            try:
                val = async_result.get()
            except ImportError:
                lst.append("oops")
            else:
                lst.append(val)
            cv.set()

        th = threading.Thread(target=wait_for_val)
        th.start()
        bv.wait()

        async_result.set_exception(ImportError)
        cv.wait()
        assert lst == ["oops"]
        th.join()

    def test_wait(self):
        """``wait(timeout)`` returns ``True`` once a value has been set."""
        mock_handler = Mock()
        async_result = self._makeOne(mock_handler)

        lst = []
        bv = threading.Event()  # set once the worker thread is running
        cv = threading.Event()  # set once the worker thread is finished

        def wait_for_val():
            bv.set()
            try:
                val = async_result.wait(10)
            except ImportError:
                lst.append("oops")
            else:
                lst.append(val)
            cv.set()

        th = threading.Thread(target=wait_for_val)
        th.start()
        bv.wait(10)

        async_result.set("fred")
        cv.wait(15)
        assert lst == [True]
        th.join()

    def test_wait_race(self):
        """Test that there is no race condition in `IAsyncResult.wait()`.

        Guards against the reappearance of:
            https://github.com/python-zk/kazoo/issues/485
        """
        mock_handler = Mock()
        async_result = self._makeOne(mock_handler)

        async_result.set("immediate")

        cv = threading.Event()

        def wait_for_val():
            # NB: should not sleep
            async_result.wait(20)
            cv.set()

        th = threading.Thread(target=wait_for_val)
        th.daemon = True
        th.start()

        # if the wait() didn't sleep (correctly), cv will be set quickly
        # if it did sleep, the cv will not be set yet and this will timeout
        cv.wait(10)
        assert cv.is_set() is True
        th.join()

    def test_set_before_wait(self):
        """``get()`` returns a value that was set before it was called."""
        mock_handler = Mock()
        async_result = self._makeOne(mock_handler)

        lst = []
        cv = threading.Event()
        async_result.set("fred")

        def wait_for_val():
            val = async_result.get()
            lst.append(val)
            cv.set()

        th = threading.Thread(target=wait_for_val)
        th.start()
        cv.wait()
        assert lst == ["fred"]
        th.join()

    def test_set_exc_before_wait(self):
        """``get()`` raises an exception set before it was called."""
        mock_handler = Mock()
        async_result = self._makeOne(mock_handler)

        lst = []
        cv = threading.Event()
        async_result.set_exception(ImportError)

        def wait_for_val():
            try:
                val = async_result.get()
            except ImportError:
                lst.append("ooops")
            else:
                lst.append(val)
            cv.set()

        th = threading.Thread(target=wait_for_val)
        th.start()
        cv.wait()
        assert lst == ["ooops"]
        th.join()

    def test_linkage(self):
        """A callback linked before ``set()`` is handed to the queue."""
        mock_handler = Mock()
        async_result = self._makeOne(mock_handler)
        cv = threading.Event()

        lst = []

        def add_on():
            lst.append(True)

        def wait_for_val():
            async_result.get()
            cv.set()

        th = threading.Thread(target=wait_for_val)
        th.start()

        async_result.rawlink(add_on)
        async_result.set(b"fred")
        assert mock_handler.completion_queue.put.called
        async_result.unlink(add_on)
        cv.wait()
        assert async_result.value == b"fred"
        th.join()

    def test_linkage_not_ready(self):
        """Linking after ``set()`` queues the callback at link time."""
        mock_handler = Mock()
        async_result = self._makeOne(mock_handler)

        lst = []

        def add_on():
            lst.append(True)

        async_result.set("fred")
        # Check ``put`` specifically: the queue mock itself is never called,
        # so asserting on ``completion_queue.called`` would always pass.
        assert not mock_handler.completion_queue.put.called
        async_result.rawlink(add_on)
        assert mock_handler.completion_queue.put.called

    def test_link_and_unlink(self):
        """A callback unlinked before ``set()`` is never queued."""
        mock_handler = Mock()
        async_result = self._makeOne(mock_handler)

        lst = []

        def add_on():
            lst.append(True)

        async_result.rawlink(add_on)
        assert not mock_handler.completion_queue.put.called
        async_result.unlink(add_on)
        async_result.set("fred")
        assert not mock_handler.completion_queue.put.called

    def test_captured_exception(self):
        """An error in a ``capture_exceptions`` function reaches ``get()``."""
        from kazoo.handlers.utils import capture_exceptions

        mock_handler = Mock()
        async_result = self._makeOne(mock_handler)

        @capture_exceptions(async_result)
        def exceptional_function():
            return 1 / 0

        exceptional_function()

        with pytest.raises(ZeroDivisionError):
            async_result.get()

    def test_no_capture_exceptions(self):
        """A ``capture_exceptions`` function that succeeds queues nothing."""
        from kazoo.handlers.utils import capture_exceptions

        mock_handler = Mock()
        async_result = self._makeOne(mock_handler)

        lst = []

        def add_on():
            lst.append(True)

        async_result.rawlink(add_on)

        @capture_exceptions(async_result)
        def regular_function():
            return True

        regular_function()

        assert not mock_handler.completion_queue.put.called

    def test_wraps(self):
        """A ``wrap``-decorated function stores its return value."""
        from kazoo.handlers.utils import wrap

        mock_handler = Mock()
        async_result = self._makeOne(mock_handler)

        lst = []

        def add_on(result):
            lst.append(result.get())

        async_result.rawlink(add_on)

        @wrap(async_result)
        def regular_function():
            return "hello"

        assert regular_function() == "hello"
        assert mock_handler.completion_queue.put.called
        assert async_result.get() == "hello"

    def test_multiple_callbacks(self):
        """Every linked callback is called exactly once with the result."""
        mockback1 = Mock(name="mockback1")
        mockback2 = Mock(name="mockback2")
        handler = self._makeHandler()
        handler.start()
        try:
            async_result = self._makeOne(handler)
            async_result.rawlink(mockback1)
            async_result.rawlink(mockback2)
            async_result.set("howdy")
            async_result.wait()
        finally:
            # Always stop the real handler, even if the steps above raise.
            handler.stop()

        mockback2.assert_called_once_with(async_result)
        mockback1.assert_called_once_with(async_result)