图片解析应用
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

235 lines
6.7 KiB

  1. """ PEP 610 """
  2. import json
  3. import re
  4. import urllib.parse
  5. from typing import Any, Dict, Iterable, Optional, Type, TypeVar, Union
  6. __all__ = [
  7. "DirectUrl",
  8. "DirectUrlValidationError",
  9. "DirInfo",
  10. "ArchiveInfo",
  11. "VcsInfo",
  12. ]
  13. T = TypeVar("T")
  14. DIRECT_URL_METADATA_NAME = "direct_url.json"
  15. ENV_VAR_RE = re.compile(r"^\$\{[A-Za-z0-9-_]+\}(:\$\{[A-Za-z0-9-_]+\})?$")
  16. class DirectUrlValidationError(Exception):
  17. pass
  18. def _get(
  19. d: Dict[str, Any], expected_type: Type[T], key: str, default: Optional[T] = None
  20. ) -> Optional[T]:
  21. """Get value from dictionary and verify expected type."""
  22. if key not in d:
  23. return default
  24. value = d[key]
  25. if not isinstance(value, expected_type):
  26. raise DirectUrlValidationError(
  27. f"{value!r} has unexpected type for {key} (expected {expected_type})"
  28. )
  29. return value
  30. def _get_required(
  31. d: Dict[str, Any], expected_type: Type[T], key: str, default: Optional[T] = None
  32. ) -> T:
  33. value = _get(d, expected_type, key, default)
  34. if value is None:
  35. raise DirectUrlValidationError(f"{key} must have a value")
  36. return value
  37. def _exactly_one_of(infos: Iterable[Optional["InfoType"]]) -> "InfoType":
  38. infos = [info for info in infos if info is not None]
  39. if not infos:
  40. raise DirectUrlValidationError(
  41. "missing one of archive_info, dir_info, vcs_info"
  42. )
  43. if len(infos) > 1:
  44. raise DirectUrlValidationError(
  45. "more than one of archive_info, dir_info, vcs_info"
  46. )
  47. assert infos[0] is not None
  48. return infos[0]
  49. def _filter_none(**kwargs: Any) -> Dict[str, Any]:
  50. """Make dict excluding None values."""
  51. return {k: v for k, v in kwargs.items() if v is not None}
  52. class VcsInfo:
  53. name = "vcs_info"
  54. def __init__(
  55. self,
  56. vcs: str,
  57. commit_id: str,
  58. requested_revision: Optional[str] = None,
  59. ) -> None:
  60. self.vcs = vcs
  61. self.requested_revision = requested_revision
  62. self.commit_id = commit_id
  63. @classmethod
  64. def _from_dict(cls, d: Optional[Dict[str, Any]]) -> Optional["VcsInfo"]:
  65. if d is None:
  66. return None
  67. return cls(
  68. vcs=_get_required(d, str, "vcs"),
  69. commit_id=_get_required(d, str, "commit_id"),
  70. requested_revision=_get(d, str, "requested_revision"),
  71. )
  72. def _to_dict(self) -> Dict[str, Any]:
  73. return _filter_none(
  74. vcs=self.vcs,
  75. requested_revision=self.requested_revision,
  76. commit_id=self.commit_id,
  77. )
  78. class ArchiveInfo:
  79. name = "archive_info"
  80. def __init__(
  81. self,
  82. hash: Optional[str] = None,
  83. hashes: Optional[Dict[str, str]] = None,
  84. ) -> None:
  85. # set hashes before hash, since the hash setter will further populate hashes
  86. self.hashes = hashes
  87. self.hash = hash
  88. @property
  89. def hash(self) -> Optional[str]:
  90. return self._hash
  91. @hash.setter
  92. def hash(self, value: Optional[str]) -> None:
  93. if value is not None:
  94. # Auto-populate the hashes key to upgrade to the new format automatically.
  95. # We don't back-populate the legacy hash key from hashes.
  96. try:
  97. hash_name, hash_value = value.split("=", 1)
  98. except ValueError:
  99. raise DirectUrlValidationError(
  100. f"invalid archive_info.hash format: {value!r}"
  101. )
  102. if self.hashes is None:
  103. self.hashes = {hash_name: hash_value}
  104. elif hash_name not in self.hashes:
  105. self.hashes = self.hashes.copy()
  106. self.hashes[hash_name] = hash_value
  107. self._hash = value
  108. @classmethod
  109. def _from_dict(cls, d: Optional[Dict[str, Any]]) -> Optional["ArchiveInfo"]:
  110. if d is None:
  111. return None
  112. return cls(hash=_get(d, str, "hash"), hashes=_get(d, dict, "hashes"))
  113. def _to_dict(self) -> Dict[str, Any]:
  114. return _filter_none(hash=self.hash, hashes=self.hashes)
  115. class DirInfo:
  116. name = "dir_info"
  117. def __init__(
  118. self,
  119. editable: bool = False,
  120. ) -> None:
  121. self.editable = editable
  122. @classmethod
  123. def _from_dict(cls, d: Optional[Dict[str, Any]]) -> Optional["DirInfo"]:
  124. if d is None:
  125. return None
  126. return cls(editable=_get_required(d, bool, "editable", default=False))
  127. def _to_dict(self) -> Dict[str, Any]:
  128. return _filter_none(editable=self.editable or None)
  129. InfoType = Union[ArchiveInfo, DirInfo, VcsInfo]
  130. class DirectUrl:
  131. def __init__(
  132. self,
  133. url: str,
  134. info: InfoType,
  135. subdirectory: Optional[str] = None,
  136. ) -> None:
  137. self.url = url
  138. self.info = info
  139. self.subdirectory = subdirectory
  140. def _remove_auth_from_netloc(self, netloc: str) -> str:
  141. if "@" not in netloc:
  142. return netloc
  143. user_pass, netloc_no_user_pass = netloc.split("@", 1)
  144. if (
  145. isinstance(self.info, VcsInfo)
  146. and self.info.vcs == "git"
  147. and user_pass == "git"
  148. ):
  149. return netloc
  150. if ENV_VAR_RE.match(user_pass):
  151. return netloc
  152. return netloc_no_user_pass
  153. @property
  154. def redacted_url(self) -> str:
  155. """url with user:password part removed unless it is formed with
  156. environment variables as specified in PEP 610, or it is ``git``
  157. in the case of a git URL.
  158. """
  159. purl = urllib.parse.urlsplit(self.url)
  160. netloc = self._remove_auth_from_netloc(purl.netloc)
  161. surl = urllib.parse.urlunsplit(
  162. (purl.scheme, netloc, purl.path, purl.query, purl.fragment)
  163. )
  164. return surl
  165. def validate(self) -> None:
  166. self.from_dict(self.to_dict())
  167. @classmethod
  168. def from_dict(cls, d: Dict[str, Any]) -> "DirectUrl":
  169. return DirectUrl(
  170. url=_get_required(d, str, "url"),
  171. subdirectory=_get(d, str, "subdirectory"),
  172. info=_exactly_one_of(
  173. [
  174. ArchiveInfo._from_dict(_get(d, dict, "archive_info")),
  175. DirInfo._from_dict(_get(d, dict, "dir_info")),
  176. VcsInfo._from_dict(_get(d, dict, "vcs_info")),
  177. ]
  178. ),
  179. )
  180. def to_dict(self) -> Dict[str, Any]:
  181. res = _filter_none(
  182. url=self.redacted_url,
  183. subdirectory=self.subdirectory,
  184. )
  185. res[self.info.name] = self.info._to_dict()
  186. return res
  187. @classmethod
  188. def from_json(cls, s: str) -> "DirectUrl":
  189. return cls.from_dict(json.loads(s))
  190. def to_json(self) -> str:
  191. return json.dumps(self.to_dict(), sort_keys=True)
  192. def is_local_editable(self) -> bool:
  193. return isinstance(self.info, DirInfo) and self.info.editable