图片解析应用
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

381 lines
14 KiB

  1. import asyncio
  2. import random
  3. import weakref
  4. from typing import AsyncIterator, Iterable, Mapping, Optional, Sequence, Tuple, Type
  5. from redis.asyncio.client import Redis
  6. from redis.asyncio.connection import (
  7. Connection,
  8. ConnectionPool,
  9. EncodableT,
  10. SSLConnection,
  11. )
  12. from redis.commands import AsyncSentinelCommands
  13. from redis.exceptions import ConnectionError, ReadOnlyError, ResponseError, TimeoutError
  14. from redis.utils import str_if_bytes
  15. class MasterNotFoundError(ConnectionError):
  16. pass
  17. class SlaveNotFoundError(ConnectionError):
  18. pass
  19. class SentinelManagedConnection(Connection):
  20. def __init__(self, **kwargs):
  21. self.connection_pool = kwargs.pop("connection_pool")
  22. super().__init__(**kwargs)
  23. def __repr__(self):
  24. pool = self.connection_pool
  25. s = (
  26. f"<{self.__class__.__module__}.{self.__class__.__name__}"
  27. f"(service={pool.service_name}"
  28. )
  29. if self.host:
  30. host_info = f",host={self.host},port={self.port}"
  31. s += host_info
  32. return s + ")>"
  33. async def connect_to(self, address):
  34. self.host, self.port = address
  35. await super().connect()
  36. if self.connection_pool.check_connection:
  37. await self.send_command("PING")
  38. if str_if_bytes(await self.read_response()) != "PONG":
  39. raise ConnectionError("PING failed")
  40. async def _connect_retry(self):
  41. if self._reader:
  42. return # already connected
  43. if self.connection_pool.is_master:
  44. await self.connect_to(await self.connection_pool.get_master_address())
  45. else:
  46. async for slave in self.connection_pool.rotate_slaves():
  47. try:
  48. return await self.connect_to(slave)
  49. except ConnectionError:
  50. continue
  51. raise SlaveNotFoundError # Never be here
  52. async def connect(self):
  53. return await self.retry.call_with_retry(
  54. self._connect_retry,
  55. lambda error: asyncio.sleep(0),
  56. )
  57. async def read_response(
  58. self,
  59. disable_decoding: bool = False,
  60. timeout: Optional[float] = None,
  61. *,
  62. disconnect_on_error: Optional[float] = True,
  63. push_request: Optional[bool] = False,
  64. ):
  65. try:
  66. return await super().read_response(
  67. disable_decoding=disable_decoding,
  68. timeout=timeout,
  69. disconnect_on_error=disconnect_on_error,
  70. push_request=push_request,
  71. )
  72. except ReadOnlyError:
  73. if self.connection_pool.is_master:
  74. # When talking to a master, a ReadOnlyError when likely
  75. # indicates that the previous master that we're still connected
  76. # to has been demoted to a slave and there's a new master.
  77. # calling disconnect will force the connection to re-query
  78. # sentinel during the next connect() attempt.
  79. await self.disconnect()
  80. raise ConnectionError("The previous master is now a slave")
  81. raise
  82. class SentinelManagedSSLConnection(SentinelManagedConnection, SSLConnection):
  83. pass
  84. class SentinelConnectionPool(ConnectionPool):
  85. """
  86. Sentinel backed connection pool.
  87. If ``check_connection`` flag is set to True, SentinelManagedConnection
  88. sends a PING command right after establishing the connection.
  89. """
  90. def __init__(self, service_name, sentinel_manager, **kwargs):
  91. kwargs["connection_class"] = kwargs.get(
  92. "connection_class",
  93. SentinelManagedSSLConnection
  94. if kwargs.pop("ssl", False)
  95. else SentinelManagedConnection,
  96. )
  97. self.is_master = kwargs.pop("is_master", True)
  98. self.check_connection = kwargs.pop("check_connection", False)
  99. super().__init__(**kwargs)
  100. self.connection_kwargs["connection_pool"] = weakref.proxy(self)
  101. self.service_name = service_name
  102. self.sentinel_manager = sentinel_manager
  103. self.master_address = None
  104. self.slave_rr_counter = None
  105. def __repr__(self):
  106. return (
  107. f"<{self.__class__.__module__}.{self.__class__.__name__}"
  108. f"(service={self.service_name}({self.is_master and 'master' or 'slave'}))>"
  109. )
  110. def reset(self):
  111. super().reset()
  112. self.master_address = None
  113. self.slave_rr_counter = None
  114. def owns_connection(self, connection: Connection):
  115. check = not self.is_master or (
  116. self.is_master and self.master_address == (connection.host, connection.port)
  117. )
  118. return check and super().owns_connection(connection)
  119. async def get_master_address(self):
  120. master_address = await self.sentinel_manager.discover_master(self.service_name)
  121. if self.is_master:
  122. if self.master_address != master_address:
  123. self.master_address = master_address
  124. # disconnect any idle connections so that they reconnect
  125. # to the new master the next time that they are used.
  126. await self.disconnect(inuse_connections=False)
  127. return master_address
  128. async def rotate_slaves(self) -> AsyncIterator:
  129. """Round-robin slave balancer"""
  130. slaves = await self.sentinel_manager.discover_slaves(self.service_name)
  131. if slaves:
  132. if self.slave_rr_counter is None:
  133. self.slave_rr_counter = random.randint(0, len(slaves) - 1)
  134. for _ in range(len(slaves)):
  135. self.slave_rr_counter = (self.slave_rr_counter + 1) % len(slaves)
  136. slave = slaves[self.slave_rr_counter]
  137. yield slave
  138. # Fallback to the master connection
  139. try:
  140. yield await self.get_master_address()
  141. except MasterNotFoundError:
  142. pass
  143. raise SlaveNotFoundError(f"No slave found for {self.service_name!r}")
  144. class Sentinel(AsyncSentinelCommands):
  145. """
  146. Redis Sentinel cluster client
  147. >>> from redis.sentinel import Sentinel
  148. >>> sentinel = Sentinel([('localhost', 26379)], socket_timeout=0.1)
  149. >>> master = sentinel.master_for('mymaster', socket_timeout=0.1)
  150. >>> await master.set('foo', 'bar')
  151. >>> slave = sentinel.slave_for('mymaster', socket_timeout=0.1)
  152. >>> await slave.get('foo')
  153. b'bar'
  154. ``sentinels`` is a list of sentinel nodes. Each node is represented by
  155. a pair (hostname, port).
  156. ``min_other_sentinels`` defined a minimum number of peers for a sentinel.
  157. When querying a sentinel, if it doesn't meet this threshold, responses
  158. from that sentinel won't be considered valid.
  159. ``sentinel_kwargs`` is a dictionary of connection arguments used when
  160. connecting to sentinel instances. Any argument that can be passed to
  161. a normal Redis connection can be specified here. If ``sentinel_kwargs`` is
  162. not specified, any socket_timeout and socket_keepalive options specified
  163. in ``connection_kwargs`` will be used.
  164. ``connection_kwargs`` are keyword arguments that will be used when
  165. establishing a connection to a Redis server.
  166. """
  167. def __init__(
  168. self,
  169. sentinels,
  170. min_other_sentinels=0,
  171. sentinel_kwargs=None,
  172. **connection_kwargs,
  173. ):
  174. # if sentinel_kwargs isn't defined, use the socket_* options from
  175. # connection_kwargs
  176. if sentinel_kwargs is None:
  177. sentinel_kwargs = {
  178. k: v for k, v in connection_kwargs.items() if k.startswith("socket_")
  179. }
  180. self.sentinel_kwargs = sentinel_kwargs
  181. self.sentinels = [
  182. Redis(host=hostname, port=port, **self.sentinel_kwargs)
  183. for hostname, port in sentinels
  184. ]
  185. self.min_other_sentinels = min_other_sentinels
  186. self.connection_kwargs = connection_kwargs
  187. async def execute_command(self, *args, **kwargs):
  188. """
  189. Execute Sentinel command in sentinel nodes.
  190. once - If set to True, then execute the resulting command on a single
  191. node at random, rather than across the entire sentinel cluster.
  192. """
  193. once = bool(kwargs.get("once", False))
  194. if "once" in kwargs.keys():
  195. kwargs.pop("once")
  196. if once:
  197. await random.choice(self.sentinels).execute_command(*args, **kwargs)
  198. else:
  199. tasks = [
  200. asyncio.Task(sentinel.execute_command(*args, **kwargs))
  201. for sentinel in self.sentinels
  202. ]
  203. await asyncio.gather(*tasks)
  204. return True
  205. def __repr__(self):
  206. sentinel_addresses = []
  207. for sentinel in self.sentinels:
  208. sentinel_addresses.append(
  209. f"{sentinel.connection_pool.connection_kwargs['host']}:"
  210. f"{sentinel.connection_pool.connection_kwargs['port']}"
  211. )
  212. return (
  213. f"<{self.__class__}.{self.__class__.__name__}"
  214. f"(sentinels=[{','.join(sentinel_addresses)}])>"
  215. )
  216. def check_master_state(self, state: dict, service_name: str) -> bool:
  217. if not state["is_master"] or state["is_sdown"] or state["is_odown"]:
  218. return False
  219. # Check if our sentinel doesn't see other nodes
  220. if state["num-other-sentinels"] < self.min_other_sentinels:
  221. return False
  222. return True
  223. async def discover_master(self, service_name: str):
  224. """
  225. Asks sentinel servers for the Redis master's address corresponding
  226. to the service labeled ``service_name``.
  227. Returns a pair (address, port) or raises MasterNotFoundError if no
  228. master is found.
  229. """
  230. collected_errors = list()
  231. for sentinel_no, sentinel in enumerate(self.sentinels):
  232. try:
  233. masters = await sentinel.sentinel_masters()
  234. except (ConnectionError, TimeoutError) as e:
  235. collected_errors.append(f"{sentinel} - {e!r}")
  236. continue
  237. state = masters.get(service_name)
  238. if state and self.check_master_state(state, service_name):
  239. # Put this sentinel at the top of the list
  240. self.sentinels[0], self.sentinels[sentinel_no] = (
  241. sentinel,
  242. self.sentinels[0],
  243. )
  244. return state["ip"], state["port"]
  245. error_info = ""
  246. if len(collected_errors) > 0:
  247. error_info = f" : {', '.join(collected_errors)}"
  248. raise MasterNotFoundError(f"No master found for {service_name!r}{error_info}")
  249. def filter_slaves(
  250. self, slaves: Iterable[Mapping]
  251. ) -> Sequence[Tuple[EncodableT, EncodableT]]:
  252. """Remove slaves that are in an ODOWN or SDOWN state"""
  253. slaves_alive = []
  254. for slave in slaves:
  255. if slave["is_odown"] or slave["is_sdown"]:
  256. continue
  257. slaves_alive.append((slave["ip"], slave["port"]))
  258. return slaves_alive
  259. async def discover_slaves(
  260. self, service_name: str
  261. ) -> Sequence[Tuple[EncodableT, EncodableT]]:
  262. """Returns a list of alive slaves for service ``service_name``"""
  263. for sentinel in self.sentinels:
  264. try:
  265. slaves = await sentinel.sentinel_slaves(service_name)
  266. except (ConnectionError, ResponseError, TimeoutError):
  267. continue
  268. slaves = self.filter_slaves(slaves)
  269. if slaves:
  270. return slaves
  271. return []
  272. def master_for(
  273. self,
  274. service_name: str,
  275. redis_class: Type[Redis] = Redis,
  276. connection_pool_class: Type[SentinelConnectionPool] = SentinelConnectionPool,
  277. **kwargs,
  278. ):
  279. """
  280. Returns a redis client instance for the ``service_name`` master.
  281. A :py:class:`~redis.sentinel.SentinelConnectionPool` class is
  282. used to retrieve the master's address before establishing a new
  283. connection.
  284. NOTE: If the master's address has changed, any cached connections to
  285. the old master are closed.
  286. By default clients will be a :py:class:`~redis.Redis` instance.
  287. Specify a different class to the ``redis_class`` argument if you
  288. desire something different.
  289. The ``connection_pool_class`` specifies the connection pool to
  290. use. The :py:class:`~redis.sentinel.SentinelConnectionPool`
  291. will be used by default.
  292. All other keyword arguments are merged with any connection_kwargs
  293. passed to this class and passed to the connection pool as keyword
  294. arguments to be used to initialize Redis connections.
  295. """
  296. kwargs["is_master"] = True
  297. connection_kwargs = dict(self.connection_kwargs)
  298. connection_kwargs.update(kwargs)
  299. connection_pool = connection_pool_class(service_name, self, **connection_kwargs)
  300. # The Redis object "owns" the pool
  301. return redis_class.from_pool(connection_pool)
  302. def slave_for(
  303. self,
  304. service_name: str,
  305. redis_class: Type[Redis] = Redis,
  306. connection_pool_class: Type[SentinelConnectionPool] = SentinelConnectionPool,
  307. **kwargs,
  308. ):
  309. """
  310. Returns redis client instance for the ``service_name`` slave(s).
  311. A SentinelConnectionPool class is used to retrieve the slave's
  312. address before establishing a new connection.
  313. By default clients will be a :py:class:`~redis.Redis` instance.
  314. Specify a different class to the ``redis_class`` argument if you
  315. desire something different.
  316. The ``connection_pool_class`` specifies the connection pool to use.
  317. The SentinelConnectionPool will be used by default.
  318. All other keyword arguments are merged with any connection_kwargs
  319. passed to this class and passed to the connection pool as keyword
  320. arguments to be used to initialize Redis connections.
  321. """
  322. kwargs["is_master"] = False
  323. connection_kwargs = dict(self.connection_kwargs)
  324. connection_kwargs.update(kwargs)
  325. connection_pool = connection_pool_class(service_name, self, **connection_kwargs)
  326. # The Redis object "owns" the pool
  327. return redis_class.from_pool(connection_pool)