You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
108 lines
3.4 KiB
108 lines
3.4 KiB
import redis
|
|
from redis._parsers.helpers import bool_ok
|
|
|
|
from ..helpers import get_protocol_version, parse_to_list
|
|
from .commands import (
|
|
ALTER_CMD,
|
|
CREATE_CMD,
|
|
CREATERULE_CMD,
|
|
DEL_CMD,
|
|
DELETERULE_CMD,
|
|
GET_CMD,
|
|
INFO_CMD,
|
|
MGET_CMD,
|
|
MRANGE_CMD,
|
|
MREVRANGE_CMD,
|
|
QUERYINDEX_CMD,
|
|
RANGE_CMD,
|
|
REVRANGE_CMD,
|
|
TimeSeriesCommands,
|
|
)
|
|
from .info import TSInfo
|
|
from .utils import parse_get, parse_m_get, parse_m_range, parse_range
|
|
|
|
|
|
class TimeSeries(TimeSeriesCommands):
|
|
"""
|
|
This class subclasses redis-py's `Redis` and implements RedisTimeSeries's
|
|
commands (prefixed with "ts").
|
|
The client allows to interact with RedisTimeSeries and use all of it's
|
|
functionality.
|
|
"""
|
|
|
|
def __init__(self, client=None, **kwargs):
|
|
"""Create a new RedisTimeSeries client."""
|
|
# Set the module commands' callbacks
|
|
self._MODULE_CALLBACKS = {
|
|
ALTER_CMD: bool_ok,
|
|
CREATE_CMD: bool_ok,
|
|
CREATERULE_CMD: bool_ok,
|
|
DELETERULE_CMD: bool_ok,
|
|
}
|
|
|
|
_RESP2_MODULE_CALLBACKS = {
|
|
DEL_CMD: int,
|
|
GET_CMD: parse_get,
|
|
INFO_CMD: TSInfo,
|
|
MGET_CMD: parse_m_get,
|
|
MRANGE_CMD: parse_m_range,
|
|
MREVRANGE_CMD: parse_m_range,
|
|
RANGE_CMD: parse_range,
|
|
REVRANGE_CMD: parse_range,
|
|
QUERYINDEX_CMD: parse_to_list,
|
|
}
|
|
_RESP3_MODULE_CALLBACKS = {}
|
|
|
|
self.client = client
|
|
self.execute_command = client.execute_command
|
|
|
|
if get_protocol_version(self.client) in ["3", 3]:
|
|
self._MODULE_CALLBACKS.update(_RESP3_MODULE_CALLBACKS)
|
|
else:
|
|
self._MODULE_CALLBACKS.update(_RESP2_MODULE_CALLBACKS)
|
|
|
|
for k, v in self._MODULE_CALLBACKS.items():
|
|
self.client.set_response_callback(k, v)
|
|
|
|
def pipeline(self, transaction=True, shard_hint=None):
|
|
"""Creates a pipeline for the TimeSeries module, that can be used
|
|
for executing only TimeSeries commands and core commands.
|
|
|
|
Usage example:
|
|
|
|
r = redis.Redis()
|
|
pipe = r.ts().pipeline()
|
|
for i in range(100):
|
|
pipeline.add("with_pipeline", i, 1.1 * i)
|
|
pipeline.execute()
|
|
|
|
"""
|
|
if isinstance(self.client, redis.RedisCluster):
|
|
p = ClusterPipeline(
|
|
nodes_manager=self.client.nodes_manager,
|
|
commands_parser=self.client.commands_parser,
|
|
startup_nodes=self.client.nodes_manager.startup_nodes,
|
|
result_callbacks=self.client.result_callbacks,
|
|
cluster_response_callbacks=self.client.cluster_response_callbacks,
|
|
cluster_error_retry_attempts=self.client.cluster_error_retry_attempts,
|
|
read_from_replicas=self.client.read_from_replicas,
|
|
reinitialize_steps=self.client.reinitialize_steps,
|
|
lock=self.client._lock,
|
|
)
|
|
|
|
else:
|
|
p = Pipeline(
|
|
connection_pool=self.client.connection_pool,
|
|
response_callbacks=self._MODULE_CALLBACKS,
|
|
transaction=transaction,
|
|
shard_hint=shard_hint,
|
|
)
|
|
return p
|
|
|
|
|
|
class ClusterPipeline(TimeSeriesCommands, redis.cluster.ClusterPipeline):
|
|
"""Cluster pipeline for the module."""
|
|
|
|
|
|
class Pipeline(TimeSeriesCommands, redis.client.Pipeline):
|
|
"""Pipeline for the module."""
|