图片解析应用
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

479 lines
13 KiB

  1. """Zookeeper Serializers, Deserializers, and NamedTuple objects"""
  2. from collections import namedtuple
  3. import struct
  4. from kazoo.exceptions import EXCEPTIONS
  5. from kazoo.protocol.states import ZnodeStat
  6. from kazoo.security import ACL
  7. from kazoo.security import Id
  8. # Struct objects with formats compiled
  9. bool_struct = struct.Struct("B")
  10. int_struct = struct.Struct("!i")
  11. int_int_struct = struct.Struct("!ii")
  12. int_int_long_struct = struct.Struct("!iiq")
  13. int_long_int_long_struct = struct.Struct("!iqiq")
  14. long_struct = struct.Struct("!q")
  15. multiheader_struct = struct.Struct("!iBi")
  16. reply_header_struct = struct.Struct("!iqi")
  17. stat_struct = struct.Struct("!qqqqiiiqiiq")
  18. def read_string(buffer, offset):
  19. """Reads an int specified buffer into a string and returns the
  20. string and the new offset in the buffer"""
  21. length = int_struct.unpack_from(buffer, offset)[0]
  22. offset += int_struct.size
  23. if length < 0:
  24. return None, offset
  25. else:
  26. index = offset
  27. offset += length
  28. return buffer[index : index + length].decode("utf-8"), offset
  29. def read_acl(bytes, offset):
  30. perms = int_struct.unpack_from(bytes, offset)[0]
  31. offset += int_struct.size
  32. scheme, offset = read_string(bytes, offset)
  33. id, offset = read_string(bytes, offset)
  34. return ACL(perms, Id(scheme, id)), offset
  35. def write_string(bytes):
  36. if not bytes:
  37. return int_struct.pack(-1)
  38. else:
  39. utf8_str = bytes.encode("utf-8")
  40. return int_struct.pack(len(utf8_str)) + utf8_str
  41. def write_buffer(bytes):
  42. if bytes is None:
  43. return int_struct.pack(-1)
  44. else:
  45. return int_struct.pack(len(bytes)) + bytes
  46. def read_buffer(bytes, offset):
  47. length = int_struct.unpack_from(bytes, offset)[0]
  48. offset += int_struct.size
  49. if length < 0:
  50. return None, offset
  51. else:
  52. index = offset
  53. offset += length
  54. return bytes[index : index + length], offset
  55. class Close(namedtuple("Close", "")):
  56. type = -11
  57. @classmethod
  58. def serialize(cls):
  59. return b""
  60. CloseInstance = Close()
  61. class Ping(namedtuple("Ping", "")):
  62. type = 11
  63. @classmethod
  64. def serialize(cls):
  65. return b""
  66. PingInstance = Ping()
  67. class Connect(
  68. namedtuple(
  69. "Connect",
  70. "protocol_version last_zxid_seen"
  71. " time_out session_id passwd read_only",
  72. )
  73. ):
  74. type = None
  75. def serialize(self):
  76. b = bytearray()
  77. b.extend(
  78. int_long_int_long_struct.pack(
  79. self.protocol_version,
  80. self.last_zxid_seen,
  81. self.time_out,
  82. self.session_id,
  83. )
  84. )
  85. b.extend(write_buffer(self.passwd))
  86. b.extend([1 if self.read_only else 0])
  87. return b
  88. @classmethod
  89. def deserialize(cls, bytes, offset):
  90. proto_version, timeout, session_id = int_int_long_struct.unpack_from(
  91. bytes, offset
  92. )
  93. offset += int_int_long_struct.size
  94. password, offset = read_buffer(bytes, offset)
  95. try:
  96. read_only = bool_struct.unpack_from(bytes, offset)[0] == 1
  97. offset += bool_struct.size
  98. except struct.error:
  99. read_only = False
  100. return (
  101. cls(proto_version, 0, timeout, session_id, password, read_only),
  102. offset,
  103. )
  104. class Create(namedtuple("Create", "path data acl flags")):
  105. type = 1
  106. def serialize(self):
  107. b = bytearray()
  108. b.extend(write_string(self.path))
  109. b.extend(write_buffer(self.data))
  110. b.extend(int_struct.pack(len(self.acl)))
  111. for acl in self.acl:
  112. b.extend(
  113. int_struct.pack(acl.perms)
  114. + write_string(acl.id.scheme)
  115. + write_string(acl.id.id)
  116. )
  117. b.extend(int_struct.pack(self.flags))
  118. return b
  119. @classmethod
  120. def deserialize(cls, bytes, offset):
  121. return read_string(bytes, offset)[0]
  122. class Delete(namedtuple("Delete", "path version")):
  123. type = 2
  124. def serialize(self):
  125. b = bytearray()
  126. b.extend(write_string(self.path))
  127. b.extend(int_struct.pack(self.version))
  128. return b
  129. @classmethod
  130. def deserialize(self, bytes, offset):
  131. return True
  132. class Exists(namedtuple("Exists", "path watcher")):
  133. type = 3
  134. def serialize(self):
  135. b = bytearray()
  136. b.extend(write_string(self.path))
  137. b.extend([1 if self.watcher else 0])
  138. return b
  139. @classmethod
  140. def deserialize(cls, bytes, offset):
  141. stat = ZnodeStat._make(stat_struct.unpack_from(bytes, offset))
  142. return stat if stat.czxid != -1 else None
  143. class GetData(namedtuple("GetData", "path watcher")):
  144. type = 4
  145. def serialize(self):
  146. b = bytearray()
  147. b.extend(write_string(self.path))
  148. b.extend([1 if self.watcher else 0])
  149. return b
  150. @classmethod
  151. def deserialize(cls, bytes, offset):
  152. data, offset = read_buffer(bytes, offset)
  153. stat = ZnodeStat._make(stat_struct.unpack_from(bytes, offset))
  154. return data, stat
  155. class SetData(namedtuple("SetData", "path data version")):
  156. type = 5
  157. def serialize(self):
  158. b = bytearray()
  159. b.extend(write_string(self.path))
  160. b.extend(write_buffer(self.data))
  161. b.extend(int_struct.pack(self.version))
  162. return b
  163. @classmethod
  164. def deserialize(cls, bytes, offset):
  165. return ZnodeStat._make(stat_struct.unpack_from(bytes, offset))
  166. class GetACL(namedtuple("GetACL", "path")):
  167. type = 6
  168. def serialize(self):
  169. return bytearray(write_string(self.path))
  170. @classmethod
  171. def deserialize(cls, bytes, offset):
  172. count = int_struct.unpack_from(bytes, offset)[0]
  173. offset += int_struct.size
  174. if count == -1: # pragma: nocover
  175. return []
  176. acls = []
  177. for c in range(count):
  178. acl, offset = read_acl(bytes, offset)
  179. acls.append(acl)
  180. stat = ZnodeStat._make(stat_struct.unpack_from(bytes, offset))
  181. return acls, stat
  182. class SetACL(namedtuple("SetACL", "path acls version")):
  183. type = 7
  184. def serialize(self):
  185. b = bytearray()
  186. b.extend(write_string(self.path))
  187. b.extend(int_struct.pack(len(self.acls)))
  188. for acl in self.acls:
  189. b.extend(
  190. int_struct.pack(acl.perms)
  191. + write_string(acl.id.scheme)
  192. + write_string(acl.id.id)
  193. )
  194. b.extend(int_struct.pack(self.version))
  195. return b
  196. @classmethod
  197. def deserialize(cls, bytes, offset):
  198. return ZnodeStat._make(stat_struct.unpack_from(bytes, offset))
  199. class GetChildren(namedtuple("GetChildren", "path watcher")):
  200. type = 8
  201. def serialize(self):
  202. b = bytearray()
  203. b.extend(write_string(self.path))
  204. b.extend([1 if self.watcher else 0])
  205. return b
  206. @classmethod
  207. def deserialize(cls, bytes, offset):
  208. count = int_struct.unpack_from(bytes, offset)[0]
  209. offset += int_struct.size
  210. if count == -1: # pragma: nocover
  211. return []
  212. children = []
  213. for c in range(count):
  214. child, offset = read_string(bytes, offset)
  215. children.append(child)
  216. return children
  217. class Sync(namedtuple("Sync", "path")):
  218. type = 9
  219. def serialize(self):
  220. return write_string(self.path)
  221. @classmethod
  222. def deserialize(cls, buffer, offset):
  223. return read_string(buffer, offset)[0]
  224. class GetChildren2(namedtuple("GetChildren2", "path watcher")):
  225. type = 12
  226. def serialize(self):
  227. b = bytearray()
  228. b.extend(write_string(self.path))
  229. b.extend([1 if self.watcher else 0])
  230. return b
  231. @classmethod
  232. def deserialize(cls, bytes, offset):
  233. count = int_struct.unpack_from(bytes, offset)[0]
  234. offset += int_struct.size
  235. if count == -1: # pragma: nocover
  236. return []
  237. children = []
  238. for c in range(count):
  239. child, offset = read_string(bytes, offset)
  240. children.append(child)
  241. stat = ZnodeStat._make(stat_struct.unpack_from(bytes, offset))
  242. return children, stat
  243. class CheckVersion(namedtuple("CheckVersion", "path version")):
  244. type = 13
  245. def serialize(self):
  246. b = bytearray()
  247. b.extend(write_string(self.path))
  248. b.extend(int_struct.pack(self.version))
  249. return b
  250. class Transaction(namedtuple("Transaction", "operations")):
  251. type = 14
  252. def serialize(self):
  253. b = bytearray()
  254. for op in self.operations:
  255. b.extend(
  256. MultiHeader(op.type, False, -1).serialize() + op.serialize()
  257. )
  258. return b + multiheader_struct.pack(-1, True, -1)
  259. @classmethod
  260. def deserialize(cls, bytes, offset):
  261. header = MultiHeader(None, False, None)
  262. results = []
  263. response = None
  264. while not header.done:
  265. if header.type == Create.type:
  266. response, offset = read_string(bytes, offset)
  267. elif header.type == Delete.type:
  268. response = True
  269. elif header.type == SetData.type:
  270. response = ZnodeStat._make(
  271. stat_struct.unpack_from(bytes, offset)
  272. )
  273. offset += stat_struct.size
  274. elif header.type == CheckVersion.type:
  275. response = True
  276. elif header.type == -1:
  277. err = int_struct.unpack_from(bytes, offset)[0]
  278. offset += int_struct.size
  279. response = EXCEPTIONS[err]()
  280. if response:
  281. results.append(response)
  282. header, offset = MultiHeader.deserialize(bytes, offset)
  283. return results
  284. @staticmethod
  285. def unchroot(client, response):
  286. resp = []
  287. for result in response:
  288. if isinstance(result, str):
  289. resp.append(client.unchroot(result))
  290. else:
  291. resp.append(result)
  292. return resp
  293. class Create2(namedtuple("Create2", "path data acl flags")):
  294. type = 15
  295. def serialize(self):
  296. b = bytearray()
  297. b.extend(write_string(self.path))
  298. b.extend(write_buffer(self.data))
  299. b.extend(int_struct.pack(len(self.acl)))
  300. for acl in self.acl:
  301. b.extend(
  302. int_struct.pack(acl.perms)
  303. + write_string(acl.id.scheme)
  304. + write_string(acl.id.id)
  305. )
  306. b.extend(int_struct.pack(self.flags))
  307. return b
  308. @classmethod
  309. def deserialize(cls, bytes, offset):
  310. path, offset = read_string(bytes, offset)
  311. stat = ZnodeStat._make(stat_struct.unpack_from(bytes, offset))
  312. return path, stat
  313. class Reconfig(
  314. namedtuple("Reconfig", "joining leaving new_members config_id")
  315. ):
  316. type = 16
  317. def serialize(self):
  318. b = bytearray()
  319. b.extend(write_string(self.joining))
  320. b.extend(write_string(self.leaving))
  321. b.extend(write_string(self.new_members))
  322. b.extend(long_struct.pack(self.config_id))
  323. return b
  324. @classmethod
  325. def deserialize(cls, bytes, offset):
  326. data, offset = read_buffer(bytes, offset)
  327. stat = ZnodeStat._make(stat_struct.unpack_from(bytes, offset))
  328. return data, stat
  329. class Auth(namedtuple("Auth", "auth_type scheme auth")):
  330. type = 100
  331. def serialize(self):
  332. return (
  333. int_struct.pack(self.auth_type)
  334. + write_string(self.scheme)
  335. + write_string(self.auth)
  336. )
  337. class SASL(namedtuple("SASL", "challenge")):
  338. type = 102
  339. def serialize(self):
  340. b = bytearray()
  341. b.extend(write_buffer(self.challenge))
  342. return b
  343. @classmethod
  344. def deserialize(cls, bytes, offset):
  345. challenge, offset = read_buffer(bytes, offset)
  346. return challenge, offset
  347. class Watch(namedtuple("Watch", "type state path")):
  348. @classmethod
  349. def deserialize(cls, bytes, offset):
  350. """Given bytes and the current bytes offset, return the
  351. type, state, path, and new offset"""
  352. type, state = int_int_struct.unpack_from(bytes, offset)
  353. offset += int_int_struct.size
  354. path, offset = read_string(bytes, offset)
  355. return cls(type, state, path), offset
  356. class ReplyHeader(namedtuple("ReplyHeader", "xid, zxid, err")):
  357. @classmethod
  358. def deserialize(cls, bytes, offset):
  359. """Given bytes and the current bytes offset, return a
  360. :class:`ReplyHeader` instance and the new offset"""
  361. new_offset = offset + reply_header_struct.size
  362. return (
  363. cls._make(reply_header_struct.unpack_from(bytes, offset)),
  364. new_offset,
  365. )
  366. class MultiHeader(namedtuple("MultiHeader", "type done err")):
  367. def serialize(self):
  368. b = bytearray()
  369. b.extend(int_struct.pack(self.type))
  370. b.extend([1 if self.done else 0])
  371. b.extend(int_struct.pack(self.err))
  372. return b
  373. @classmethod
  374. def deserialize(cls, bytes, offset):
  375. t, done, err = multiheader_struct.unpack_from(bytes, offset)
  376. offset += multiheader_struct.size
  377. return cls(t, done == 1, err), offset