图片解析应用
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

929 lines
31 KiB

  1. import asyncio
  2. from typing import (
  3. TYPE_CHECKING,
  4. Any,
  5. AsyncIterator,
  6. Dict,
  7. Iterable,
  8. Iterator,
  9. List,
  10. Mapping,
  11. NoReturn,
  12. Optional,
  13. Union,
  14. )
  15. from redis.compat import Literal
  16. from redis.crc import key_slot
  17. from redis.exceptions import RedisClusterException, RedisError
  18. from redis.typing import (
  19. AnyKeyT,
  20. ClusterCommandsProtocol,
  21. EncodableT,
  22. KeysT,
  23. KeyT,
  24. PatternT,
  25. )
  26. from .core import (
  27. ACLCommands,
  28. AsyncACLCommands,
  29. AsyncDataAccessCommands,
  30. AsyncFunctionCommands,
  31. AsyncGearsCommands,
  32. AsyncManagementCommands,
  33. AsyncModuleCommands,
  34. AsyncScriptCommands,
  35. DataAccessCommands,
  36. FunctionCommands,
  37. GearsCommands,
  38. ManagementCommands,
  39. ModuleCommands,
  40. PubSubCommands,
  41. ResponseT,
  42. ScriptCommands,
  43. )
  44. from .helpers import list_or_args
  45. from .redismodules import AsyncRedisModuleCommands, RedisModuleCommands
  46. if TYPE_CHECKING:
  47. from redis.asyncio.cluster import TargetNodesT
  # Commands that only read data. The list is not exhaustive but covers the
  # major ones; see https://redis.io/commands
  READ_COMMANDS = frozenset(
      (
          # strings / bitmaps
          "BITCOUNT", "BITPOS", "GET", "GETBIT", "GETRANGE", "MGET", "STRLEN",
          # read-only scripting
          "EVAL_RO", "EVALSHA_RO",
          # generic keyspace
          "EXISTS", "KEYS", "PTTL", "RANDOMKEY", "TTL",
          # geo
          "GEODIST", "GEOHASH", "GEOPOS", "GEORADIUS", "GEORADIUSBYMEMBER",
          # hashes
          "HEXISTS", "HGET", "HGETALL", "HKEYS", "HLEN", "HMGET", "HSTRLEN",
          "HVALS",
          # lists
          "LINDEX", "LLEN", "LRANGE",
          # sets
          "SCARD", "SDIFF", "SINTER", "SISMEMBER", "SMEMBERS", "SRANDMEMBER",
          "SUNION",
          # sorted sets
          "ZCARD", "ZCOUNT", "ZRANGE", "ZSCORE",
      )
  )
  class ClusterMultiKeyCommands(ClusterCommandsProtocol):
      """
      A class containing commands that handle more than one key.

      Keys are grouped by hash slot and the command is issued once per slot
      through a pipeline, so an operation that spans several slots is not
      atomic.
      """

      def _partition_keys_by_slot(self, keys: Iterable[KeyT]) -> Dict[int, List[KeyT]]:
          """Split keys into a dictionary that maps a slot to a list of keys."""
          slots_to_keys: Dict[int, List[KeyT]] = {}
          for key in keys:
              # The slot is computed from the encoded form of the key
              slot = key_slot(self.encoder.encode(key))
              slots_to_keys.setdefault(slot, []).append(key)
          return slots_to_keys

      def _partition_pairs_by_slot(
          self, mapping: Mapping[AnyKeyT, EncodableT]
      ) -> Dict[int, List[EncodableT]]:
          """
          Split pairs into a dictionary that maps a slot to a flat
          ``[key, value, key, value, ...]`` list.
          """
          slots_to_pairs: Dict[int, List[EncodableT]] = {}
          for pair in mapping.items():
              slot = key_slot(self.encoder.encode(pair[0]))
              # extend() flattens the (key, value) tuple into the argument list
              slots_to_pairs.setdefault(slot, []).extend(pair)
          return slots_to_pairs

      def _execute_pipeline_by_slot(
          self, command: str, slots_to_args: Mapping[int, Iterable[EncodableT]]
      ) -> List[Any]:
          """
          Issue ``command`` once per slot through a pipeline and execute it.

          Each call is targeted at the node returned by
          ``nodes_manager.get_node_from_slot`` for its slot. Reading from
          replicas is requested only when ``self.read_from_replicas`` is set
          and ``command`` is listed in ``READ_COMMANDS``.

          Returns whatever ``pipe.execute()`` returns.
          """
          read_from_replicas = self.read_from_replicas and command in READ_COMMANDS
          pipe = self.pipeline()
          # A plain loop: the calls are made for their effect on the pipeline,
          # their return values are not needed.
          for slot, slot_args in slots_to_args.items():
              pipe.execute_command(
                  command,
                  *slot_args,
                  target_nodes=[
                      self.nodes_manager.get_node_from_slot(slot, read_from_replicas)
                  ],
              )
          return pipe.execute()

      def _reorder_keys_by_command(
          self,
          keys: Iterable[KeyT],
          slots_to_args: Mapping[int, Iterable[EncodableT]],
          responses: Iterable[Any],
      ) -> List[Any]:
          """
          Map per-slot responses back to the order of ``keys``.

          ``responses`` is zipped with ``slots_to_args.values()``, and each
          response is zipped with the arguments sent for that slot, giving a
          key -> value lookup that is then read in the caller's key order.
          """
          results = {
              k: v
              for slot_values, response in zip(slots_to_args.values(), responses)
              for k, v in zip(slot_values, response)
          }
          return [results[key] for key in keys]

      def mget_nonatomic(self, keys: KeysT, *args: KeyT) -> List[Optional[Any]]:
          """
          Splits the keys into different slots and then calls MGET
          for the keys of every slot. This operation will not be atomic
          if keys belong to more than one slot.

          Returns a list of values ordered identically to ``keys``

          For more information see https://redis.io/commands/mget
          """
          # Concatenate all keys into a list
          keys = list_or_args(keys, args)

          # Split keys into slots
          slots_to_keys = self._partition_keys_by_slot(keys)

          # Execute commands using a pipeline
          res = self._execute_pipeline_by_slot("MGET", slots_to_keys)

          # Reorder keys in the order the user provided & return
          return self._reorder_keys_by_command(keys, slots_to_keys, res)

      def mset_nonatomic(self, mapping: Mapping[AnyKeyT, EncodableT]) -> List[bool]:
          """
          Sets key/values based on a mapping. Mapping is a dictionary of
          key/value pairs. Both keys and values should be strings or types that
          can be cast to a string via str().

          Splits the keys into different slots and then calls MSET
          for the keys of every slot. This operation will not be atomic
          if keys belong to more than one slot.

          For more information see https://redis.io/commands/mset
          """
          # Partition the keys by slot
          slots_to_pairs = self._partition_pairs_by_slot(mapping)

          # Execute commands using a pipeline & return list of replies
          return self._execute_pipeline_by_slot("MSET", slots_to_pairs)

      def _split_command_across_slots(self, command: str, *keys: KeyT) -> int:
          """
          Runs the given command once for the keys
          of each slot. Returns the sum of the return values.
          """
          # Partition the keys by slot
          slots_to_keys = self._partition_keys_by_slot(keys)

          # Sum up the reply from each command
          return sum(self._execute_pipeline_by_slot(command, slots_to_keys))

      def exists(self, *keys: KeyT) -> ResponseT:
          """
          Returns the number of ``keys`` that exist in the
          whole cluster. The keys are first split up into slots
          and then an EXISTS command is sent for every slot

          For more information see https://redis.io/commands/exists
          """
          return self._split_command_across_slots("EXISTS", *keys)

      def delete(self, *keys: KeyT) -> ResponseT:
          """
          Deletes the given keys in the cluster.
          The keys are first split up into slots
          and then a DEL command is sent for every slot

          Non-existent keys are ignored.
          Returns the number of keys that were deleted.

          For more information see https://redis.io/commands/del
          """
          return self._split_command_across_slots("DEL", *keys)

      def touch(self, *keys: KeyT) -> ResponseT:
          """
          Updates the last access time of given keys across the
          cluster.

          The keys are first split up into slots
          and then a TOUCH command is sent for every slot

          Non-existent keys are ignored.
          Returns the number of keys that were touched.

          For more information see https://redis.io/commands/touch
          """
          return self._split_command_across_slots("TOUCH", *keys)

      def unlink(self, *keys: KeyT) -> ResponseT:
          """
          Remove the specified keys in a different thread.

          The keys are first split up into slots
          and then an UNLINK command is sent for every slot

          Non-existent keys are ignored.
          Returns the number of keys that were unlinked.

          For more information see https://redis.io/commands/unlink
          """
          return self._split_command_across_slots("UNLINK", *keys)
  class AsyncClusterMultiKeyCommands(ClusterMultiKeyCommands):
      """
      Asynchronous variant of the multi-key commands: the pipeline-backed
      operations are coroutines, the partitioning helpers are inherited.
      """

      async def mget_nonatomic(self, keys: KeysT, *args: KeyT) -> List[Optional[Any]]:
          """
          Run MGET once per hash slot and return the values in the order the
          keys were given. Not atomic when the keys span more than one slot.

          For more information see https://redis.io/commands/mget
          """
          all_keys = list_or_args(keys, args)
          by_slot = self._partition_keys_by_slot(all_keys)
          replies = await self._execute_pipeline_by_slot("MGET", by_slot)
          # Put the replies back into the caller's key order
          return self._reorder_keys_by_command(all_keys, by_slot, replies)

      async def mset_nonatomic(self, mapping: Mapping[AnyKeyT, EncodableT]) -> List[bool]:
          """
          Set the key/value pairs of ``mapping``, running MSET once per hash
          slot. Not atomic when the keys span more than one slot.

          Returns the list of replies produced by the pipeline.

          For more information see https://redis.io/commands/mset
          """
          pairs_by_slot = self._partition_pairs_by_slot(mapping)
          return await self._execute_pipeline_by_slot("MSET", pairs_by_slot)

      async def _split_command_across_slots(self, command: str, *keys: KeyT) -> int:
          """
          Run ``command`` once for the keys of each slot and return the sum
          of the replies.
          """
          keys_by_slot = self._partition_keys_by_slot(keys)
          replies = await self._execute_pipeline_by_slot(command, keys_by_slot)
          return sum(replies)

      async def _execute_pipeline_by_slot(
          self, command: str, slots_to_args: Mapping[int, Iterable[EncodableT]]
      ) -> List[Any]:
          """
          Issue ``command`` once per slot through a pipeline and await its
          execution.

          ``self.initialize()`` is awaited first when ``self._initialize`` is
          truthy. Reading from replicas is requested only when
          ``self.read_from_replicas`` is set and ``command`` is listed in
          ``READ_COMMANDS``.
          """
          if self._initialize:
              await self.initialize()

          use_replicas = self.read_from_replicas and command in READ_COMMANDS
          pipe = self.pipeline()
          for slot, slot_args in slots_to_args.items():
              node = self.nodes_manager.get_node_from_slot(slot, use_replicas)
              # Called for its effect on the pipeline; not awaited here
              pipe.execute_command(command, *slot_args, target_nodes=[node])

          return await pipe.execute()
  class ClusterManagementCommands(ManagementCommands):
      """
      A class for Redis Cluster management commands

      The class inherits from Redis's core ManagementCommands class and does
      the required adjustments to work with cluster mode
      """

      def slaveof(self, *args, **kwargs) -> NoReturn:
          """
          Make the server a replica of another instance, or promote it as master.

          Always raises RedisClusterException: not supported in cluster mode.

          For more information see https://redis.io/commands/slaveof
          """
          raise RedisClusterException("SLAVEOF is not supported in cluster mode")

      def replicaof(self, *args, **kwargs) -> NoReturn:
          """
          Make the server a replica of another instance, or promote it as master.

          Always raises RedisClusterException: not supported in cluster mode.

          For more information see https://redis.io/commands/replicaof
          """
          raise RedisClusterException("REPLICAOF is not supported in cluster mode")

      def swapdb(self, *args, **kwargs) -> NoReturn:
          """
          Swaps two Redis databases.

          Always raises RedisClusterException: not supported in cluster mode.

          For more information see https://redis.io/commands/swapdb
          """
          raise RedisClusterException("SWAPDB is not supported in cluster mode")

      def cluster_myid(self, target_node: "TargetNodesT") -> ResponseT:
          """
          Returns the node's id.

          :target_node: 'ClusterNode'
              The node to execute the command on

          For more information check https://redis.io/commands/cluster-myid/
          """
          return self.execute_command("CLUSTER MYID", target_nodes=target_node)

      def cluster_addslots(
          self, target_node: "TargetNodesT", *slots: EncodableT
      ) -> ResponseT:
          """
          Assign new hash slots to receiving node. Sends to specified node.

          :target_node: 'ClusterNode'
              The node to execute the command on

          For more information see https://redis.io/commands/cluster-addslots
          """
          return self.execute_command(
              "CLUSTER ADDSLOTS", *slots, target_nodes=target_node
          )

      def cluster_addslotsrange(
          self, target_node: "TargetNodesT", *slots: EncodableT
      ) -> ResponseT:
          """
          Similar to the CLUSTER ADDSLOTS command.
          The difference between the two commands is that ADDSLOTS takes a list of slots
          to assign to the node, while ADDSLOTSRANGE takes a list of slot ranges
          (specified by start and end slots) to assign to the node.

          :target_node: 'ClusterNode'
              The node to execute the command on

          For more information see https://redis.io/commands/cluster-addslotsrange
          """
          return self.execute_command(
              "CLUSTER ADDSLOTSRANGE", *slots, target_nodes=target_node
          )

      def cluster_countkeysinslot(self, slot_id: int) -> ResponseT:
          """
          Return the number of local keys in the specified hash slot
          Send to node based on specified slot_id

          For more information see https://redis.io/commands/cluster-countkeysinslot
          """
          return self.execute_command("CLUSTER COUNTKEYSINSLOT", slot_id)

      def cluster_count_failure_report(self, node_id: str) -> ResponseT:
          """
          Return the number of failure reports active for a given node
          Sends to a random node

          For more information see https://redis.io/commands/cluster-count-failure-reports
          """
          return self.execute_command("CLUSTER COUNT-FAILURE-REPORTS", node_id)

      def cluster_delslots(self, *slots: EncodableT) -> List[bool]:
          """
          Set hash slots as unbound in the cluster.
          It determines by itself what node the slot is in and sends it there

          One CLUSTER DELSLOTS command is executed per slot.
          Returns a list of the results for each processed slot.

          For more information see https://redis.io/commands/cluster-delslots
          """
          return [self.execute_command("CLUSTER DELSLOTS", slot) for slot in slots]

      def cluster_delslotsrange(self, *slots: EncodableT) -> ResponseT:
          """
          Similar to the CLUSTER DELSLOTS command.
          The difference is that CLUSTER DELSLOTS takes a list of hash slots to remove
          from the node, while CLUSTER DELSLOTSRANGE takes a list of slot ranges to remove
          from the node.

          For more information see https://redis.io/commands/cluster-delslotsrange
          """
          return self.execute_command("CLUSTER DELSLOTSRANGE", *slots)

      def cluster_failover(
          self, target_node: "TargetNodesT", option: Optional[str] = None
      ) -> ResponseT:
          """
          Forces a slave to perform a manual failover of its master
          Sends to specified node

          :target_node: 'ClusterNode'
              The node to execute the command on
          :option: optional, "FORCE" or "TAKEOVER" (checked case-insensitively,
              sent as given). A falsy value sends the command without option.

          Raises RedisError for any other option.

          For more information see https://redis.io/commands/cluster-failover
          """
          if not option:
              return self.execute_command("CLUSTER FAILOVER", target_nodes=target_node)

          if option.upper() not in ["FORCE", "TAKEOVER"]:
              raise RedisError(
                  f"Invalid option for CLUSTER FAILOVER command: {option}"
              )

          return self.execute_command(
              "CLUSTER FAILOVER", option, target_nodes=target_node
          )

      def cluster_info(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT:
          """
          Provides info about Redis Cluster node state.
          The command will be sent to a random node in the cluster if no target
          node is specified.

          For more information see https://redis.io/commands/cluster-info
          """
          return self.execute_command("CLUSTER INFO", target_nodes=target_nodes)

      def cluster_keyslot(self, key: str) -> ResponseT:
          """
          Returns the hash slot of the specified key
          Sends to random node in the cluster

          For more information see https://redis.io/commands/cluster-keyslot
          """
          return self.execute_command("CLUSTER KEYSLOT", key)

      def cluster_meet(
          self, host: str, port: int, target_nodes: Optional["TargetNodesT"] = None
      ) -> ResponseT:
          """
          Force a node cluster to handshake with another node.
          Sends to specified node.

          For more information see https://redis.io/commands/cluster-meet
          """
          return self.execute_command(
              "CLUSTER MEET", host, port, target_nodes=target_nodes
          )

      def cluster_nodes(self) -> ResponseT:
          """
          Get Cluster config for the node.
          Sends to random node in the cluster

          For more information see https://redis.io/commands/cluster-nodes
          """
          return self.execute_command("CLUSTER NODES")

      def cluster_replicate(
          self, target_nodes: "TargetNodesT", node_id: str
      ) -> ResponseT:
          """
          Reconfigure a node as a slave of the specified master node

          For more information see https://redis.io/commands/cluster-replicate
          """
          return self.execute_command(
              "CLUSTER REPLICATE", node_id, target_nodes=target_nodes
          )

      def cluster_reset(
          self, soft: bool = True, target_nodes: Optional["TargetNodesT"] = None
      ) -> ResponseT:
          """
          Reset a Redis Cluster node

          If 'soft' is True then it will send 'SOFT' argument
          If 'soft' is False then it will send 'HARD' argument

          For more information see https://redis.io/commands/cluster-reset
          """
          return self.execute_command(
              "CLUSTER RESET", b"SOFT" if soft else b"HARD", target_nodes=target_nodes
          )

      def cluster_save_config(
          self, target_nodes: Optional["TargetNodesT"] = None
      ) -> ResponseT:
          """
          Forces the node to save cluster state on disk

          For more information see https://redis.io/commands/cluster-saveconfig
          """
          return self.execute_command("CLUSTER SAVECONFIG", target_nodes=target_nodes)

      def cluster_get_keys_in_slot(self, slot: int, num_keys: int) -> ResponseT:
          """
          Returns up to ``num_keys`` key names stored in the specified
          cluster slot

          For more information see https://redis.io/commands/cluster-getkeysinslot
          """
          return self.execute_command("CLUSTER GETKEYSINSLOT", slot, num_keys)

      def cluster_set_config_epoch(
          self, epoch: int, target_nodes: Optional["TargetNodesT"] = None
      ) -> ResponseT:
          """
          Set the configuration epoch in a new node

          For more information see https://redis.io/commands/cluster-set-config-epoch
          """
          return self.execute_command(
              "CLUSTER SET-CONFIG-EPOCH", epoch, target_nodes=target_nodes
          )

      def cluster_setslot(
          self, target_node: "TargetNodesT", node_id: str, slot_id: int, state: str
      ) -> ResponseT:
          """
          Bind a hash slot to a specific node

          :target_node: 'ClusterNode'
              The node to execute the command on
          :state: "IMPORTING", "NODE" or "MIGRATING" (checked
              case-insensitively, sent as given)

          Raises RedisError for "STABLE" (use cluster_setslot_stable instead)
          and for any other state.

          For more information see https://redis.io/commands/cluster-setslot
          """
          normalized_state = state.upper()
          if normalized_state in ("IMPORTING", "NODE", "MIGRATING"):
              return self.execute_command(
                  "CLUSTER SETSLOT", slot_id, state, node_id, target_nodes=target_node
              )
          if normalized_state == "STABLE":
              raise RedisError('For "stable" state please use cluster_setslot_stable')
          raise RedisError(f"Invalid slot state: {state}")

      def cluster_setslot_stable(self, slot_id: int) -> ResponseT:
          """
          Clears migrating / importing state from the slot.
          It determines by itself what node the slot is in and sends it there.

          For more information see https://redis.io/commands/cluster-setslot
          """
          return self.execute_command("CLUSTER SETSLOT", slot_id, "STABLE")

      def cluster_replicas(
          self, node_id: str, target_nodes: Optional["TargetNodesT"] = None
      ) -> ResponseT:
          """
          Provides a list of replica nodes replicating from the specified primary
          target node.

          For more information see https://redis.io/commands/cluster-replicas
          """
          return self.execute_command(
              "CLUSTER REPLICAS", node_id, target_nodes=target_nodes
          )

      def cluster_slots(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT:
          """
          Get array of Cluster slot to node mappings

          For more information see https://redis.io/commands/cluster-slots
          """
          return self.execute_command("CLUSTER SLOTS", target_nodes=target_nodes)

      def cluster_shards(
          self, target_nodes: Optional["TargetNodesT"] = None
      ) -> ResponseT:
          """
          Returns details about the shards of the cluster.

          For more information see https://redis.io/commands/cluster-shards
          """
          return self.execute_command("CLUSTER SHARDS", target_nodes=target_nodes)

      def cluster_myshardid(
          self, target_nodes: Optional["TargetNodesT"] = None
      ) -> ResponseT:
          """
          Returns the shard ID of the node.

          For more information see https://redis.io/commands/cluster-myshardid/
          """
          return self.execute_command("CLUSTER MYSHARDID", target_nodes=target_nodes)

      def cluster_links(self, target_node: "TargetNodesT") -> ResponseT:
          """
          Each node in a Redis Cluster maintains a pair of long-lived TCP links with each
          peer in the cluster: One for sending outbound messages towards the peer and one
          for receiving inbound messages from the peer.

          This command outputs information of all such peer links as an array.

          For more information see https://redis.io/commands/cluster-links
          """
          return self.execute_command("CLUSTER LINKS", target_nodes=target_node)

      def cluster_flushslots(
          self, target_nodes: Optional["TargetNodesT"] = None
      ) -> NoReturn:
          """
          Not available: always raises NotImplementedError.
          """
          raise NotImplementedError(
              "CLUSTER FLUSHSLOTS is intentionally not implemented in the client."
          )

      def cluster_bumpepoch(
          self, target_nodes: Optional["TargetNodesT"] = None
      ) -> NoReturn:
          """
          Not available: always raises NotImplementedError.
          """
          raise NotImplementedError(
              "CLUSTER BUMPEPOCH is intentionally not implemented in the client."
          )

      def readonly(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT:
          """
          Enables read queries.
          The command will be sent to the default cluster node if target_nodes is
          not specified.

          For more information see https://redis.io/commands/readonly
          """
          if target_nodes == "replicas" or target_nodes == "all":
              # read_from_replicas will only be enabled if the READONLY command
              # is sent to all replicas
              self.read_from_replicas = True
          return self.execute_command("READONLY", target_nodes=target_nodes)

      def readwrite(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT:
          """
          Disables read queries.
          The command will be sent to the default cluster node if target_nodes is
          not specified.

          For more information see https://redis.io/commands/readwrite
          """
          # Reset read from replicas flag
          self.read_from_replicas = False
          return self.execute_command("READWRITE", target_nodes=target_nodes)

      def gears_refresh_cluster(self, **kwargs) -> ResponseT:
          """
          On an OSS cluster, before executing any gears function, you must call this command. # noqa
          """
          return self.execute_command("REDISGEARS_2.REFRESHCLUSTER", **kwargs)
  class AsyncClusterManagementCommands(
      ClusterManagementCommands, AsyncManagementCommands
  ):
      """
      Asynchronous Redis Cluster management commands.

      Combines ClusterManagementCommands with AsyncManagementCommands and
      overrides the commands that need a coroutine implementation.
      """

      async def cluster_delslots(self, *slots: EncodableT) -> List[bool]:
          """
          Set hash slots as unbound in the cluster.

          One CLUSTER DELSLOTS command is scheduled as a task per slot and
          all of them are awaited together. Returns the gathered results,
          one per slot, in the order of ``slots``.

          For more information see https://redis.io/commands/cluster-delslots
          """
          pending = [
              asyncio.create_task(self.execute_command("CLUSTER DELSLOTS", slot))
              for slot in slots
          ]
          return await asyncio.gather(*pending)
  class ClusterDataAccessCommands(DataAccessCommands):
      """
      Redis Cluster data access commands.

      Extends the core DataAccessCommands class with the adjustments needed
      in cluster mode.
      """

      def stralgo(
          self,
          algo: Literal["LCS"],
          value1: KeyT,
          value2: KeyT,
          specific_argument: Union[Literal["strings"], Literal["keys"]] = "strings",
          len: bool = False,
          idx: bool = False,
          minmatchlen: Optional[int] = None,
          withmatchlen: bool = False,
          **kwargs,
      ) -> ResponseT:
          """
          Run a string algorithm; delegates to the parent ``stralgo`` after
          settling which node(s) to target.

          ``algo`` Right now must be LCS
          ``value1`` and ``value2`` Can be two strings or two keys
          ``specific_argument`` Specifying if the arguments to the algorithm
          will be keys or strings. strings is the default.
          ``len`` Returns just the len of the match.
          ``idx`` Returns the match positions in each string.
          ``minmatchlen`` Restrict the list of matches to the ones of a given
          minimal length. Can be provided only when ``idx`` set to True.
          ``withmatchlen`` Returns the matches with the len of the match.
          Can be provided only when ``idx`` set to True.

          When ``specific_argument`` is "strings" and no ``target_nodes`` was
          passed, the command is targeted at "default-node".

          For more information see https://redis.io/commands/stralgo
          """
          chosen_nodes = kwargs.pop("target_nodes", None)
          if chosen_nodes is None and specific_argument == "strings":
              chosen_nodes = "default-node"
          # Always forwarded, even when it is still None
          kwargs["target_nodes"] = chosen_nodes
          return super().stralgo(
              algo,
              value1,
              value2,
              specific_argument,
              len,
              idx,
              minmatchlen,
              withmatchlen,
              **kwargs,
          )

      def scan_iter(
          self,
          match: Optional[PatternT] = None,
          count: Optional[int] = None,
          _type: Optional[str] = None,
          **kwargs,
      ) -> Iterator:
          """
          Iterate over the results of SCAN, following each node's cursor
          until it reaches 0.

          The first ``scan`` call is made with the caller's kwargs and returns
          a name -> cursor mapping; every later call targets one named node.
          """
          # First pass: cursor defaults apply, kwargs are passed through as is
          cursors, data = self.scan(match=match, count=count, _type=_type, **kwargs)
          yield from data

          pending = {name: cursor for name, cursor in cursors.items() if cursor != 0}
          if not pending:
              return

          # Resolve node objects once, by name
          nodes = {name: self.get_node(node_name=name) for name in pending}

          # From here on every call is targeted explicitly
          kwargs.pop("target_nodes", None)
          while pending:
              unfinished = {}
              for name, cursor in pending.items():
                  cur, data = self.scan(
                      cursor=cursor,
                      match=match,
                      count=count,
                      _type=_type,
                      target_nodes=nodes[name],
                      **kwargs,
                  )
                  yield from data
                  next_cursor = cur[name]
                  # A cursor of 0 means this node is exhausted
                  if next_cursor != 0:
                      unfinished[name] = next_cursor
              pending = unfinished
  class AsyncClusterDataAccessCommands(
      ClusterDataAccessCommands, AsyncDataAccessCommands
  ):
      """
      Asynchronous Redis Cluster data access commands.

      Combines ClusterDataAccessCommands with AsyncDataAccessCommands and
      overrides the commands that need a coroutine implementation.
      """

      async def scan_iter(
          self,
          match: Optional[PatternT] = None,
          count: Optional[int] = None,
          _type: Optional[str] = None,
          **kwargs,
      ) -> AsyncIterator:
          """
          Asynchronously iterate over the results of SCAN, following each
          node's cursor until it reaches 0.

          The first ``scan`` call is made with the caller's kwargs and returns
          a name -> cursor mapping; every later call targets one named node.
          """
          # First pass: cursor defaults apply, kwargs are passed through as is
          cursors, data = await self.scan(match=match, count=count, _type=_type, **kwargs)
          for value in data:
              yield value

          pending = {name: cursor for name, cursor in cursors.items() if cursor != 0}
          if not pending:
              return

          # Resolve node objects once, by name
          nodes = {name: self.get_node(node_name=name) for name in pending}

          # From here on every call is targeted explicitly
          kwargs.pop("target_nodes", None)
          while pending:
              unfinished = {}
              for name, cursor in pending.items():
                  cur, data = await self.scan(
                      cursor=cursor,
                      match=match,
                      count=count,
                      _type=_type,
                      target_nodes=nodes[name],
                      **kwargs,
                  )
                  for value in data:
                      yield value
                  next_cursor = cur[name]
                  # A cursor of 0 means this node is exhausted
                  if next_cursor != 0:
                      unfinished[name] = next_cursor
              pending = unfinished
  class RedisClusterCommands(
      ClusterMultiKeyCommands,
      ClusterManagementCommands,
      ACLCommands,
      PubSubCommands,
      ClusterDataAccessCommands,
      ScriptCommands,
      FunctionCommands,
      GearsCommands,
      ModuleCommands,
      RedisModuleCommands,
  ):
      """
      A class for all Redis Cluster commands

      For key-based commands, the target node(s) will be internally determined
      by the keys' hash slot.
      Non-key-based commands can be executed with the 'target_nodes' argument to
      target specific nodes. By default, if target_nodes is not specified, the
      command will be executed on the default cluster node.

      :param target_nodes: can be one of the following:
          - nodes flag: ALL_NODES, PRIMARIES, REPLICAS, RANDOM
          - 'ClusterNode'
          - 'list(ClusterNodes)'
          - 'dict(any:clusterNodes)'

      for example:
          r.cluster_info(target_nodes=RedisCluster.ALL_NODES)
      """
  class AsyncRedisClusterCommands(
      AsyncClusterMultiKeyCommands,
      AsyncClusterManagementCommands,
      AsyncACLCommands,
      AsyncClusterDataAccessCommands,
      AsyncScriptCommands,
      AsyncFunctionCommands,
      AsyncGearsCommands,
      AsyncModuleCommands,
      AsyncRedisModuleCommands,
  ):
      """
      A class for all Redis Cluster commands (asynchronous variants)

      For key-based commands, the target node(s) will be internally determined
      by the keys' hash slot.
      Non-key-based commands can be executed with the 'target_nodes' argument to
      target specific nodes. By default, if target_nodes is not specified, the
      command will be executed on the default cluster node.

      :param target_nodes: can be one of the following:
          - nodes flag: ALL_NODES, PRIMARIES, REPLICAS, RANDOM
          - 'ClusterNode'
          - 'list(ClusterNodes)'
          - 'dict(any:clusterNodes)'

      for example:
          r.cluster_info(target_nodes=RedisCluster.ALL_NODES)
      """