图片解析应用
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

82 lines
2.4 KiB

  1. """
  2. The official Python select() test case, copied from the Python source,
  3. to test the selector_select function.
  4. """
  5. import os
  6. import socket
  7. import sys
  8. import unittest
  9. from kazoo.handlers.utils import selector_select
  # Bind selector_select to the name ``select`` so every test below exercises
  # it; presumably this keeps the copied test bodies unchanged (the module
  # docstring says they were copied from the Python source).
  select = selector_select
  @unittest.skipIf(
      sys.platform.startswith("win"), "can't easily test on this system"
  )
  class SelectTestCase(unittest.TestCase):
      """Run the select() test suite against ``select`` (``selector_select``).

      The whole case is skipped on platforms whose ``sys.platform`` starts
      with ``"win"``.
      """

      class Nope:
          """Object that has no ``fileno()`` method at all."""

      class Almost:
          """Object whose ``fileno()`` returns a string instead of an int."""

          def fileno(self):
              return "fileno"

      def test_error_conditions(self):
          """Invalid arguments are rejected with TypeError or ValueError."""
          # Integers where the three lists are expected.
          self.assertRaises(TypeError, select, 1, 2, 3)
          # An entry without a fileno() method.
          self.assertRaises(TypeError, select, [self.Nope()], [], [])
          # An entry whose fileno() does not return an integer.
          self.assertRaises(TypeError, select, [self.Almost()], [], [])
          # A timeout that is not a number.
          self.assertRaises(TypeError, select, [], [], [], "not a number")
          # A negative timeout.
          self.assertRaises(ValueError, select, [], [], [], -1)

      # Issue #12367: http://www.freebsd.org/cgi/query-pr.cgi?pr=kern/155606
      @unittest.skipIf(
          sys.platform.startswith("freebsd"),
          "skip because of a FreeBSD bug: kern/155606",
      )
      def test_errno(self):
          """Selecting on an already-closed descriptor raises ValueError."""
          with open(__file__, "rb") as fp:
              fd = fp.fileno()
          # Leaving the ``with`` block closed the file, so ``fd`` is now stale.
          self.assertRaises(ValueError, select, [fd], [], [], 0)

      def test_returned_list_identity(self):
          """The three returned lists are distinct objects (see issue #8329)."""
          r, w, x = select([], [], [], 1)
          self.assertIsNot(r, w)
          self.assertIsNot(r, x)
          self.assertIsNot(w, x)

      def test_select(self):
          """Poll a pipe fed by a slow shell loop using a range of timeouts."""
          cmd = "for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done"
          p = os.popen(cmd, "r")
          try:
              for tout in (0, 1, 2, 4, 8, 16) + (None,) * 10:
                  rfd, wfd, xfd = select([p], [], [], tout)
                  if (rfd, wfd, xfd) == ([], [], []):
                      # Nothing was reported ready; try the next timeout.
                      continue
                  if (rfd, wfd, xfd) == ([p], [], []):
                      line = p.readline()
                      if not line:
                          # Empty read: the shell loop finished (EOF).
                          break
                      continue
                  # TestCase.fail() accepts a single message argument, so the
                  # offending values are formatted into that message.
                  self.fail(
                      "Unexpected return values from select(): %r, %r, %r"
                      % (rfd, wfd, xfd)
                  )
          finally:
              # Close the pipe even when an assertion above fails.
              p.close()

      # Issue 16230: Crash on select resized list
      def test_select_mutated(self):
          """Shrinking the list from inside ``fileno()`` must not crash."""
          with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
              a = []

              class F:
                  def fileno(self):
                      # Every call drops the last entry of the list that is
                      # being passed to select().
                      del a[-1]
                      return s.fileno()

              a[:] = [F()] * 10
              # ``a`` is passed as the second list; the assertion expects the
              # first five entries of the mutated list to be returned there.
              self.assertEqual(select([], a, []), ([], a[:5], []))
  # Run the tests through unittest's command-line runner when this file is
  # executed directly as a script.
  if __name__ == "__main__":
      unittest.main()