Image parsing application

from typing import Dict, List, Optional, Tuple, Union

from redis.exceptions import DataError
from redis.typing import KeyT, Number

ADD_CMD = "TS.ADD"
ALTER_CMD = "TS.ALTER"
CREATERULE_CMD = "TS.CREATERULE"
CREATE_CMD = "TS.CREATE"
DECRBY_CMD = "TS.DECRBY"
DELETERULE_CMD = "TS.DELETERULE"
DEL_CMD = "TS.DEL"
GET_CMD = "TS.GET"
INCRBY_CMD = "TS.INCRBY"
INFO_CMD = "TS.INFO"
MADD_CMD = "TS.MADD"
MGET_CMD = "TS.MGET"
MRANGE_CMD = "TS.MRANGE"
MREVRANGE_CMD = "TS.MREVRANGE"
QUERYINDEX_CMD = "TS.QUERYINDEX"
RANGE_CMD = "TS.RANGE"
REVRANGE_CMD = "TS.REVRANGE"


class TimeSeriesCommands:
    """RedisTimeSeries Commands."""

    def create(
        self,
        key: KeyT,
        retention_msecs: Optional[int] = None,
        uncompressed: Optional[bool] = False,
        labels: Optional[Dict[str, str]] = None,
        chunk_size: Optional[int] = None,
        duplicate_policy: Optional[str] = None,
        ignore_max_time_diff: Optional[int] = None,
        ignore_max_val_diff: Optional[Number] = None,
    ):
        """
        Create a new time-series.

        For more information see https://redis.io/commands/ts.create/

        Args:
            key:
                The time-series key.
            retention_msecs:
                Maximum age for samples, compared to the highest reported timestamp in
                milliseconds. If `None` or `0` is passed, the series is not trimmed at
                all.
            uncompressed:
                Changes data storage from compressed (default) to uncompressed.
            labels:
                A dictionary of label-value pairs that represent metadata labels of the
                key.
            chunk_size:
                Memory size, in bytes, allocated for each data chunk. Must be a multiple
                of 8 in the range `[48..1048576]`. In earlier versions of the module the
                minimum value was different.
            duplicate_policy:
                Policy for handling multiple samples with identical timestamps. Can be
                one of:

                - 'block': An error will occur and the new value will be ignored.
                - 'first': Ignore the new value.
                - 'last': Override with the latest value.
                - 'min': Only override if the value is lower than the existing
                  value.
                - 'max': Only override if the value is higher than the existing
                  value.
                - 'sum': If a previous sample exists, add the new sample to it so
                  that the updated value is equal to (previous + new). If no
                  previous sample exists, set the updated value equal to the new
                  value.
            ignore_max_time_diff:
                A non-negative integer value, in milliseconds, that sets an ignore
                threshold for added timestamps. If the difference between the last
                timestamp and the new timestamp is lower than this threshold, the new
                entry is ignored. Only applicable if `duplicate_policy` is set to
                `last`, and if `ignore_max_val_diff` is also set. Available since
                RedisTimeSeries version 1.12.0.
            ignore_max_val_diff:
                A non-negative floating point value, that sets an ignore threshold for
                added values. If the difference between the last value and the new value
                is lower than this threshold, the new entry is ignored. Only applicable
                if `duplicate_policy` is set to `last`, and if `ignore_max_time_diff` is
                also set. Available since RedisTimeSeries version 1.12.0.
        """
        params = [key]
        self._append_retention(params, retention_msecs)
        self._append_uncompressed(params, uncompressed)
        self._append_chunk_size(params, chunk_size)
        self._append_duplicate_policy(params, duplicate_policy)
        self._append_labels(params, labels)
        self._append_insertion_filters(
            params, ignore_max_time_diff, ignore_max_val_diff
        )
        return self.execute_command(CREATE_CMD, *params)
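
    # Usage sketch (illustrative, not part of the module): assumes a Redis
    # server with the RedisTimeSeries module loaded and a client obtained via
    # `ts = redis.Redis().ts()`; the key and label names are hypothetical.
    #
    #     ts.create(
    #         "temperature:3:11",
    #         retention_msecs=60000,
    #         labels={"sensor_id": "3", "area_id": "11"},
    #         duplicate_policy="last",
    #     )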

    def alter(
        self,
        key: KeyT,
        retention_msecs: Optional[int] = None,
        labels: Optional[Dict[str, str]] = None,
        chunk_size: Optional[int] = None,
        duplicate_policy: Optional[str] = None,
        ignore_max_time_diff: Optional[int] = None,
        ignore_max_val_diff: Optional[Number] = None,
    ):
        """
        Update an existing time series.

        For more information see https://redis.io/commands/ts.alter/

        Args:
            key:
                The time-series key.
            retention_msecs:
                Maximum age for samples, compared to the highest reported timestamp in
                milliseconds. If `None` or `0` is passed, the series is not trimmed at
                all.
            labels:
                A dictionary of label-value pairs that represent metadata labels of the
                key.
            chunk_size:
                Memory size, in bytes, allocated for each data chunk. Must be a multiple
                of 8 in the range `[48..1048576]`. In earlier versions of the module the
                minimum value was different. Changing this value does not affect
                existing chunks.
            duplicate_policy:
                Policy for handling multiple samples with identical timestamps. Can be
                one of:

                - 'block': An error will occur and the new value will be ignored.
                - 'first': Ignore the new value.
                - 'last': Override with the latest value.
                - 'min': Only override if the value is lower than the existing
                  value.
                - 'max': Only override if the value is higher than the existing
                  value.
                - 'sum': If a previous sample exists, add the new sample to it so
                  that the updated value is equal to (previous + new). If no
                  previous sample exists, set the updated value equal to the new
                  value.
            ignore_max_time_diff:
                A non-negative integer value, in milliseconds, that sets an ignore
                threshold for added timestamps. If the difference between the last
                timestamp and the new timestamp is lower than this threshold, the new
                entry is ignored. Only applicable if `duplicate_policy` is set to
                `last`, and if `ignore_max_val_diff` is also set. Available since
                RedisTimeSeries version 1.12.0.
            ignore_max_val_diff:
                A non-negative floating point value, that sets an ignore threshold for
                added values. If the difference between the last value and the new value
                is lower than this threshold, the new entry is ignored. Only applicable
                if `duplicate_policy` is set to `last`, and if `ignore_max_time_diff` is
                also set. Available since RedisTimeSeries version 1.12.0.
        """
        params = [key]
        self._append_retention(params, retention_msecs)
        self._append_chunk_size(params, chunk_size)
        self._append_duplicate_policy(params, duplicate_policy)
        self._append_labels(params, labels)
        self._append_insertion_filters(
            params, ignore_max_time_diff, ignore_max_val_diff
        )
        return self.execute_command(ALTER_CMD, *params)
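
    # Usage sketch (hypothetical key/values): tighten retention and change the
    # duplicate policy of an existing series, assuming `ts = redis.Redis().ts()`.
    #
    #     ts.alter("temperature:3:11", retention_msecs=3600000,
    #              duplicate_policy="min")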

    def add(
        self,
        key: KeyT,
        timestamp: Union[int, str],
        value: Number,
        retention_msecs: Optional[int] = None,
        uncompressed: Optional[bool] = False,
        labels: Optional[Dict[str, str]] = None,
        chunk_size: Optional[int] = None,
        duplicate_policy: Optional[str] = None,
        ignore_max_time_diff: Optional[int] = None,
        ignore_max_val_diff: Optional[Number] = None,
        on_duplicate: Optional[str] = None,
    ):
        """
        Append a sample to a time series. When the specified key does not exist, a new
        time series is created.

        For more information see https://redis.io/commands/ts.add/

        Args:
            key:
                The time-series key.
            timestamp:
                Timestamp of the sample. `*` can be used for automatic timestamp (using
                the system clock).
            value:
                Numeric data value of the sample.
            retention_msecs:
                Maximum age for samples, compared to the highest reported timestamp in
                milliseconds. If `None` or `0` is passed, the series is not trimmed at
                all.
            uncompressed:
                Changes data storage from compressed (default) to uncompressed.
            labels:
                A dictionary of label-value pairs that represent metadata labels of the
                key.
            chunk_size:
                Memory size, in bytes, allocated for each data chunk. Must be a multiple
                of 8 in the range `[48..1048576]`. In earlier versions of the module the
                minimum value was different.
            duplicate_policy:
                Policy for handling multiple samples with identical timestamps. Can be
                one of:

                - 'block': An error will occur and the new value will be ignored.
                - 'first': Ignore the new value.
                - 'last': Override with the latest value.
                - 'min': Only override if the value is lower than the existing
                  value.
                - 'max': Only override if the value is higher than the existing
                  value.
                - 'sum': If a previous sample exists, add the new sample to it so
                  that the updated value is equal to (previous + new). If no
                  previous sample exists, set the updated value equal to the new
                  value.
            ignore_max_time_diff:
                A non-negative integer value, in milliseconds, that sets an ignore
                threshold for added timestamps. If the difference between the last
                timestamp and the new timestamp is lower than this threshold, the new
                entry is ignored. Only applicable if `duplicate_policy` is set to
                `last`, and if `ignore_max_val_diff` is also set. Available since
                RedisTimeSeries version 1.12.0.
            ignore_max_val_diff:
                A non-negative floating point value, that sets an ignore threshold for
                added values. If the difference between the last value and the new value
                is lower than this threshold, the new entry is ignored. Only applicable
                if `duplicate_policy` is set to `last`, and if `ignore_max_time_diff` is
                also set. Available since RedisTimeSeries version 1.12.0.
            on_duplicate:
                Use a specific duplicate policy for the specified timestamp. Overrides
                the duplicate policy set by `duplicate_policy`.
        """
        params = [key, timestamp, value]
        self._append_retention(params, retention_msecs)
        self._append_uncompressed(params, uncompressed)
        self._append_chunk_size(params, chunk_size)
        self._append_duplicate_policy(params, duplicate_policy)
        self._append_labels(params, labels)
        self._append_insertion_filters(
            params, ignore_max_time_diff, ignore_max_val_diff
        )
        self._append_on_duplicate(params, on_duplicate)
        return self.execute_command(ADD_CMD, *params)
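
    # Usage sketch (hypothetical key/values): append one sample with the server
    # clock as timestamp, then one at an explicit timestamp with a per-call
    # duplicate policy.
    #
    #     ts.add("temperature:3:11", "*", 21.5)
    #     ts.add("temperature:3:11", 1717171717000, 21.7, on_duplicate="last")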

    def madd(self, ktv_tuples: List[Tuple[KeyT, Union[int, str], Number]]):
        """
        Append new samples to one or more time series.

        Each time series must already exist.

        The method expects a list of tuples. Each tuple should contain three elements:
        (`key`, `timestamp`, `value`). The `value` will be appended to the time series
        identified by `key`, at the given `timestamp`.

        For more information see https://redis.io/commands/ts.madd/

        Args:
            ktv_tuples:
                A list of tuples, where each tuple contains:

                - `key`: The key of the time series.
                - `timestamp`: The timestamp at which the value should be appended.
                - `value`: The value to append to the time series.

        Returns:
            A list that contains, for each sample, either the timestamp that was used,
            or an error, if the sample could not be added.
        """
        params = []
        for ktv in ktv_tuples:
            params.extend(ktv)
        return self.execute_command(MADD_CMD, *params)
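
    # Usage sketch (hypothetical keys/values): each tuple is
    # (key, timestamp, value); the reply mixes timestamps and errors per sample.
    #
    #     ts.madd([
    #         ("temperature:3:11", 1717171717000, 21.5),
    #         ("temperature:3:12", 1717171717000, 19.8),
    #     ])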

    def incrby(
        self,
        key: KeyT,
        value: Number,
        timestamp: Optional[Union[int, str]] = None,
        retention_msecs: Optional[int] = None,
        uncompressed: Optional[bool] = False,
        labels: Optional[Dict[str, str]] = None,
        chunk_size: Optional[int] = None,
        duplicate_policy: Optional[str] = None,
        ignore_max_time_diff: Optional[int] = None,
        ignore_max_val_diff: Optional[Number] = None,
    ):
        """
        Increment the latest sample's value of a series. When the specified key does
        not exist, a new time series is created.

        This command can be used as a counter or gauge that automatically gets history
        as a time series.

        For more information see https://redis.io/commands/ts.incrby/

        Args:
            key:
                The time-series key.
            value:
                Numeric value to be added (addend).
            timestamp:
                Timestamp of the sample. `*` can be used for automatic timestamp (using
                the system clock). `timestamp` must be equal to or higher than the
                maximum existing timestamp in the series. When equal, the value of the
                sample with the maximum existing timestamp is increased. If it is
                higher, a new sample with a timestamp set to `timestamp` is created, and
                its value is set to the value of the sample with the maximum existing
                timestamp plus the addend.
            retention_msecs:
                Maximum age for samples, compared to the highest reported timestamp in
                milliseconds. If `None` or `0` is passed, the series is not trimmed at
                all.
            uncompressed:
                Changes data storage from compressed (default) to uncompressed.
            labels:
                A dictionary of label-value pairs that represent metadata labels of the
                key.
            chunk_size:
                Memory size, in bytes, allocated for each data chunk. Must be a multiple
                of 8 in the range `[48..1048576]`. In earlier versions of the module the
                minimum value was different.
            duplicate_policy:
                Policy for handling multiple samples with identical timestamps. Can be
                one of:

                - 'block': An error will occur and the new value will be ignored.
                - 'first': Ignore the new value.
                - 'last': Override with the latest value.
                - 'min': Only override if the value is lower than the existing
                  value.
                - 'max': Only override if the value is higher than the existing
                  value.
                - 'sum': If a previous sample exists, add the new sample to it so
                  that the updated value is equal to (previous + new). If no
                  previous sample exists, set the updated value equal to the new
                  value.
            ignore_max_time_diff:
                A non-negative integer value, in milliseconds, that sets an ignore
                threshold for added timestamps. If the difference between the last
                timestamp and the new timestamp is lower than this threshold, the new
                entry is ignored. Only applicable if `duplicate_policy` is set to
                `last`, and if `ignore_max_val_diff` is also set. Available since
                RedisTimeSeries version 1.12.0.
            ignore_max_val_diff:
                A non-negative floating point value, that sets an ignore threshold for
                added values. If the difference between the last value and the new value
                is lower than this threshold, the new entry is ignored. Only applicable
                if `duplicate_policy` is set to `last`, and if `ignore_max_time_diff` is
                also set. Available since RedisTimeSeries version 1.12.0.

        Returns:
            The timestamp of the sample that was modified or added.
        """
        params = [key, value]
        self._append_timestamp(params, timestamp)
        self._append_retention(params, retention_msecs)
        self._append_uncompressed(params, uncompressed)
        self._append_chunk_size(params, chunk_size)
        self._append_duplicate_policy(params, duplicate_policy)
        self._append_labels(params, labels)
        self._append_insertion_filters(
            params, ignore_max_time_diff, ignore_max_val_diff
        )
        return self.execute_command(INCRBY_CMD, *params)
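
    # Usage sketch (hypothetical key): count page views, letting the server
    # timestamp each increment; `decrby` below mirrors this with subtraction.
    #
    #     ts.incrby("page:views", 1, timestamp="*")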

    def decrby(
        self,
        key: KeyT,
        value: Number,
        timestamp: Optional[Union[int, str]] = None,
        retention_msecs: Optional[int] = None,
        uncompressed: Optional[bool] = False,
        labels: Optional[Dict[str, str]] = None,
        chunk_size: Optional[int] = None,
        duplicate_policy: Optional[str] = None,
        ignore_max_time_diff: Optional[int] = None,
        ignore_max_val_diff: Optional[Number] = None,
    ):
        """
        Decrement the latest sample's value of a series. When the specified key does
        not exist, a new time series is created.

        This command can be used as a counter or gauge that automatically gets history
        as a time series.

        For more information see https://redis.io/commands/ts.decrby/

        Args:
            key:
                The time-series key.
            value:
                Numeric value to subtract (subtrahend).
            timestamp:
                Timestamp of the sample. `*` can be used for automatic timestamp (using
                the system clock). `timestamp` must be equal to or higher than the
                maximum existing timestamp in the series. When equal, the value of the
                sample with the maximum existing timestamp is decreased. If it is
                higher, a new sample with a timestamp set to `timestamp` is created, and
                its value is set to the value of the sample with the maximum existing
                timestamp minus the subtrahend.
            retention_msecs:
                Maximum age for samples, compared to the highest reported timestamp in
                milliseconds. If `None` or `0` is passed, the series is not trimmed at
                all.
            uncompressed:
                Changes data storage from compressed (default) to uncompressed.
            labels:
                A dictionary of label-value pairs that represent metadata labels of the
                key.
            chunk_size:
                Memory size, in bytes, allocated for each data chunk. Must be a multiple
                of 8 in the range `[48..1048576]`. In earlier versions of the module the
                minimum value was different.
            duplicate_policy:
                Policy for handling multiple samples with identical timestamps. Can be
                one of:

                - 'block': An error will occur and the new value will be ignored.
                - 'first': Ignore the new value.
                - 'last': Override with the latest value.
                - 'min': Only override if the value is lower than the existing
                  value.
                - 'max': Only override if the value is higher than the existing
                  value.
                - 'sum': If a previous sample exists, add the new sample to it so
                  that the updated value is equal to (previous + new). If no
                  previous sample exists, set the updated value equal to the new
                  value.
            ignore_max_time_diff:
                A non-negative integer value, in milliseconds, that sets an ignore
                threshold for added timestamps. If the difference between the last
                timestamp and the new timestamp is lower than this threshold, the new
                entry is ignored. Only applicable if `duplicate_policy` is set to
                `last`, and if `ignore_max_val_diff` is also set. Available since
                RedisTimeSeries version 1.12.0.
            ignore_max_val_diff:
                A non-negative floating point value, that sets an ignore threshold for
                added values. If the difference between the last value and the new value
                is lower than this threshold, the new entry is ignored. Only applicable
                if `duplicate_policy` is set to `last`, and if `ignore_max_time_diff` is
                also set. Available since RedisTimeSeries version 1.12.0.

        Returns:
            The timestamp of the sample that was modified or added.
        """
        params = [key, value]
        self._append_timestamp(params, timestamp)
        self._append_retention(params, retention_msecs)
        self._append_uncompressed(params, uncompressed)
        self._append_chunk_size(params, chunk_size)
        self._append_duplicate_policy(params, duplicate_policy)
        self._append_labels(params, labels)
        self._append_insertion_filters(
            params, ignore_max_time_diff, ignore_max_val_diff
        )
        return self.execute_command(DECRBY_CMD, *params)

    def delete(self, key: KeyT, from_time: int, to_time: int):
        """
        Delete all samples between two timestamps for a given time series.

        The given timestamp interval is closed (inclusive), meaning that samples whose
        timestamp equals `from_time` or `to_time` are also deleted.

        For more information see https://redis.io/commands/ts.del/

        Args:
            key:
                The time-series key.
            from_time:
                Start timestamp for the range deletion.
            to_time:
                End timestamp for the range deletion.

        Returns:
            The number of samples deleted.
        """
        return self.execute_command(DEL_CMD, key, from_time, to_time)
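
    # Usage sketch (hypothetical key/timestamps): remove every sample recorded
    # in one closed interval; the return value is the number of samples deleted.
    #
    #     deleted = ts.delete("temperature:3:11", 1717171000000, 1717174600000)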

    def createrule(
        self,
        source_key: KeyT,
        dest_key: KeyT,
        aggregation_type: str,
        bucket_size_msec: int,
        align_timestamp: Optional[int] = None,
    ):
        """
        Create a compaction rule from values added to `source_key` into `dest_key`.

        For more information see https://redis.io/commands/ts.createrule/

        Args:
            source_key:
                Key name for source time series.
            dest_key:
                Key name for destination (compacted) time series.
            aggregation_type:
                Aggregation type. One of the following:
                [`avg`, `sum`, `min`, `max`, `range`, `count`, `first`, `last`, `std.p`,
                `std.s`, `var.p`, `var.s`, `twa`]
            bucket_size_msec:
                Duration of each bucket, in milliseconds.
            align_timestamp:
                Ensure that there is a bucket that starts at exactly `align_timestamp`
                and align all other buckets accordingly.
        """
        params = [source_key, dest_key]
        self._append_aggregation(params, aggregation_type, bucket_size_msec)
        if align_timestamp is not None:
            params.append(align_timestamp)
        return self.execute_command(CREATERULE_CMD, *params)
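
    # Usage sketch (hypothetical keys): downsample raw readings into hourly
    # averages; the destination series has to be created before the rule.
    #
    #     ts.create("temperature:3:11:avg_1h")
    #     ts.createrule("temperature:3:11", "temperature:3:11:avg_1h",
    #                   aggregation_type="avg", bucket_size_msec=3600000)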

    def deleterule(self, source_key: KeyT, dest_key: KeyT):
        """
        Delete a compaction rule from `source_key` to `dest_key`.

        For more information see https://redis.io/commands/ts.deleterule/
        """
        return self.execute_command(DELETERULE_CMD, source_key, dest_key)

    def __range_params(
        self,
        key: KeyT,
        from_time: Union[int, str],
        to_time: Union[int, str],
        count: Optional[int],
        aggregation_type: Optional[str],
        bucket_size_msec: Optional[int],
        filter_by_ts: Optional[List[int]],
        filter_by_min_value: Optional[int],
        filter_by_max_value: Optional[int],
        align: Optional[Union[int, str]],
        latest: Optional[bool],
        bucket_timestamp: Optional[str],
        empty: Optional[bool],
    ):
        """Create TS.RANGE and TS.REVRANGE arguments."""
        params = [key, from_time, to_time]
        self._append_latest(params, latest)
        self._append_filer_by_ts(params, filter_by_ts)
        self._append_filer_by_value(params, filter_by_min_value, filter_by_max_value)
        self._append_count(params, count)
        self._append_align(params, align)
        self._append_aggregation(params, aggregation_type, bucket_size_msec)
        self._append_bucket_timestamp(params, bucket_timestamp)
        self._append_empty(params, empty)
        return params

    def range(
        self,
        key: KeyT,
        from_time: Union[int, str],
        to_time: Union[int, str],
        count: Optional[int] = None,
        aggregation_type: Optional[str] = None,
        bucket_size_msec: Optional[int] = 0,
        filter_by_ts: Optional[List[int]] = None,
        filter_by_min_value: Optional[int] = None,
        filter_by_max_value: Optional[int] = None,
        align: Optional[Union[int, str]] = None,
        latest: Optional[bool] = False,
        bucket_timestamp: Optional[str] = None,
        empty: Optional[bool] = False,
    ):
        """
        Query a range in forward direction for a specific time-series.

        For more information see https://redis.io/commands/ts.range/

        Args:
            key:
                Key name for timeseries.
            from_time:
                Start timestamp for the range query. `-` can be used to express the
                minimum possible timestamp (0).
            to_time:
                End timestamp for range query, `+` can be used to express the maximum
                possible timestamp.
            count:
                Limits the number of returned samples.
            aggregation_type:
                Optional aggregation type. Can be one of [`avg`, `sum`, `min`, `max`,
                `range`, `count`, `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`,
                `twa`]
            bucket_size_msec:
                Time bucket for aggregation in milliseconds.
            filter_by_ts:
                List of timestamps to filter the result by specific timestamps.
            filter_by_min_value:
                Filter result by minimum value (must mention also
                `filter_by_max_value`).
            filter_by_max_value:
                Filter result by maximum value (must mention also
                `filter_by_min_value`).
            align:
                Timestamp for alignment control for aggregation.
            latest:
                Used when a time series is a compaction, reports the compacted value of
                the latest possibly partial bucket.
            bucket_timestamp:
                Controls how bucket timestamps are reported. Can be one of [`-`, `low`,
                `+`, `high`, `~`, `mid`].
            empty:
                Reports aggregations for empty buckets.
        """
        params = self.__range_params(
            key,
            from_time,
            to_time,
            count,
            aggregation_type,
            bucket_size_msec,
            filter_by_ts,
            filter_by_min_value,
            filter_by_max_value,
            align,
            latest,
            bucket_timestamp,
            empty,
        )
        return self.execute_command(RANGE_CMD, *params)
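
    # Usage sketch (hypothetical key): the full history, then 5-minute averages
    # over the same span.
    #
    #     ts.range("temperature:3:11", "-", "+")
    #     ts.range("temperature:3:11", "-", "+",
    #              aggregation_type="avg", bucket_size_msec=300000)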

    def revrange(
        self,
        key: KeyT,
        from_time: Union[int, str],
        to_time: Union[int, str],
        count: Optional[int] = None,
        aggregation_type: Optional[str] = None,
        bucket_size_msec: Optional[int] = 0,
        filter_by_ts: Optional[List[int]] = None,
        filter_by_min_value: Optional[int] = None,
        filter_by_max_value: Optional[int] = None,
        align: Optional[Union[int, str]] = None,
        latest: Optional[bool] = False,
        bucket_timestamp: Optional[str] = None,
        empty: Optional[bool] = False,
    ):
        """
        Query a range in reverse direction for a specific time-series.

        **Note**: This command is only available since RedisTimeSeries >= v1.4

        For more information see https://redis.io/commands/ts.revrange/

        Args:
            key:
                Key name for timeseries.
            from_time:
                Start timestamp for the range query. `-` can be used to express the
                minimum possible timestamp (0).
            to_time:
                End timestamp for range query, `+` can be used to express the maximum
                possible timestamp.
            count:
                Limits the number of returned samples.
            aggregation_type:
                Optional aggregation type. Can be one of [`avg`, `sum`, `min`, `max`,
                `range`, `count`, `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`,
                `twa`]
            bucket_size_msec:
                Time bucket for aggregation in milliseconds.
            filter_by_ts:
                List of timestamps to filter the result by specific timestamps.
            filter_by_min_value:
                Filter result by minimum value (must mention also
                `filter_by_max_value`).
            filter_by_max_value:
                Filter result by maximum value (must mention also
                `filter_by_min_value`).
            align:
                Timestamp for alignment control for aggregation.
            latest:
                Used when a time series is a compaction, reports the compacted value of
                the latest possibly partial bucket.
            bucket_timestamp:
                Controls how bucket timestamps are reported. Can be one of [`-`, `low`,
                `+`, `high`, `~`, `mid`].
            empty:
                Reports aggregations for empty buckets.
        """
        params = self.__range_params(
            key,
            from_time,
            to_time,
            count,
            aggregation_type,
            bucket_size_msec,
            filter_by_ts,
            filter_by_min_value,
            filter_by_max_value,
            align,
            latest,
            bucket_timestamp,
            empty,
        )
        return self.execute_command(REVRANGE_CMD, *params)

    def __mrange_params(
        self,
        aggregation_type: Optional[str],
        bucket_size_msec: Optional[int],
        count: Optional[int],
        filters: List[str],
        from_time: Union[int, str],
        to_time: Union[int, str],
        with_labels: Optional[bool],
        filter_by_ts: Optional[List[int]],
        filter_by_min_value: Optional[int],
        filter_by_max_value: Optional[int],
        groupby: Optional[str],
        reduce: Optional[str],
        select_labels: Optional[List[str]],
        align: Optional[Union[int, str]],
        latest: Optional[bool],
        bucket_timestamp: Optional[str],
        empty: Optional[bool],
    ):
        """Create TS.MRANGE and TS.MREVRANGE arguments."""
        params = [from_time, to_time]
        self._append_latest(params, latest)
        self._append_filer_by_ts(params, filter_by_ts)
        self._append_filer_by_value(params, filter_by_min_value, filter_by_max_value)
        self._append_with_labels(params, with_labels, select_labels)
        self._append_count(params, count)
        self._append_align(params, align)
        self._append_aggregation(params, aggregation_type, bucket_size_msec)
        self._append_bucket_timestamp(params, bucket_timestamp)
        self._append_empty(params, empty)
        params.extend(["FILTER"])
        params += filters
        self._append_groupby_reduce(params, groupby, reduce)
        return params

    def mrange(
        self,
        from_time: Union[int, str],
        to_time: Union[int, str],
        filters: List[str],
        count: Optional[int] = None,
        aggregation_type: Optional[str] = None,
        bucket_size_msec: Optional[int] = 0,
        with_labels: Optional[bool] = False,
        filter_by_ts: Optional[List[int]] = None,
        filter_by_min_value: Optional[int] = None,
        filter_by_max_value: Optional[int] = None,
        groupby: Optional[str] = None,
        reduce: Optional[str] = None,
        select_labels: Optional[List[str]] = None,
        align: Optional[Union[int, str]] = None,
        latest: Optional[bool] = False,
        bucket_timestamp: Optional[str] = None,
        empty: Optional[bool] = False,
    ):
        """
        Query a range across multiple time-series by filters in forward direction.

        For more information see https://redis.io/commands/ts.mrange/

        Args:
            from_time:
                Start timestamp for the range query. `-` can be used to express the
                minimum possible timestamp (0).
            to_time:
                End timestamp for range query, `+` can be used to express the maximum
                possible timestamp.
            filters:
                Filter to match the time-series labels.
            count:
                Limits the number of returned samples.
            aggregation_type:
                Optional aggregation type. Can be one of [`avg`, `sum`, `min`, `max`,
                `range`, `count`, `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`,
                `twa`]
            bucket_size_msec:
                Time bucket for aggregation in milliseconds.
            with_labels:
                Include in the reply all label-value pairs representing metadata labels
                of the time series.
            filter_by_ts:
                List of timestamps to filter the result by specific timestamps.
            filter_by_min_value:
                Filter result by minimum value (must mention also
                `filter_by_max_value`).
            filter_by_max_value:
                Filter result by maximum value (must mention also
                `filter_by_min_value`).
            groupby:
                Group the results by this label (must also specify `reduce`).
            reduce:
                Apply this reducer function to each group. Can be one of [`avg`, `sum`,
                `min`, `max`, `range`, `count`, `std.p`, `std.s`, `var.p`, `var.s`].
            select_labels:
                Include in the reply only a subset of the key-value pair labels of a
                series.
            align:
                Timestamp for alignment control for aggregation.
            latest:
                Used when a time series is a compaction, reports the compacted value of
                the latest possibly partial bucket.
            bucket_timestamp:
                Controls how bucket timestamps are reported. Can be one of [`-`, `low`,
                `+`, `high`, `~`, `mid`].
            empty:
                Reports aggregations for empty buckets.
        """
        params = self.__mrange_params(
            aggregation_type,
            bucket_size_msec,
            count,
            filters,
            from_time,
            to_time,
            with_labels,
            filter_by_ts,
            filter_by_min_value,
            filter_by_max_value,
            groupby,
            reduce,
            select_labels,
            align,
            latest,
            bucket_timestamp,
            empty,
        )
        return self.execute_command(MRANGE_CMD, *params)
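
    # Usage sketch (hypothetical labels): query every series in area 11 and
    # average them per sensor; filters use the RedisTimeSeries label syntax.
    #
    #     ts.mrange("-", "+", filters=["area_id=11"],
    #               groupby="sensor_id", reduce="avg")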

    def mrevrange(
        self,
        from_time: Union[int, str],
        to_time: Union[int, str],
        filters: List[str],
        count: Optional[int] = None,
        aggregation_type: Optional[str] = None,
        bucket_size_msec: Optional[int] = 0,
        with_labels: Optional[bool] = False,
        filter_by_ts: Optional[List[int]] = None,
        filter_by_min_value: Optional[int] = None,
        filter_by_max_value: Optional[int] = None,
        groupby: Optional[str] = None,
        reduce: Optional[str] = None,
        select_labels: Optional[List[str]] = None,
        align: Optional[Union[int, str]] = None,
        latest: Optional[bool] = False,
        bucket_timestamp: Optional[str] = None,
        empty: Optional[bool] = False,
    ):
        """
        Query a range across multiple time-series by filters in reverse direction.

        For more information see https://redis.io/commands/ts.mrevrange/

        Args:
            from_time:
                Start timestamp for the range query. `-` can be used to express the
                minimum possible timestamp (0).
            to_time:
                End timestamp for range query, `+` can be used to express the maximum
                possible timestamp.
            filters:
                Filter to match the time-series labels.
            count:
                Limits the number of returned samples.
            aggregation_type:
                Optional aggregation type. Can be one of [`avg`, `sum`, `min`, `max`,
                `range`, `count`, `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`,
                `twa`].
            bucket_size_msec:
                Time bucket for aggregation in milliseconds.
            with_labels:
                Include in the reply all label-value pairs representing metadata labels
                of the time series.
            filter_by_ts:
                List of timestamps to filter the result by specific timestamps.
            filter_by_min_value:
                Filter result by minimum value (must mention also
                `filter_by_max_value`).
            filter_by_max_value:
                Filter result by maximum value (must mention also
                `filter_by_min_value`).
            groupby:
                Group the results by this label (must also specify `reduce`).
            reduce:
                Apply this reducer function to each group. Can be one of [`avg`, `sum`,
                `min`, `max`, `range`, `count`, `std.p`, `std.s`, `var.p`, `var.s`].
            select_labels:
                Include in the reply only a subset of the key-value pair labels of a
                series.
            align:
                Timestamp for alignment control for aggregation.
            latest:
                Used when a time series is a compaction, reports the compacted value of
                the latest possibly partial bucket.
            bucket_timestamp:
                Controls how bucket timestamps are reported. Can be one of [`-`, `low`,
                `+`, `high`, `~`, `mid`].
            empty:
                Reports aggregations for empty buckets.
        """
        params = self.__mrange_params(
            aggregation_type,
            bucket_size_msec,
            count,
            filters,
            from_time,
            to_time,
            with_labels,
            filter_by_ts,
            filter_by_min_value,
            filter_by_max_value,
            groupby,
            reduce,
            select_labels,
            align,
            latest,
            bucket_timestamp,
            empty,
        )
        return self.execute_command(MREVRANGE_CMD, *params)

    def get(self, key: KeyT, latest: Optional[bool] = False):
        """
        Get the last sample of `key`.

        For more information see https://redis.io/commands/ts.get/

        Args:
            key:
                The time-series key.
            latest:
                Used when a time series is a compaction, reports the compacted value of
                the latest (possibly partial) bucket.
        """
        params = [key]
        self._append_latest(params, latest)
        return self.execute_command(GET_CMD, *params)
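
    # Usage sketch (hypothetical key): fetch the most recent sample of a series.
    #
    #     ts.get("temperature:3:11")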

    def mget(
        self,
        filters: List[str],
        with_labels: Optional[bool] = False,
        select_labels: Optional[List[str]] = None,
        latest: Optional[bool] = False,
    ):
        """
        Get the last samples matching the specific `filter`.

        For more information see https://redis.io/commands/ts.mget/

        Args:
            filters:
                Filter to match the time-series labels.
            with_labels:
                Include in the reply all label-value pairs representing metadata labels
                of the time series.
            select_labels:
                Include in the reply only a subset of the key-value pair labels of the
                time series.
            latest:
                Used when a time series is a compaction, reports the compacted value of
                the latest possibly partial bucket.
        """
        params = []
        self._append_latest(params, latest)
        self._append_with_labels(params, with_labels, select_labels)
        params.extend(["FILTER"])
        params += filters
        return self.execute_command(MGET_CMD, *params)
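
    # Usage sketch (hypothetical label filter): the last sample of every series
    # in area 11, with their labels included in the reply.
    #
    #     ts.mget(["area_id=11"], with_labels=True)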

    def info(self, key: KeyT):
        """
        Get information of `key`.

        For more information see https://redis.io/commands/ts.info/
        """
        return self.execute_command(INFO_CMD, key)

    def queryindex(self, filters: List[str]):
        """
        Get all time series keys matching the `filter` list.

        For more information see https://redis.io/commands/ts.queryindex/
        """
        return self.execute_command(QUERYINDEX_CMD, *filters)
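
    # Usage sketch (hypothetical label filter): list the keys of every series
    # labelled with sensor_id=3.
    #
    #     ts.queryindex(["sensor_id=3"])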

    @staticmethod
    def _append_uncompressed(params: List[str], uncompressed: Optional[bool]):
        """Append UNCOMPRESSED tag to params."""
        if uncompressed:
            params.extend(["ENCODING", "UNCOMPRESSED"])

    @staticmethod
    def _append_with_labels(
        params: List[str],
        with_labels: Optional[bool],
        select_labels: Optional[List[str]],
    ):
        """Append labels behavior to params."""
        if with_labels and select_labels:
            raise DataError(
                "with_labels and select_labels cannot be provided together."
            )

        if with_labels:
            params.extend(["WITHLABELS"])
        if select_labels:
            params.extend(["SELECTED_LABELS", *select_labels])

    @staticmethod
    def _append_groupby_reduce(
        params: List[str], groupby: Optional[str], reduce: Optional[str]
    ):
        """Append GROUPBY REDUCE property to params."""
        if groupby is not None and reduce is not None:
            params.extend(["GROUPBY", groupby, "REDUCE", reduce.upper()])

    @staticmethod
    def _append_retention(params: List[str], retention: Optional[int]):
        """Append RETENTION property to params."""
        if retention is not None:
            params.extend(["RETENTION", retention])

    @staticmethod
    def _append_labels(params: List[str], labels: Optional[Dict[str, str]]):
        """Append LABELS property to params."""
        if labels:
            params.append("LABELS")
            for k, v in labels.items():
                params.extend([k, v])

    @staticmethod
    def _append_count(params: List[str], count: Optional[int]):
        """Append COUNT property to params."""
        if count is not None:
            params.extend(["COUNT", count])

    @staticmethod
    def _append_timestamp(params: List[str], timestamp: Optional[int]):
        """Append TIMESTAMP property to params."""
        if timestamp is not None:
            params.extend(["TIMESTAMP", timestamp])

    @staticmethod
    def _append_align(params: List[str], align: Optional[Union[int, str]]):
        """Append ALIGN property to params."""
        if align is not None:
            params.extend(["ALIGN", align])

    @staticmethod
    def _append_aggregation(
        params: List[str],
        aggregation_type: Optional[str],
        bucket_size_msec: Optional[int],
    ):
        """Append AGGREGATION property to params."""
        if aggregation_type is not None:
            params.extend(["AGGREGATION", aggregation_type, bucket_size_msec])

    @staticmethod
    def _append_chunk_size(params: List[str], chunk_size: Optional[int]):
        """Append CHUNK_SIZE property to params."""
        if chunk_size is not None:
            params.extend(["CHUNK_SIZE", chunk_size])

    @staticmethod
    def _append_duplicate_policy(params: List[str], duplicate_policy: Optional[str]):
        """Append DUPLICATE_POLICY property to params."""
        if duplicate_policy is not None:
            params.extend(["DUPLICATE_POLICY", duplicate_policy])

    @staticmethod
    def _append_on_duplicate(params: List[str], on_duplicate: Optional[str]):
        """Append ON_DUPLICATE property to params."""
        if on_duplicate is not None:
            params.extend(["ON_DUPLICATE", on_duplicate])

    @staticmethod
    def _append_filer_by_ts(params: List[str], ts_list: Optional[List[int]]):
        """Append FILTER_BY_TS property to params."""
        if ts_list is not None:
            params.extend(["FILTER_BY_TS", *ts_list])

    @staticmethod
    def _append_filer_by_value(
        params: List[str], min_value: Optional[int], max_value: Optional[int]
    ):
        """Append FILTER_BY_VALUE property to params."""
        if min_value is not None and max_value is not None:
            params.extend(["FILTER_BY_VALUE", min_value, max_value])

    @staticmethod
    def _append_latest(params: List[str], latest: Optional[bool]):
        """Append LATEST property to params."""
        if latest:
            params.append("LATEST")

    @staticmethod
    def _append_bucket_timestamp(params: List[str], bucket_timestamp: Optional[str]):
        """Append BUCKET_TIMESTAMP property to params."""
        if bucket_timestamp is not None:
            params.extend(["BUCKETTIMESTAMP", bucket_timestamp])

    @staticmethod
    def _append_empty(params: List[str], empty: Optional[bool]):
        """Append EMPTY property to params."""
        if empty:
            params.append("EMPTY")

    @staticmethod
    def _append_insertion_filters(
        params: List[str],
        ignore_max_time_diff: Optional[int] = None,
        ignore_max_val_diff: Optional[Number] = None,
    ):
        """Append insertion filters to params."""
        if (ignore_max_time_diff is None) != (ignore_max_val_diff is None):
            raise ValueError(
                "Both ignore_max_time_diff and ignore_max_val_diff must be set."
            )

        if ignore_max_time_diff is not None and ignore_max_val_diff is not None:
            params.extend(
                ["IGNORE", str(ignore_max_time_diff), str(ignore_max_val_diff)]
            )