图片解析应用
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

307 lines
11 KiB

  1. import base64
  2. import datetime
  3. import ssl
  4. from urllib.parse import urljoin, urlparse
  5. import cryptography.hazmat.primitives.hashes
  6. import requests
  7. from cryptography import hazmat, x509
  8. from cryptography.exceptions import InvalidSignature
  9. from cryptography.hazmat import backends
  10. from cryptography.hazmat.primitives.asymmetric.dsa import DSAPublicKey
  11. from cryptography.hazmat.primitives.asymmetric.ec import ECDSA, EllipticCurvePublicKey
  12. from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15
  13. from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicKey
  14. from cryptography.hazmat.primitives.hashes import SHA1, Hash
  15. from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
  16. from cryptography.x509 import ocsp
  17. from redis.exceptions import AuthorizationError, ConnectionError
  18. def _verify_response(issuer_cert, ocsp_response):
  19. pubkey = issuer_cert.public_key()
  20. try:
  21. if isinstance(pubkey, RSAPublicKey):
  22. pubkey.verify(
  23. ocsp_response.signature,
  24. ocsp_response.tbs_response_bytes,
  25. PKCS1v15(),
  26. ocsp_response.signature_hash_algorithm,
  27. )
  28. elif isinstance(pubkey, DSAPublicKey):
  29. pubkey.verify(
  30. ocsp_response.signature,
  31. ocsp_response.tbs_response_bytes,
  32. ocsp_response.signature_hash_algorithm,
  33. )
  34. elif isinstance(pubkey, EllipticCurvePublicKey):
  35. pubkey.verify(
  36. ocsp_response.signature,
  37. ocsp_response.tbs_response_bytes,
  38. ECDSA(ocsp_response.signature_hash_algorithm),
  39. )
  40. else:
  41. pubkey.verify(ocsp_response.signature, ocsp_response.tbs_response_bytes)
  42. except InvalidSignature:
  43. raise ConnectionError("failed to valid ocsp response")
  44. def _check_certificate(issuer_cert, ocsp_bytes, validate=True):
  45. """A wrapper the return the validity of a known ocsp certificate"""
  46. ocsp_response = ocsp.load_der_ocsp_response(ocsp_bytes)
  47. if ocsp_response.response_status == ocsp.OCSPResponseStatus.UNAUTHORIZED:
  48. raise AuthorizationError("you are not authorized to view this ocsp certificate")
  49. if ocsp_response.response_status == ocsp.OCSPResponseStatus.SUCCESSFUL:
  50. if ocsp_response.certificate_status != ocsp.OCSPCertStatus.GOOD:
  51. raise ConnectionError(
  52. f'Received an {str(ocsp_response.certificate_status).split(".")[1]} '
  53. "ocsp certificate status"
  54. )
  55. else:
  56. raise ConnectionError(
  57. "failed to retrieve a successful response from the ocsp responder"
  58. )
  59. if ocsp_response.this_update >= datetime.datetime.now():
  60. raise ConnectionError("ocsp certificate was issued in the future")
  61. if (
  62. ocsp_response.next_update
  63. and ocsp_response.next_update < datetime.datetime.now()
  64. ):
  65. raise ConnectionError("ocsp certificate has invalid update - in the past")
  66. responder_name = ocsp_response.responder_name
  67. issuer_hash = ocsp_response.issuer_key_hash
  68. responder_hash = ocsp_response.responder_key_hash
  69. cert_to_validate = issuer_cert
  70. if (
  71. responder_name is not None
  72. and responder_name == issuer_cert.subject
  73. or responder_hash == issuer_hash
  74. ):
  75. cert_to_validate = issuer_cert
  76. else:
  77. certs = ocsp_response.certificates
  78. responder_certs = _get_certificates(
  79. certs, issuer_cert, responder_name, responder_hash
  80. )
  81. try:
  82. responder_cert = responder_certs[0]
  83. except IndexError:
  84. raise ConnectionError("no certificates found for the responder")
  85. ext = responder_cert.extensions.get_extension_for_class(x509.ExtendedKeyUsage)
  86. if ext is None or x509.oid.ExtendedKeyUsageOID.OCSP_SIGNING not in ext.value:
  87. raise ConnectionError("delegate not autorized for ocsp signing")
  88. cert_to_validate = responder_cert
  89. if validate:
  90. _verify_response(cert_to_validate, ocsp_response)
  91. return True
  92. def _get_certificates(certs, issuer_cert, responder_name, responder_hash):
  93. if responder_name is None:
  94. certificates = [
  95. c
  96. for c in certs
  97. if _get_pubkey_hash(c) == responder_hash and c.issuer == issuer_cert.subject
  98. ]
  99. else:
  100. certificates = [
  101. c
  102. for c in certs
  103. if c.subject == responder_name and c.issuer == issuer_cert.subject
  104. ]
  105. return certificates
  106. def _get_pubkey_hash(certificate):
  107. pubkey = certificate.public_key()
  108. # https://stackoverflow.com/a/46309453/600498
  109. if isinstance(pubkey, RSAPublicKey):
  110. h = pubkey.public_bytes(Encoding.DER, PublicFormat.PKCS1)
  111. elif isinstance(pubkey, EllipticCurvePublicKey):
  112. h = pubkey.public_bytes(Encoding.X962, PublicFormat.UncompressedPoint)
  113. else:
  114. h = pubkey.public_bytes(Encoding.DER, PublicFormat.SubjectPublicKeyInfo)
  115. sha1 = Hash(SHA1(), backend=backends.default_backend())
  116. sha1.update(h)
  117. return sha1.finalize()
  118. def ocsp_staple_verifier(con, ocsp_bytes, expected=None):
  119. """An implementation of a function for set_ocsp_client_callback in PyOpenSSL.
  120. This function validates that the provide ocsp_bytes response is valid,
  121. and matches the expected, stapled responses.
  122. """
  123. if ocsp_bytes in [b"", None]:
  124. raise ConnectionError("no ocsp response present")
  125. issuer_cert = None
  126. peer_cert = con.get_peer_certificate().to_cryptography()
  127. for c in con.get_peer_cert_chain():
  128. cert = c.to_cryptography()
  129. if cert.subject == peer_cert.issuer:
  130. issuer_cert = cert
  131. break
  132. if issuer_cert is None:
  133. raise ConnectionError("no matching issuer cert found in certificate chain")
  134. if expected is not None:
  135. e = x509.load_pem_x509_certificate(expected)
  136. if peer_cert != e:
  137. raise ConnectionError("received and expected certificates do not match")
  138. return _check_certificate(issuer_cert, ocsp_bytes)
  139. class OCSPVerifier:
  140. """A class to verify ssl sockets for RFC6960/RFC6961. This can be used
  141. when using direct validation of OCSP responses and certificate revocations.
  142. @see https://datatracker.ietf.org/doc/html/rfc6960
  143. @see https://datatracker.ietf.org/doc/html/rfc6961
  144. """
  145. def __init__(self, sock, host, port, ca_certs=None):
  146. self.SOCK = sock
  147. self.HOST = host
  148. self.PORT = port
  149. self.CA_CERTS = ca_certs
  150. def _bin2ascii(self, der):
  151. """Convert SSL certificates in a binary (DER) format to ASCII PEM."""
  152. pem = ssl.DER_cert_to_PEM_cert(der)
  153. cert = x509.load_pem_x509_certificate(pem.encode(), backends.default_backend())
  154. return cert
  155. def components_from_socket(self):
  156. """This function returns the certificate, primary issuer, and primary ocsp
  157. server in the chain for a socket already wrapped with ssl.
  158. """
  159. # convert the binary certifcate to text
  160. der = self.SOCK.getpeercert(True)
  161. if der is False:
  162. raise ConnectionError("no certificate found for ssl peer")
  163. cert = self._bin2ascii(der)
  164. return self._certificate_components(cert)
  165. def _certificate_components(self, cert):
  166. """Given an SSL certificate, retract the useful components for
  167. validating the certificate status with an OCSP server.
  168. Args:
  169. cert ([bytes]): A PEM encoded ssl certificate
  170. """
  171. try:
  172. aia = cert.extensions.get_extension_for_oid(
  173. x509.oid.ExtensionOID.AUTHORITY_INFORMATION_ACCESS
  174. ).value
  175. except cryptography.x509.extensions.ExtensionNotFound:
  176. raise ConnectionError("No AIA information present in ssl certificate")
  177. # fetch certificate issuers
  178. issuers = [
  179. i
  180. for i in aia
  181. if i.access_method == x509.oid.AuthorityInformationAccessOID.CA_ISSUERS
  182. ]
  183. try:
  184. issuer = issuers[0].access_location.value
  185. except IndexError:
  186. issuer = None
  187. # now, the series of ocsp server entries
  188. ocsps = [
  189. i
  190. for i in aia
  191. if i.access_method == x509.oid.AuthorityInformationAccessOID.OCSP
  192. ]
  193. try:
  194. ocsp = ocsps[0].access_location.value
  195. except IndexError:
  196. raise ConnectionError("no ocsp servers in certificate")
  197. return cert, issuer, ocsp
  198. def components_from_direct_connection(self):
  199. """Return the certificate, primary issuer, and primary ocsp server
  200. from the host defined by the socket. This is useful in cases where
  201. different certificates are occasionally presented.
  202. """
  203. pem = ssl.get_server_certificate((self.HOST, self.PORT), ca_certs=self.CA_CERTS)
  204. cert = x509.load_pem_x509_certificate(pem.encode(), backends.default_backend())
  205. return self._certificate_components(cert)
  206. def build_certificate_url(self, server, cert, issuer_cert):
  207. """Return the complete url to the ocsp"""
  208. orb = ocsp.OCSPRequestBuilder()
  209. # add_certificate returns an initialized OCSPRequestBuilder
  210. orb = orb.add_certificate(
  211. cert, issuer_cert, cryptography.hazmat.primitives.hashes.SHA256()
  212. )
  213. request = orb.build()
  214. path = base64.b64encode(
  215. request.public_bytes(hazmat.primitives.serialization.Encoding.DER)
  216. )
  217. url = urljoin(server, path.decode("ascii"))
  218. return url
  219. def check_certificate(self, server, cert, issuer_url):
  220. """Checks the validity of an ocsp server for an issuer"""
  221. r = requests.get(issuer_url)
  222. if not r.ok:
  223. raise ConnectionError("failed to fetch issuer certificate")
  224. der = r.content
  225. issuer_cert = self._bin2ascii(der)
  226. ocsp_url = self.build_certificate_url(server, cert, issuer_cert)
  227. # HTTP 1.1 mandates the addition of the Host header in ocsp responses
  228. header = {
  229. "Host": urlparse(ocsp_url).netloc,
  230. "Content-Type": "application/ocsp-request",
  231. }
  232. r = requests.get(ocsp_url, headers=header)
  233. if not r.ok:
  234. raise ConnectionError("failed to fetch ocsp certificate")
  235. return _check_certificate(issuer_cert, r.content, True)
  236. def is_valid(self):
  237. """Returns the validity of the certificate wrapping our socket.
  238. This first retrieves for validate the certificate, issuer_url,
  239. and ocsp_server for certificate validate. Then retrieves the
  240. issuer certificate from the issuer_url, and finally checks
  241. the validity of OCSP revocation status.
  242. """
  243. # validate the certificate
  244. try:
  245. cert, issuer_url, ocsp_server = self.components_from_socket()
  246. if issuer_url is None:
  247. raise ConnectionError("no issuers found in certificate chain")
  248. return self.check_certificate(ocsp_server, cert, issuer_url)
  249. except AuthorizationError:
  250. cert, issuer_url, ocsp_server = self.components_from_direct_connection()
  251. if issuer_url is None:
  252. raise ConnectionError("no issuers found in certificate chain")
  253. return self.check_certificate(ocsp_server, cert, issuer_url)