|
|
import unittest import sys
import pytest
from kazoo.client import KazooClient from kazoo.exceptions import NoNodeError from kazoo.protocol.states import Callback from kazoo.testing import KazooTestCase from kazoo.tests import test_client
@pytest.mark.skipif(sys.platform == "win32", reason="does not run on windows") class TestGeventHandler(unittest.TestCase): def setUp(self): try: import gevent # NOQA except ImportError: pytest.skip("gevent not available.")
def _makeOne(self, *args): from kazoo.handlers.gevent import SequentialGeventHandler
return SequentialGeventHandler(*args)
def _getAsync(self, *args): from kazoo.handlers.gevent import AsyncResult
return AsyncResult
def _getEvent(self): from gevent.event import Event
return Event
def test_proper_threading(self): h = self._makeOne() h.start() assert isinstance(h.event_object(), self._getEvent())
def test_matching_async(self): h = self._makeOne() h.start() async_handler = self._getAsync() assert isinstance(h.async_result(), async_handler)
def test_exception_raising(self): h = self._makeOne()
with pytest.raises(h.timeout_exception): raise h.timeout_exception("This is a timeout")
def test_exception_in_queue(self): h = self._makeOne() h.start() ev = self._getEvent()()
def func(): ev.set() raise ValueError("bang")
call1 = Callback("completion", func, ()) h.dispatch_callback(call1) ev.wait()
def test_queue_empty_exception(self): from gevent.queue import Empty
h = self._makeOne() h.start() ev = self._getEvent()()
def func(): ev.set() raise Empty()
call1 = Callback("completion", func, ()) h.dispatch_callback(call1) ev.wait()
@pytest.mark.skipif(sys.platform == "win32", reason="does not run on windows") class TestBasicGeventClient(KazooTestCase): def setUp(self): try: import gevent # NOQA except ImportError: pytest.skip("gevent not available.") KazooTestCase.setUp(self)
def _makeOne(self, *args): from kazoo.handlers.gevent import SequentialGeventHandler
return SequentialGeventHandler(*args)
def _getEvent(self): from gevent.event import Event
return Event
def test_start(self): client = self._get_client(handler=self._makeOne()) client.start() assert client.state == "CONNECTED" client.stop()
def test_start_stop_double(self): client = self._get_client(handler=self._makeOne()) client.start() assert client.state == "CONNECTED" client.handler.start() client.handler.stop() client.stop()
def test_basic_commands(self): client = self._get_client(handler=self._makeOne()) client.start() assert client.state == "CONNECTED" client.create("/anode", b"fred") assert client.get("/anode")[0] == b"fred" assert client.delete("/anode") assert client.exists("/anode") is None client.stop()
def test_failures(self): client = self._get_client(handler=self._makeOne()) client.start() with pytest.raises(NoNodeError): client.get("/none") client.stop()
def test_data_watcher(self): client = self._get_client(handler=self._makeOne()) client.start() client.ensure_path("/some/node") ev = self._getEvent()()
@client.DataWatch("/some/node") def changed(d, stat): ev.set()
ev.wait() ev.clear() client.set("/some/node", b"newvalue") ev.wait() client.stop()
def test_huge_file_descriptor(self): import resource from gevent import socket from kazoo.handlers.utils import create_tcp_socket
try: resource.setrlimit(resource.RLIMIT_NOFILE, (4096, 4096)) except (ValueError, resource.error): self.skipTest("couldnt raise fd limit high enough") fd = 0 socks = [] while fd < 4000: sock = create_tcp_socket(socket) fd = sock.fileno() socks.append(sock) h = self._makeOne() h.start() h.select(socks, [], [], 0) h.stop() for sock in socks: sock.close()
@pytest.mark.skipif(sys.platform == "win32", reason="does not run on windows") class TestGeventClient(test_client.TestClient): def setUp(self): try: import gevent # NOQA except ImportError: pytest.skip("gevent not available.") KazooTestCase.setUp(self)
def _makeOne(self, *args): from kazoo.handlers.gevent import SequentialGeventHandler
return SequentialGeventHandler(*args)
def _get_client(self, **kwargs): kwargs["handler"] = self._makeOne() return KazooClient(self.hosts, **kwargs)
|