From 2cf24d75825c10d7d41480d6ebfd5dfae2b191ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcin=20Ko=C5=9Bcielnicki?= Date: Sun, 9 Apr 2017 19:08:58 +0200 Subject: [PATCH] Add triggers to the protocol. --- python/veles/async_conn/tracer.py | 13 +++++++++++++ python/veles/db/tracker.py | 19 +++++++++++++++++++ python/veles/proto/check.py | 10 +++++++++- python/veles/proto/messages.py | 1 + python/veles/proto/node.py | 12 +++++++++++- python/veles/proto/operation.py | 12 ++++++++++++ 6 files changed, 65 insertions(+), 2 deletions(-) diff --git a/python/veles/async_conn/tracer.py b/python/veles/async_conn/tracer.py index 6a548f43..92416a55 100644 --- a/python/veles/async_conn/tracer.py +++ b/python/veles/async_conn/tracer.py @@ -121,6 +121,19 @@ def get_bindata_size(self, id, key): return self._get_from_node( id, lambda node: self._get_bindata_size(node, key)) + def _get_trigger(self, node, key): + res = node.triggers.get(key) + self.checks.append(check.CheckTrigger( + node=node.id, + key=key, + state=res, + )) + return res + + def get_trigger(self, id, key): + return self._get_from_node( + id, lambda node: self._get_trigger(node, key)) + async def _get_data(self, node, key, adata): try: res = await adata diff --git a/python/veles/db/tracker.py b/python/veles/db/tracker.py index 1b8b9c1b..2818dba1 100644 --- a/python/veles/db/tracker.py +++ b/python/veles/db/tracker.py @@ -144,6 +144,10 @@ def _check_ok_bindata(self, el): data = self.get_bindata(el.node, el.key, el.start, el.end) return data == el.data + def _check_ok_trigger(self, el): + node = self.get(el.node) + return node.triggers.get(el.key) == el.state + def _check_ok_list(self, el): nodes = self.get_list_raw(el.parent, el.tags, el.pos_filter) return el.nodes == nodes @@ -159,6 +163,7 @@ def checks_ok(self, checks): check.CheckData: self._check_ok_data, check.CheckBinDataSize: self._check_ok_bindata_size, check.CheckBinData: self._check_ok_bindata, + check.CheckTrigger: self._check_ok_trigger, check.CheckList: self._check_ok_list, } try: @@ -290,6 +295,18 @@ def _op_set_bindata(self, xact, op, dbnode): else: del dbnode.node.bindata[op.key] + def _op_add_trigger(self, xact, op, dbnode): + if dbnode.node is None: + raise ObjectGoneError() + # XXX + raise NotImplementedError + + def _op_del_trigger(self, xact, op, dbnode): + if dbnode.node is None: + raise ObjectGoneError() + # XXX + raise NotImplementedError + def transaction(self, checks, ops): if not self.checks_ok(checks): raise PreconditionFailedError() @@ -304,6 +321,8 @@ def transaction(self, checks, ops): operation.OperationSetAttr: self._op_set_attr, operation.OperationSetData: self._op_set_data, operation.OperationSetBinData: self._op_set_bindata, + operation.OperationAddTrigger: self._op_add_trigger, + operation.OperationDelTrigger: self._op_del_trigger, } for op in ops: dbnode = self.get_cached_node(op.node) diff --git a/python/veles/proto/check.py b/python/veles/proto/check.py index 0ddc6b7e..77953aaf 100644 --- a/python/veles/proto/check.py +++ b/python/veles/proto/check.py @@ -15,7 +15,7 @@ from __future__ import unicode_literals from veles.schema import model, fields -from veles.proto.node import PosFilter +from veles.proto.node import TriggerState, PosFilter class Check(model.PolymorphicModel): @@ -92,6 +92,14 @@ class CheckBinData(Check): data = fields.Binary() +class CheckTrigger(Check): + object_type = 'trigger' + + node = fields.NodeID() + key = fields.String() + state = fields.Enum(TriggerState, optional=True) + + class CheckList(Check): object_type = 'list' diff --git a/python/veles/proto/messages.py b/python/veles/proto/messages.py index 1fe7ee8a..6ae0abba 100644 --- a/python/veles/proto/messages.py +++ b/python/veles/proto/messages.py @@ -303,6 +303,7 @@ class MsgCreate(MsgpackMsg): attr = fields.Map(fields.String(), fields.Any()) data = fields.Map(fields.String(), fields.Any()) bindata = fields.Map(fields.String(), fields.Binary()) + triggers = fields.Set(fields.String()) class MsgDelete(MsgpackMsg): diff --git a/python/veles/proto/node.py b/python/veles/proto/node.py index 11774569..c54702cf 100644 --- a/python/veles/proto/node.py +++ b/python/veles/proto/node.py @@ -12,10 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. -from veles.schema import model, fields +from veles.schema import model, fields, enumeration from veles.schema.nodeid import NodeID +class TriggerState(enumeration.EnumModel): + pending = 'pending' + done = 'done' + error = 'error' + + class Node(model.Model): """ Stores the lightweight data associated with a node, and an index of heavy @@ -34,6 +40,9 @@ class Node(model.Model): this dict correspond to bindata keys, and the values are size of the corresponding binary data in bytes. The actual bindata has to be downloaded separately. + - ``triggers``: a dict containing active triggers for the given node. + The keys are trigger names, and they are mapped to the state of + the given trigger. """ id = fields.NodeID() @@ -44,6 +53,7 @@ class Node(model.Model): attr = fields.Map(fields.String(), fields.Any()) data = fields.Set(fields.String()) bindata = fields.Map(fields.String(), fields.SmallUnsignedInteger()) + triggers = fields.Map(fields.String(), fields.Enum(TriggerState)) class PosFilter(model.Model): diff --git a/python/veles/proto/operation.py b/python/veles/proto/operation.py index 1c18962f..e5feb349 100644 --- a/python/veles/proto/operation.py +++ b/python/veles/proto/operation.py @@ -84,3 +84,15 @@ class OperationSetBinData(Operation): start = fields.SmallUnsignedInteger(default=0) data = fields.Binary() truncate = fields.Boolean(default=False) + + +class OperationAddTrigger(Operation): + object_type = 'add_trigger' + + trigger = fields.String() + + +class OperationDelTrigger(Operation): + object_type = 'del_trigger' + + trigger = fields.String()