Skip to content

Commit 84a0024

Browse files
committed
feat(core): Add support for Container and TTL nodes
Also add support through transations. Closes #334, #496
1 parent 9bb8499 commit 84a0024

File tree

3 files changed

+197
-92
lines changed

3 files changed

+197
-92
lines changed

.flake8

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
[flake8]
2-
ignore = BLK100
2+
ignore = BLK100,W503

kazoo/client.py

Lines changed: 125 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
CloseInstance,
3333
Create,
3434
Create2,
35+
CreateContainer,
36+
CreateTTL,
3537
Delete,
3638
Exists,
3739
GetChildren,
@@ -873,7 +875,8 @@ def sync(self, path):
873875
return self.sync_async(path).get()
874876

875877
def create(self, path, value=b"", acl=None, ephemeral=False,
876-
sequence=False, makepath=False, include_data=False):
878+
sequence=False, makepath=False, include_data=False,
879+
container=False, ttl=0):
877880
"""Create a node with the given value as its data. Optionally
878881
set an ACL on the node.
879882
@@ -950,15 +953,19 @@ def create(self, path, value=b"", acl=None, ephemeral=False,
950953
The `makepath` option.
951954
.. versionadded:: 2.7
952955
The `include_data` option.
956+
.. versionadded:: 2.8
957+
The container and ttl options.
958+
953959
"""
954960
acl = acl or self.default_acl
955961
return self.create_async(
956962
path, value, acl=acl, ephemeral=ephemeral,
957-
sequence=sequence, makepath=makepath, include_data=include_data
958-
).get()
963+
sequence=sequence, makepath=makepath, include_data=include_data,
964+
container=container, ttl=ttl).get()
959965

960966
def create_async(self, path, value=b"", acl=None, ephemeral=False,
961-
sequence=False, makepath=False, include_data=False):
967+
sequence=False, makepath=False, include_data=False,
968+
container=False, ttl=0):
962969
"""Asynchronously create a ZNode. Takes the same arguments as
963970
:meth:`create`.
964971
@@ -967,45 +974,35 @@ def create_async(self, path, value=b"", acl=None, ephemeral=False,
967974
.. versionadded:: 1.1
968975
The makepath option.
969976
.. versionadded:: 2.7
970-
The `include_data` option.
977+
The include_data option.
978+
.. versionadded:: 2.8
979+
The container and ttl options.
971980
"""
972981
if acl is None and self.default_acl:
973982
acl = self.default_acl
974-
975-
if not isinstance(path, string_types):
976-
raise TypeError("Invalid type for 'path' (string expected)")
977-
if acl and (isinstance(acl, ACL) or
978-
not isinstance(acl, (tuple, list))):
979-
raise TypeError("Invalid type for 'acl' (acl must be a tuple/list"
980-
" of ACL's")
981-
if value is not None and not isinstance(value, bytes_types):
982-
raise TypeError("Invalid type for 'value' (must be a byte string)")
983-
if not isinstance(ephemeral, bool):
984-
raise TypeError("Invalid type for 'ephemeral' (bool expected)")
985-
if not isinstance(sequence, bool):
986-
raise TypeError("Invalid type for 'sequence' (bool expected)")
987983
if not isinstance(makepath, bool):
988984
raise TypeError("Invalid type for 'makepath' (bool expected)")
989-
if not isinstance(include_data, bool):
990-
raise TypeError("Invalid type for 'include_data' (bool expected)")
991-
992-
flags = 0
993-
if ephemeral:
994-
flags |= 1
995-
if sequence:
996-
flags |= 2
997-
if acl is None:
998-
acl = OPEN_ACL_UNSAFE
999985

986+
opcode = _create_opcode(
987+
path, value, acl, self.chroot,
988+
ephemeral, sequence, include_data, container, ttl
989+
)
1000990
async_result = self.handler.async_result()
1001991

1002992
@capture_exceptions(async_result)
1003993
def do_create():
1004-
result = self._create_async_inner(
1005-
path, value, acl, flags,
1006-
trailing=sequence, include_data=include_data
1007-
)
1008-
result.rawlink(create_completion)
994+
inner_async_result = self.handler.async_result()
995+
996+
call_result = self._call(opcode, inner_async_result)
997+
if call_result is False:
998+
# We hit a short-circuit exit on the _call. Because we are
999+
# not using the original async_result here, we bubble the
1000+
# exception upwards to the do_create function in
1001+
# KazooClient.create so that it gets set on the correct
1002+
# async_result object
1003+
raise inner_async_result.exception
1004+
1005+
inner_async_result.rawlink(create_completion)
10091006

10101007
@capture_exceptions(async_result)
10111008
def retry_completion(result):
@@ -1015,11 +1012,11 @@ def retry_completion(result):
10151012
@wrap(async_result)
10161013
def create_completion(result):
10171014
try:
1018-
if include_data:
1015+
if opcode.type == Create.type:
1016+
return self.unchroot(result.get())
1017+
else:
10191018
new_path, stat = result.get()
10201019
return self.unchroot(new_path), stat
1021-
else:
1022-
return self.unchroot(result.get())
10231020
except NoNodeError:
10241021
if not makepath:
10251022
raise
@@ -1032,26 +1029,6 @@ def create_completion(result):
10321029
do_create()
10331030
return async_result
10341031

1035-
def _create_async_inner(self, path, value, acl, flags,
1036-
trailing=False, include_data=False):
1037-
async_result = self.handler.async_result()
1038-
if include_data:
1039-
opcode = Create2
1040-
else:
1041-
opcode = Create
1042-
1043-
call_result = self._call(
1044-
opcode(_prefix_root(self.chroot, path, trailing=trailing),
1045-
value, acl, flags), async_result)
1046-
if call_result is False:
1047-
# We hit a short-circuit exit on the _call. Because we are
1048-
# not using the original async_result here, we bubble the
1049-
# exception upwards to the do_create function in
1050-
# KazooClient.create so that it gets set on the correct
1051-
# async_result object
1052-
raise async_result.exception
1053-
return async_result
1054-
10551032
def ensure_path(self, path, acl=None):
10561033
"""Recursively create a path if it doesn't exist.
10571034
@@ -1590,39 +1567,24 @@ def __init__(self, client):
15901567
self.committed = False
15911568

15921569
def create(self, path, value=b"", acl=None, ephemeral=False,
1593-
sequence=False):
1570+
sequence=False, include_data=False, container=False, ttl=0):
15941571
"""Add a create ZNode to the transaction. Takes the same
15951572
arguments as :meth:`KazooClient.create`, with the exception
15961573
of `makepath`.
15971574
15981575
:returns: None
15991576
1577+
.. versionadded:: 2.8
1578+
The include_data, container and ttl options.
16001579
"""
16011580
if acl is None and self.client.default_acl:
16021581
acl = self.client.default_acl
16031582

1604-
if not isinstance(path, string_types):
1605-
raise TypeError("Invalid type for 'path' (string expected)")
1606-
if acl and not isinstance(acl, (tuple, list)):
1607-
raise TypeError("Invalid type for 'acl' (acl must be a tuple/list"
1608-
" of ACL's")
1609-
if not isinstance(value, bytes_types):
1610-
raise TypeError("Invalid type for 'value' (must be a byte string)")
1611-
if not isinstance(ephemeral, bool):
1612-
raise TypeError("Invalid type for 'ephemeral' (bool expected)")
1613-
if not isinstance(sequence, bool):
1614-
raise TypeError("Invalid type for 'sequence' (bool expected)")
1615-
1616-
flags = 0
1617-
if ephemeral:
1618-
flags |= 1
1619-
if sequence:
1620-
flags |= 2
1621-
if acl is None:
1622-
acl = OPEN_ACL_UNSAFE
1623-
1624-
self._add(Create(_prefix_root(self.client.chroot, path), value, acl,
1625-
flags), None)
1583+
opcode = _create_opcode(
1584+
path, value, acl, self.client.chroot,
1585+
ephemeral, sequence, include_data, container, ttl
1586+
)
1587+
self._add(opcode, None)
16261588

16271589
def delete(self, path, version=-1):
16281590
"""Add a delete ZNode to the transaction. Takes the same
@@ -1701,3 +1663,87 @@ def _add(self, request, post_processor=None):
17011663
self._check_tx_state()
17021664
self.client.logger.log(BLATHER, 'Added %r to %r', request, self)
17031665
self.operations.append(request)
1666+
1667+
1668+
def _create_opcode(path, value, acl, chroot,
1669+
ephemeral, sequence, include_data, container, ttl):
1670+
"""Helper function.
1671+
Creates the create OpCode for regular `client.create()` operations as
1672+
well as in a `client.transaction()` context.
1673+
"""
1674+
if not isinstance(path, string_types):
1675+
raise TypeError("Invalid type for 'path' (string expected)")
1676+
if (acl and (
1677+
not isinstance(acl, (tuple, list))
1678+
or isinstance(acl, ACL))
1679+
):
1680+
raise TypeError("Invalid type for 'acl' (acl must be a tuple/list"
1681+
" of ACL's")
1682+
if value is not None and not isinstance(value, bytes_types):
1683+
raise TypeError("Invalid type for 'value' (must be a byte string)")
1684+
if not isinstance(ephemeral, bool):
1685+
raise TypeError("Invalid type for 'ephemeral' (bool expected)")
1686+
if not isinstance(sequence, bool):
1687+
raise TypeError("Invalid type for 'sequence' (bool expected)")
1688+
if not isinstance(include_data, bool):
1689+
raise TypeError("Invalid type for 'include_data' (bool expected)")
1690+
if not isinstance(container, bool):
1691+
raise TypeError("Invalid type for 'container' (bool expected)")
1692+
if not isinstance(ttl, int) or ttl < 0:
1693+
raise TypeError("Invalid 'ttl' (integer >= 0 expected)")
1694+
if ttl and ephemeral:
1695+
raise TypeError("Invalid node creation: ephemeral & ttl")
1696+
if container and (ephemeral or sequence or ttl):
1697+
raise TypeError(
1698+
"Invalid node creation: container & ephemeral/sequence/ttl"
1699+
)
1700+
1701+
# Should match Zookeeper's CreateMode fromFlag
1702+
# https://github.com/apache/zookeeper/blob/master/zookeeper-server/
1703+
# src/main/java/org/apache/zookeeper/CreateMode.java#L112
1704+
flags = 0
1705+
if ephemeral:
1706+
flags |= 1
1707+
if sequence:
1708+
flags |= 2
1709+
if container:
1710+
flags = 4
1711+
if ttl:
1712+
if sequence:
1713+
flags = 6
1714+
else:
1715+
flags = 5
1716+
1717+
if acl is None:
1718+
acl = OPEN_ACL_UNSAFE
1719+
1720+
# Figure out the OpCode we are going to send
1721+
if include_data:
1722+
return Create2(
1723+
_prefix_root(chroot, path, trailing=sequence),
1724+
value,
1725+
acl,
1726+
flags
1727+
)
1728+
elif container:
1729+
return CreateContainer(
1730+
_prefix_root(chroot, path, trailing=False),
1731+
value,
1732+
acl,
1733+
flags
1734+
)
1735+
elif ttl:
1736+
return CreateTTL(
1737+
_prefix_root(chroot, path, trailing=sequence),
1738+
value,
1739+
acl,
1740+
flags,
1741+
ttl
1742+
)
1743+
else:
1744+
return Create(
1745+
_prefix_root(chroot, path, trailing=sequence),
1746+
value,
1747+
acl,
1748+
flags
1749+
)

0 commit comments

Comments
 (0)