图片解析应用
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

313 lines
10 KiB

from redis import DataError
from redis.exceptions import ResponseError

from .exceptions import VersionMismatchException
from .execution_plan import ExecutionPlan
from .query_result import AsyncQueryResult, QueryResult
  6. PROFILE_CMD = "GRAPH.PROFILE"
  7. RO_QUERY_CMD = "GRAPH.RO_QUERY"
  8. QUERY_CMD = "GRAPH.QUERY"
  9. DELETE_CMD = "GRAPH.DELETE"
  10. SLOWLOG_CMD = "GRAPH.SLOWLOG"
  11. CONFIG_CMD = "GRAPH.CONFIG"
  12. LIST_CMD = "GRAPH.LIST"
  13. EXPLAIN_CMD = "GRAPH.EXPLAIN"
  14. class GraphCommands:
  15. """RedisGraph Commands"""
  16. def commit(self):
  17. """
  18. Create entire graph.
  19. """
  20. if len(self.nodes) == 0 and len(self.edges) == 0:
  21. return None
  22. query = "CREATE "
  23. for _, node in self.nodes.items():
  24. query += str(node) + ","
  25. query += ",".join([str(edge) for edge in self.edges])
  26. # Discard leading comma.
  27. if query[-1] == ",":
  28. query = query[:-1]
  29. return self.query(query)
  30. def query(self, q, params=None, timeout=None, read_only=False, profile=False):
  31. """
  32. Executes a query against the graph.
  33. For more information see `GRAPH.QUERY <https://redis.io/commands/graph.query>`_. # noqa
  34. Args:
  35. q : str
  36. The query.
  37. params : dict
  38. Query parameters.
  39. timeout : int
  40. Maximum runtime for read queries in milliseconds.
  41. read_only : bool
  42. Executes a readonly query if set to True.
  43. profile : bool
  44. Return details on results produced by and time
  45. spent in each operation.
  46. """
  47. # maintain original 'q'
  48. query = q
  49. # handle query parameters
  50. query = self._build_params_header(params) + query
  51. # construct query command
  52. # ask for compact result-set format
  53. # specify known graph version
  54. if profile:
  55. cmd = PROFILE_CMD
  56. else:
  57. cmd = RO_QUERY_CMD if read_only else QUERY_CMD
  58. command = [cmd, self.name, query, "--compact"]
  59. # include timeout is specified
  60. if isinstance(timeout, int):
  61. command.extend(["timeout", timeout])
  62. elif timeout is not None:
  63. raise Exception("Timeout argument must be a positive integer")
  64. # issue query
  65. try:
  66. response = self.execute_command(*command)
  67. return QueryResult(self, response, profile)
  68. except ResponseError as e:
  69. if "unknown command" in str(e) and read_only:
  70. # `GRAPH.RO_QUERY` is unavailable in older versions.
  71. return self.query(q, params, timeout, read_only=False)
  72. raise e
  73. except VersionMismatchException as e:
  74. # client view over the graph schema is out of sync
  75. # set client version and refresh local schema
  76. self.version = e.version
  77. self._refresh_schema()
  78. # re-issue query
  79. return self.query(q, params, timeout, read_only)
  80. def merge(self, pattern):
  81. """
  82. Merge pattern.
  83. """
  84. query = "MERGE "
  85. query += str(pattern)
  86. return self.query(query)
  87. def delete(self):
  88. """
  89. Deletes graph.
  90. For more information see `DELETE <https://redis.io/commands/graph.delete>`_. # noqa
  91. """
  92. self._clear_schema()
  93. return self.execute_command(DELETE_CMD, self.name)
  94. # declared here, to override the built in redis.db.flush()
  95. def flush(self):
  96. """
  97. Commit the graph and reset the edges and the nodes to zero length.
  98. """
  99. self.commit()
  100. self.nodes = {}
  101. self.edges = []
  102. def bulk(self, **kwargs):
  103. """Internal only. Not supported."""
  104. raise NotImplementedError(
  105. "GRAPH.BULK is internal only. "
  106. "Use https://github.com/redisgraph/redisgraph-bulk-loader."
  107. )
  108. def profile(self, query):
  109. """
  110. Execute a query and produce an execution plan augmented with metrics
  111. for each operation's execution. Return a string representation of a
  112. query execution plan, with details on results produced by and time
  113. spent in each operation.
  114. For more information see `GRAPH.PROFILE <https://redis.io/commands/graph.profile>`_. # noqa
  115. """
  116. return self.query(query, profile=True)
  117. def slowlog(self):
  118. """
  119. Get a list containing up to 10 of the slowest queries issued
  120. against the given graph ID.
  121. For more information see `GRAPH.SLOWLOG <https://redis.io/commands/graph.slowlog>`_. # noqa
  122. Each item in the list has the following structure:
  123. 1. A unix timestamp at which the log entry was processed.
  124. 2. The issued command.
  125. 3. The issued query.
  126. 4. The amount of time needed for its execution, in milliseconds.
  127. """
  128. return self.execute_command(SLOWLOG_CMD, self.name)
  129. def config(self, name, value=None, set=False):
  130. """
  131. Retrieve or update a RedisGraph configuration.
  132. For more information see `https://redis.io/commands/graph.config-get/>`_. # noqa
  133. Args:
  134. name : str
  135. The name of the configuration
  136. value :
  137. The value we want to set (can be used only when `set` is on)
  138. set : bool
  139. Turn on to set a configuration. Default behavior is get.
  140. """
  141. params = ["SET" if set else "GET", name]
  142. if value is not None:
  143. if set:
  144. params.append(value)
  145. else:
  146. raise DataError(
  147. "``value`` can be provided only when ``set`` is True"
  148. ) # noqa
  149. return self.execute_command(CONFIG_CMD, *params)
  150. def list_keys(self):
  151. """
  152. Lists all graph keys in the keyspace.
  153. For more information see `GRAPH.LIST <https://redis.io/commands/graph.list>`_. # noqa
  154. """
  155. return self.execute_command(LIST_CMD)
  156. def execution_plan(self, query, params=None):
  157. """
  158. Get the execution plan for given query,
  159. GRAPH.EXPLAIN returns an array of operations.
  160. Args:
  161. query: the query that will be executed
  162. params: query parameters
  163. """
  164. query = self._build_params_header(params) + query
  165. plan = self.execute_command(EXPLAIN_CMD, self.name, query)
  166. if isinstance(plan[0], bytes):
  167. plan = [b.decode() for b in plan]
  168. return "\n".join(plan)
  169. def explain(self, query, params=None):
  170. """
  171. Get the execution plan for given query,
  172. GRAPH.EXPLAIN returns ExecutionPlan object.
  173. For more information see `GRAPH.EXPLAIN <https://redis.io/commands/graph.explain>`_. # noqa
  174. Args:
  175. query: the query that will be executed
  176. params: query parameters
  177. """
  178. query = self._build_params_header(params) + query
  179. plan = self.execute_command(EXPLAIN_CMD, self.name, query)
  180. return ExecutionPlan(plan)
  181. class AsyncGraphCommands(GraphCommands):
  182. async def query(self, q, params=None, timeout=None, read_only=False, profile=False):
  183. """
  184. Executes a query against the graph.
  185. For more information see `GRAPH.QUERY <https://oss.redis.com/redisgraph/master/commands/#graphquery>`_. # noqa
  186. Args:
  187. q : str
  188. The query.
  189. params : dict
  190. Query parameters.
  191. timeout : int
  192. Maximum runtime for read queries in milliseconds.
  193. read_only : bool
  194. Executes a readonly query if set to True.
  195. profile : bool
  196. Return details on results produced by and time
  197. spent in each operation.
  198. """
  199. # maintain original 'q'
  200. query = q
  201. # handle query parameters
  202. query = self._build_params_header(params) + query
  203. # construct query command
  204. # ask for compact result-set format
  205. # specify known graph version
  206. if profile:
  207. cmd = PROFILE_CMD
  208. else:
  209. cmd = RO_QUERY_CMD if read_only else QUERY_CMD
  210. command = [cmd, self.name, query, "--compact"]
  211. # include timeout is specified
  212. if isinstance(timeout, int):
  213. command.extend(["timeout", timeout])
  214. elif timeout is not None:
  215. raise Exception("Timeout argument must be a positive integer")
  216. # issue query
  217. try:
  218. response = await self.execute_command(*command)
  219. return await AsyncQueryResult().initialize(self, response, profile)
  220. except ResponseError as e:
  221. if "unknown command" in str(e) and read_only:
  222. # `GRAPH.RO_QUERY` is unavailable in older versions.
  223. return await self.query(q, params, timeout, read_only=False)
  224. raise e
  225. except VersionMismatchException as e:
  226. # client view over the graph schema is out of sync
  227. # set client version and refresh local schema
  228. self.version = e.version
  229. self._refresh_schema()
  230. # re-issue query
  231. return await self.query(q, params, timeout, read_only)
  232. async def execution_plan(self, query, params=None):
  233. """
  234. Get the execution plan for given query,
  235. GRAPH.EXPLAIN returns an array of operations.
  236. Args:
  237. query: the query that will be executed
  238. params: query parameters
  239. """
  240. query = self._build_params_header(params) + query
  241. plan = await self.execute_command(EXPLAIN_CMD, self.name, query)
  242. if isinstance(plan[0], bytes):
  243. plan = [b.decode() for b in plan]
  244. return "\n".join(plan)
  245. async def explain(self, query, params=None):
  246. """
  247. Get the execution plan for given query,
  248. GRAPH.EXPLAIN returns ExecutionPlan object.
  249. Args:
  250. query: the query that will be executed
  251. params: query parameters
  252. """
  253. query = self._build_params_header(params) + query
  254. plan = await self.execute_command(EXPLAIN_CMD, self.name, query)
  255. return ExecutionPlan(plan)
  256. async def flush(self):
  257. """
  258. Commit the graph and reset the edges and the nodes to zero length.
  259. """
  260. await self.commit()
  261. self.nodes = {}
  262. self.edges = []