图片解析应用
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

350 lines
10 KiB

  1. import os
  2. import pytest
  3. from tempfile import mkdtemp, mkstemp, NamedTemporaryFile
  4. from shutil import rmtree
  5. import numpy.lib._datasource as datasource
  6. from numpy.testing import assert_, assert_equal, assert_raises
  7. import urllib.request as urllib_request
  8. from urllib.parse import urlparse
  9. from urllib.error import URLError
  10. def urlopen_stub(url, data=None):
  11. '''Stub to replace urlopen for testing.'''
  12. if url == valid_httpurl():
  13. tmpfile = NamedTemporaryFile(prefix='urltmp_')
  14. return tmpfile
  15. else:
  16. raise URLError('Name or service not known')
  17. # setup and teardown
  18. old_urlopen = None
  19. def setup_module():
  20. global old_urlopen
  21. old_urlopen = urllib_request.urlopen
  22. urllib_request.urlopen = urlopen_stub
  23. def teardown_module():
  24. urllib_request.urlopen = old_urlopen
  25. # A valid website for more robust testing
  26. http_path = 'http://www.google.com/'
  27. http_file = 'index.html'
  28. http_fakepath = 'http://fake.abc.web/site/'
  29. http_fakefile = 'fake.txt'
  30. malicious_files = ['/etc/shadow', '../../shadow',
  31. '..\\system.dat', 'c:\\windows\\system.dat']
  32. magic_line = b'three is the magic number'
  33. # Utility functions used by many tests
  34. def valid_textfile(filedir):
  35. # Generate and return a valid temporary file.
  36. fd, path = mkstemp(suffix='.txt', prefix='dstmp_', dir=filedir, text=True)
  37. os.close(fd)
  38. return path
  39. def invalid_textfile(filedir):
  40. # Generate and return an invalid filename.
  41. fd, path = mkstemp(suffix='.txt', prefix='dstmp_', dir=filedir)
  42. os.close(fd)
  43. os.remove(path)
  44. return path
  45. def valid_httpurl():
  46. return http_path+http_file
  47. def invalid_httpurl():
  48. return http_fakepath+http_fakefile
  49. def valid_baseurl():
  50. return http_path
  51. def invalid_baseurl():
  52. return http_fakepath
  53. def valid_httpfile():
  54. return http_file
  55. def invalid_httpfile():
  56. return http_fakefile
  57. class TestDataSourceOpen:
  58. def setup(self):
  59. self.tmpdir = mkdtemp()
  60. self.ds = datasource.DataSource(self.tmpdir)
  61. def teardown(self):
  62. rmtree(self.tmpdir)
  63. del self.ds
  64. def test_ValidHTTP(self):
  65. fh = self.ds.open(valid_httpurl())
  66. assert_(fh)
  67. fh.close()
  68. def test_InvalidHTTP(self):
  69. url = invalid_httpurl()
  70. assert_raises(IOError, self.ds.open, url)
  71. try:
  72. self.ds.open(url)
  73. except IOError as e:
  74. # Regression test for bug fixed in r4342.
  75. assert_(e.errno is None)
  76. def test_InvalidHTTPCacheURLError(self):
  77. assert_raises(URLError, self.ds._cache, invalid_httpurl())
  78. def test_ValidFile(self):
  79. local_file = valid_textfile(self.tmpdir)
  80. fh = self.ds.open(local_file)
  81. assert_(fh)
  82. fh.close()
  83. def test_InvalidFile(self):
  84. invalid_file = invalid_textfile(self.tmpdir)
  85. assert_raises(IOError, self.ds.open, invalid_file)
  86. def test_ValidGzipFile(self):
  87. try:
  88. import gzip
  89. except ImportError:
  90. # We don't have the gzip capabilities to test.
  91. pytest.skip()
  92. # Test datasource's internal file_opener for Gzip files.
  93. filepath = os.path.join(self.tmpdir, 'foobar.txt.gz')
  94. fp = gzip.open(filepath, 'w')
  95. fp.write(magic_line)
  96. fp.close()
  97. fp = self.ds.open(filepath)
  98. result = fp.readline()
  99. fp.close()
  100. assert_equal(magic_line, result)
  101. def test_ValidBz2File(self):
  102. try:
  103. import bz2
  104. except ImportError:
  105. # We don't have the bz2 capabilities to test.
  106. pytest.skip()
  107. # Test datasource's internal file_opener for BZip2 files.
  108. filepath = os.path.join(self.tmpdir, 'foobar.txt.bz2')
  109. fp = bz2.BZ2File(filepath, 'w')
  110. fp.write(magic_line)
  111. fp.close()
  112. fp = self.ds.open(filepath)
  113. result = fp.readline()
  114. fp.close()
  115. assert_equal(magic_line, result)
  116. class TestDataSourceExists:
  117. def setup(self):
  118. self.tmpdir = mkdtemp()
  119. self.ds = datasource.DataSource(self.tmpdir)
  120. def teardown(self):
  121. rmtree(self.tmpdir)
  122. del self.ds
  123. def test_ValidHTTP(self):
  124. assert_(self.ds.exists(valid_httpurl()))
  125. def test_InvalidHTTP(self):
  126. assert_equal(self.ds.exists(invalid_httpurl()), False)
  127. def test_ValidFile(self):
  128. # Test valid file in destpath
  129. tmpfile = valid_textfile(self.tmpdir)
  130. assert_(self.ds.exists(tmpfile))
  131. # Test valid local file not in destpath
  132. localdir = mkdtemp()
  133. tmpfile = valid_textfile(localdir)
  134. assert_(self.ds.exists(tmpfile))
  135. rmtree(localdir)
  136. def test_InvalidFile(self):
  137. tmpfile = invalid_textfile(self.tmpdir)
  138. assert_equal(self.ds.exists(tmpfile), False)
  139. class TestDataSourceAbspath:
  140. def setup(self):
  141. self.tmpdir = os.path.abspath(mkdtemp())
  142. self.ds = datasource.DataSource(self.tmpdir)
  143. def teardown(self):
  144. rmtree(self.tmpdir)
  145. del self.ds
  146. def test_ValidHTTP(self):
  147. scheme, netloc, upath, pms, qry, frg = urlparse(valid_httpurl())
  148. local_path = os.path.join(self.tmpdir, netloc,
  149. upath.strip(os.sep).strip('/'))
  150. assert_equal(local_path, self.ds.abspath(valid_httpurl()))
  151. def test_ValidFile(self):
  152. tmpfile = valid_textfile(self.tmpdir)
  153. tmpfilename = os.path.split(tmpfile)[-1]
  154. # Test with filename only
  155. assert_equal(tmpfile, self.ds.abspath(tmpfilename))
  156. # Test filename with complete path
  157. assert_equal(tmpfile, self.ds.abspath(tmpfile))
  158. def test_InvalidHTTP(self):
  159. scheme, netloc, upath, pms, qry, frg = urlparse(invalid_httpurl())
  160. invalidhttp = os.path.join(self.tmpdir, netloc,
  161. upath.strip(os.sep).strip('/'))
  162. assert_(invalidhttp != self.ds.abspath(valid_httpurl()))
  163. def test_InvalidFile(self):
  164. invalidfile = valid_textfile(self.tmpdir)
  165. tmpfile = valid_textfile(self.tmpdir)
  166. tmpfilename = os.path.split(tmpfile)[-1]
  167. # Test with filename only
  168. assert_(invalidfile != self.ds.abspath(tmpfilename))
  169. # Test filename with complete path
  170. assert_(invalidfile != self.ds.abspath(tmpfile))
  171. def test_sandboxing(self):
  172. tmpfile = valid_textfile(self.tmpdir)
  173. tmpfilename = os.path.split(tmpfile)[-1]
  174. tmp_path = lambda x: os.path.abspath(self.ds.abspath(x))
  175. assert_(tmp_path(valid_httpurl()).startswith(self.tmpdir))
  176. assert_(tmp_path(invalid_httpurl()).startswith(self.tmpdir))
  177. assert_(tmp_path(tmpfile).startswith(self.tmpdir))
  178. assert_(tmp_path(tmpfilename).startswith(self.tmpdir))
  179. for fn in malicious_files:
  180. assert_(tmp_path(http_path+fn).startswith(self.tmpdir))
  181. assert_(tmp_path(fn).startswith(self.tmpdir))
  182. def test_windows_os_sep(self):
  183. orig_os_sep = os.sep
  184. try:
  185. os.sep = '\\'
  186. self.test_ValidHTTP()
  187. self.test_ValidFile()
  188. self.test_InvalidHTTP()
  189. self.test_InvalidFile()
  190. self.test_sandboxing()
  191. finally:
  192. os.sep = orig_os_sep
  193. class TestRepositoryAbspath:
  194. def setup(self):
  195. self.tmpdir = os.path.abspath(mkdtemp())
  196. self.repos = datasource.Repository(valid_baseurl(), self.tmpdir)
  197. def teardown(self):
  198. rmtree(self.tmpdir)
  199. del self.repos
  200. def test_ValidHTTP(self):
  201. scheme, netloc, upath, pms, qry, frg = urlparse(valid_httpurl())
  202. local_path = os.path.join(self.repos._destpath, netloc,
  203. upath.strip(os.sep).strip('/'))
  204. filepath = self.repos.abspath(valid_httpfile())
  205. assert_equal(local_path, filepath)
  206. def test_sandboxing(self):
  207. tmp_path = lambda x: os.path.abspath(self.repos.abspath(x))
  208. assert_(tmp_path(valid_httpfile()).startswith(self.tmpdir))
  209. for fn in malicious_files:
  210. assert_(tmp_path(http_path+fn).startswith(self.tmpdir))
  211. assert_(tmp_path(fn).startswith(self.tmpdir))
  212. def test_windows_os_sep(self):
  213. orig_os_sep = os.sep
  214. try:
  215. os.sep = '\\'
  216. self.test_ValidHTTP()
  217. self.test_sandboxing()
  218. finally:
  219. os.sep = orig_os_sep
  220. class TestRepositoryExists:
  221. def setup(self):
  222. self.tmpdir = mkdtemp()
  223. self.repos = datasource.Repository(valid_baseurl(), self.tmpdir)
  224. def teardown(self):
  225. rmtree(self.tmpdir)
  226. del self.repos
  227. def test_ValidFile(self):
  228. # Create local temp file
  229. tmpfile = valid_textfile(self.tmpdir)
  230. assert_(self.repos.exists(tmpfile))
  231. def test_InvalidFile(self):
  232. tmpfile = invalid_textfile(self.tmpdir)
  233. assert_equal(self.repos.exists(tmpfile), False)
  234. def test_RemoveHTTPFile(self):
  235. assert_(self.repos.exists(valid_httpurl()))
  236. def test_CachedHTTPFile(self):
  237. localfile = valid_httpurl()
  238. # Create a locally cached temp file with an URL based
  239. # directory structure. This is similar to what Repository.open
  240. # would do.
  241. scheme, netloc, upath, pms, qry, frg = urlparse(localfile)
  242. local_path = os.path.join(self.repos._destpath, netloc)
  243. os.mkdir(local_path, 0o0700)
  244. tmpfile = valid_textfile(local_path)
  245. assert_(self.repos.exists(tmpfile))
  246. class TestOpenFunc:
  247. def setup(self):
  248. self.tmpdir = mkdtemp()
  249. def teardown(self):
  250. rmtree(self.tmpdir)
  251. def test_DataSourceOpen(self):
  252. local_file = valid_textfile(self.tmpdir)
  253. # Test case where destpath is passed in
  254. fp = datasource.open(local_file, destpath=self.tmpdir)
  255. assert_(fp)
  256. fp.close()
  257. # Test case where default destpath is used
  258. fp = datasource.open(local_file)
  259. assert_(fp)
  260. fp.close()
  261. def test_del_attr_handling():
  262. # DataSource __del__ can be called
  263. # even if __init__ fails when the
  264. # Exception object is caught by the
  265. # caller as happens in refguide_check
  266. # is_deprecated() function
  267. ds = datasource.DataSource()
  268. # simulate failed __init__ by removing key attribute
  269. # produced within __init__ and expected by __del__
  270. del ds._istmpdest
  271. # should not raise an AttributeError if __del__
  272. # gracefully handles failed __init__:
  273. ds.__del__()