图片解析应用
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

313 lines
12 KiB

  1. import asyncio
  2. import threading
  3. import uuid
  4. from types import SimpleNamespace
  5. from typing import TYPE_CHECKING, Awaitable, Optional, Union
  6. from redis.exceptions import LockError, LockNotOwnedError
  7. if TYPE_CHECKING:
  8. from redis.asyncio import Redis, RedisCluster
  9. class Lock:
  10. """
  11. A shared, distributed Lock. Using Redis for locking allows the Lock
  12. to be shared across processes and/or machines.
  13. It's left to the user to resolve deadlock issues and make sure
  14. multiple clients play nicely together.
  15. """
  16. lua_release = None
  17. lua_extend = None
  18. lua_reacquire = None
  19. # KEYS[1] - lock name
  20. # ARGV[1] - token
  21. # return 1 if the lock was released, otherwise 0
  22. LUA_RELEASE_SCRIPT = """
  23. local token = redis.call('get', KEYS[1])
  24. if not token or token ~= ARGV[1] then
  25. return 0
  26. end
  27. redis.call('del', KEYS[1])
  28. return 1
  29. """
  30. # KEYS[1] - lock name
  31. # ARGV[1] - token
  32. # ARGV[2] - additional milliseconds
  33. # ARGV[3] - "0" if the additional time should be added to the lock's
  34. # existing ttl or "1" if the existing ttl should be replaced
  35. # return 1 if the locks time was extended, otherwise 0
  36. LUA_EXTEND_SCRIPT = """
  37. local token = redis.call('get', KEYS[1])
  38. if not token or token ~= ARGV[1] then
  39. return 0
  40. end
  41. local expiration = redis.call('pttl', KEYS[1])
  42. if not expiration then
  43. expiration = 0
  44. end
  45. if expiration < 0 then
  46. return 0
  47. end
  48. local newttl = ARGV[2]
  49. if ARGV[3] == "0" then
  50. newttl = ARGV[2] + expiration
  51. end
  52. redis.call('pexpire', KEYS[1], newttl)
  53. return 1
  54. """
  55. # KEYS[1] - lock name
  56. # ARGV[1] - token
  57. # ARGV[2] - milliseconds
  58. # return 1 if the locks time was reacquired, otherwise 0
  59. LUA_REACQUIRE_SCRIPT = """
  60. local token = redis.call('get', KEYS[1])
  61. if not token or token ~= ARGV[1] then
  62. return 0
  63. end
  64. redis.call('pexpire', KEYS[1], ARGV[2])
  65. return 1
  66. """
  67. def __init__(
  68. self,
  69. redis: Union["Redis", "RedisCluster"],
  70. name: Union[str, bytes, memoryview],
  71. timeout: Optional[float] = None,
  72. sleep: float = 0.1,
  73. blocking: bool = True,
  74. blocking_timeout: Optional[float] = None,
  75. thread_local: bool = True,
  76. ):
  77. """
  78. Create a new Lock instance named ``name`` using the Redis client
  79. supplied by ``redis``.
  80. ``timeout`` indicates a maximum life for the lock in seconds.
  81. By default, it will remain locked until release() is called.
  82. ``timeout`` can be specified as a float or integer, both representing
  83. the number of seconds to wait.
  84. ``sleep`` indicates the amount of time to sleep in seconds per loop
  85. iteration when the lock is in blocking mode and another client is
  86. currently holding the lock.
  87. ``blocking`` indicates whether calling ``acquire`` should block until
  88. the lock has been acquired or to fail immediately, causing ``acquire``
  89. to return False and the lock not being acquired. Defaults to True.
  90. Note this value can be overridden by passing a ``blocking``
  91. argument to ``acquire``.
  92. ``blocking_timeout`` indicates the maximum amount of time in seconds to
  93. spend trying to acquire the lock. A value of ``None`` indicates
  94. continue trying forever. ``blocking_timeout`` can be specified as a
  95. float or integer, both representing the number of seconds to wait.
  96. ``thread_local`` indicates whether the lock token is placed in
  97. thread-local storage. By default, the token is placed in thread local
  98. storage so that a thread only sees its token, not a token set by
  99. another thread. Consider the following timeline:
  100. time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds.
  101. thread-1 sets the token to "abc"
  102. time: 1, thread-2 blocks trying to acquire `my-lock` using the
  103. Lock instance.
  104. time: 5, thread-1 has not yet completed. redis expires the lock
  105. key.
  106. time: 5, thread-2 acquired `my-lock` now that it's available.
  107. thread-2 sets the token to "xyz"
  108. time: 6, thread-1 finishes its work and calls release(). if the
  109. token is *not* stored in thread local storage, then
  110. thread-1 would see the token value as "xyz" and would be
  111. able to successfully release the thread-2's lock.
  112. In some use cases it's necessary to disable thread local storage. For
  113. example, if you have code where one thread acquires a lock and passes
  114. that lock instance to a worker thread to release later. If thread
  115. local storage isn't disabled in this case, the worker thread won't see
  116. the token set by the thread that acquired the lock. Our assumption
  117. is that these cases aren't common and as such default to using
  118. thread local storage.
  119. """
  120. self.redis = redis
  121. self.name = name
  122. self.timeout = timeout
  123. self.sleep = sleep
  124. self.blocking = blocking
  125. self.blocking_timeout = blocking_timeout
  126. self.thread_local = bool(thread_local)
  127. self.local = threading.local() if self.thread_local else SimpleNamespace()
  128. self.local.token = None
  129. self.register_scripts()
  130. def register_scripts(self):
  131. cls = self.__class__
  132. client = self.redis
  133. if cls.lua_release is None:
  134. cls.lua_release = client.register_script(cls.LUA_RELEASE_SCRIPT)
  135. if cls.lua_extend is None:
  136. cls.lua_extend = client.register_script(cls.LUA_EXTEND_SCRIPT)
  137. if cls.lua_reacquire is None:
  138. cls.lua_reacquire = client.register_script(cls.LUA_REACQUIRE_SCRIPT)
  139. async def __aenter__(self):
  140. if await self.acquire():
  141. return self
  142. raise LockError("Unable to acquire lock within the time specified")
  143. async def __aexit__(self, exc_type, exc_value, traceback):
  144. await self.release()
  145. async def acquire(
  146. self,
  147. blocking: Optional[bool] = None,
  148. blocking_timeout: Optional[float] = None,
  149. token: Optional[Union[str, bytes]] = None,
  150. ):
  151. """
  152. Use Redis to hold a shared, distributed lock named ``name``.
  153. Returns True once the lock is acquired.
  154. If ``blocking`` is False, always return immediately. If the lock
  155. was acquired, return True, otherwise return False.
  156. ``blocking_timeout`` specifies the maximum number of seconds to
  157. wait trying to acquire the lock.
  158. ``token`` specifies the token value to be used. If provided, token
  159. must be a bytes object or a string that can be encoded to a bytes
  160. object with the default encoding. If a token isn't specified, a UUID
  161. will be generated.
  162. """
  163. sleep = self.sleep
  164. if token is None:
  165. token = uuid.uuid1().hex.encode()
  166. else:
  167. try:
  168. encoder = self.redis.connection_pool.get_encoder()
  169. except AttributeError:
  170. # Cluster
  171. encoder = self.redis.get_encoder()
  172. token = encoder.encode(token)
  173. if blocking is None:
  174. blocking = self.blocking
  175. if blocking_timeout is None:
  176. blocking_timeout = self.blocking_timeout
  177. stop_trying_at = None
  178. if blocking_timeout is not None:
  179. stop_trying_at = asyncio.get_running_loop().time() + blocking_timeout
  180. while True:
  181. if await self.do_acquire(token):
  182. self.local.token = token
  183. return True
  184. if not blocking:
  185. return False
  186. next_try_at = asyncio.get_running_loop().time() + sleep
  187. if stop_trying_at is not None and next_try_at > stop_trying_at:
  188. return False
  189. await asyncio.sleep(sleep)
  190. async def do_acquire(self, token: Union[str, bytes]) -> bool:
  191. if self.timeout:
  192. # convert to milliseconds
  193. timeout = int(self.timeout * 1000)
  194. else:
  195. timeout = None
  196. if await self.redis.set(self.name, token, nx=True, px=timeout):
  197. return True
  198. return False
  199. async def locked(self) -> bool:
  200. """
  201. Returns True if this key is locked by any process, otherwise False.
  202. """
  203. return await self.redis.get(self.name) is not None
  204. async def owned(self) -> bool:
  205. """
  206. Returns True if this key is locked by this lock, otherwise False.
  207. """
  208. stored_token = await self.redis.get(self.name)
  209. # need to always compare bytes to bytes
  210. # TODO: this can be simplified when the context manager is finished
  211. if stored_token and not isinstance(stored_token, bytes):
  212. try:
  213. encoder = self.redis.connection_pool.get_encoder()
  214. except AttributeError:
  215. # Cluster
  216. encoder = self.redis.get_encoder()
  217. stored_token = encoder.encode(stored_token)
  218. return self.local.token is not None and stored_token == self.local.token
  219. def release(self) -> Awaitable[None]:
  220. """Releases the already acquired lock"""
  221. expected_token = self.local.token
  222. if expected_token is None:
  223. raise LockError("Cannot release an unlocked lock")
  224. self.local.token = None
  225. return self.do_release(expected_token)
  226. async def do_release(self, expected_token: bytes) -> None:
  227. if not bool(
  228. await self.lua_release(
  229. keys=[self.name], args=[expected_token], client=self.redis
  230. )
  231. ):
  232. raise LockNotOwnedError("Cannot release a lock that's no longer owned")
  233. def extend(
  234. self, additional_time: float, replace_ttl: bool = False
  235. ) -> Awaitable[bool]:
  236. """
  237. Adds more time to an already acquired lock.
  238. ``additional_time`` can be specified as an integer or a float, both
  239. representing the number of seconds to add.
  240. ``replace_ttl`` if False (the default), add `additional_time` to
  241. the lock's existing ttl. If True, replace the lock's ttl with
  242. `additional_time`.
  243. """
  244. if self.local.token is None:
  245. raise LockError("Cannot extend an unlocked lock")
  246. if self.timeout is None:
  247. raise LockError("Cannot extend a lock with no timeout")
  248. return self.do_extend(additional_time, replace_ttl)
  249. async def do_extend(self, additional_time, replace_ttl) -> bool:
  250. additional_time = int(additional_time * 1000)
  251. if not bool(
  252. await self.lua_extend(
  253. keys=[self.name],
  254. args=[self.local.token, additional_time, replace_ttl and "1" or "0"],
  255. client=self.redis,
  256. )
  257. ):
  258. raise LockNotOwnedError("Cannot extend a lock that's no longer owned")
  259. return True
  260. def reacquire(self) -> Awaitable[bool]:
  261. """
  262. Resets a TTL of an already acquired lock back to a timeout value.
  263. """
  264. if self.local.token is None:
  265. raise LockError("Cannot reacquire an unlocked lock")
  266. if self.timeout is None:
  267. raise LockError("Cannot reacquire a lock with no timeout")
  268. return self.do_reacquire()
  269. async def do_reacquire(self) -> bool:
  270. timeout = int(self.timeout * 1000)
  271. if not bool(
  272. await self.lua_reacquire(
  273. keys=[self.name], args=[self.local.token, timeout], client=self.redis
  274. )
  275. ):
  276. raise LockNotOwnedError("Cannot reacquire a lock that's no longer owned")
  277. return True