图片解析应用
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

187 lines
5.1 KiB

import unittest
import sys
import pytest
from kazoo.client import KazooClient
from kazoo.exceptions import NoNodeError
from kazoo.protocol.states import Callback
from kazoo.testing import KazooTestCase
from kazoo.tests import test_client
@pytest.mark.skipif(sys.platform == "win32", reason="does not run on windows")
class TestGeventHandler(unittest.TestCase):
def setUp(self):
try:
import gevent # NOQA
except ImportError:
pytest.skip("gevent not available.")
def _makeOne(self, *args):
from kazoo.handlers.gevent import SequentialGeventHandler
return SequentialGeventHandler(*args)
def _getAsync(self, *args):
from kazoo.handlers.gevent import AsyncResult
return AsyncResult
def _getEvent(self):
from gevent.event import Event
return Event
def test_proper_threading(self):
h = self._makeOne()
h.start()
assert isinstance(h.event_object(), self._getEvent())
def test_matching_async(self):
h = self._makeOne()
h.start()
async_handler = self._getAsync()
assert isinstance(h.async_result(), async_handler)
def test_exception_raising(self):
h = self._makeOne()
with pytest.raises(h.timeout_exception):
raise h.timeout_exception("This is a timeout")
def test_exception_in_queue(self):
h = self._makeOne()
h.start()
ev = self._getEvent()()
def func():
ev.set()
raise ValueError("bang")
call1 = Callback("completion", func, ())
h.dispatch_callback(call1)
ev.wait()
def test_queue_empty_exception(self):
from gevent.queue import Empty
h = self._makeOne()
h.start()
ev = self._getEvent()()
def func():
ev.set()
raise Empty()
call1 = Callback("completion", func, ())
h.dispatch_callback(call1)
ev.wait()
@pytest.mark.skipif(sys.platform == "win32", reason="does not run on windows")
class TestBasicGeventClient(KazooTestCase):
def setUp(self):
try:
import gevent # NOQA
except ImportError:
pytest.skip("gevent not available.")
KazooTestCase.setUp(self)
def _makeOne(self, *args):
from kazoo.handlers.gevent import SequentialGeventHandler
return SequentialGeventHandler(*args)
def _getEvent(self):
from gevent.event import Event
return Event
def test_start(self):
client = self._get_client(handler=self._makeOne())
client.start()
assert client.state == "CONNECTED"
client.stop()
def test_start_stop_double(self):
client = self._get_client(handler=self._makeOne())
client.start()
assert client.state == "CONNECTED"
client.handler.start()
client.handler.stop()
client.stop()
def test_basic_commands(self):
client = self._get_client(handler=self._makeOne())
client.start()
assert client.state == "CONNECTED"
client.create("/anode", b"fred")
assert client.get("/anode")[0] == b"fred"
assert client.delete("/anode")
assert client.exists("/anode") is None
client.stop()
def test_failures(self):
client = self._get_client(handler=self._makeOne())
client.start()
with pytest.raises(NoNodeError):
client.get("/none")
client.stop()
def test_data_watcher(self):
client = self._get_client(handler=self._makeOne())
client.start()
client.ensure_path("/some/node")
ev = self._getEvent()()
@client.DataWatch("/some/node")
def changed(d, stat):
ev.set()
ev.wait()
ev.clear()
client.set("/some/node", b"newvalue")
ev.wait()
client.stop()
def test_huge_file_descriptor(self):
import resource
from gevent import socket
from kazoo.handlers.utils import create_tcp_socket
try:
resource.setrlimit(resource.RLIMIT_NOFILE, (4096, 4096))
except (ValueError, resource.error):
self.skipTest("couldnt raise fd limit high enough")
fd = 0
socks = []
while fd < 4000:
sock = create_tcp_socket(socket)
fd = sock.fileno()
socks.append(sock)
h = self._makeOne()
h.start()
h.select(socks, [], [], 0)
h.stop()
for sock in socks:
sock.close()
@pytest.mark.skipif(sys.platform == "win32", reason="does not run on windows")
class TestGeventClient(test_client.TestClient):
def setUp(self):
try:
import gevent # NOQA
except ImportError:
pytest.skip("gevent not available.")
KazooTestCase.setUp(self)
def _makeOne(self, *args):
from kazoo.handlers.gevent import SequentialGeventHandler
return SequentialGeventHandler(*args)
def _get_client(self, **kwargs):
kwargs["handler"] = self._makeOne()
return KazooClient(self.hosts, **kwargs)