Skip to content
This repository has been archived by the owner on Sep 30, 2020. It is now read-only.

Commit

Permalink
Add triggers to the protocol.
Browse files Browse the repository at this point in the history
  • Loading branch information
mwkmwkmwk committed Apr 13, 2017
1 parent 508a093 commit 2cf24d7
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 2 deletions.
13 changes: 13 additions & 0 deletions python/veles/async_conn/tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,19 @@ def get_bindata_size(self, id, key):
return self._get_from_node(
id, lambda node: self._get_bindata_size(node, key))

def _get_trigger(self, node, key):
res = node.triggers.get(key)
self.checks.append(check.CheckTrigger(
node=node.id,
key=key,
state=res,
))
return res

def get_trigger(self, id, key):
return self._get_from_node(
id, lambda node: self._get_trigger(node, key))

async def _get_data(self, node, key, adata):
try:
res = await adata
Expand Down
19 changes: 19 additions & 0 deletions python/veles/db/tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ def _check_ok_bindata(self, el):
data = self.get_bindata(el.node, el.key, el.start, el.end)
return data == el.data

def _check_ok_trigger(self, el):
node = self.get(el.node)
return node.triggers.get(el.key) == el.state

def _check_ok_list(self, el):
nodes = self.get_list_raw(el.parent, el.tags, el.pos_filter)
return el.nodes == nodes
Expand All @@ -159,6 +163,7 @@ def checks_ok(self, checks):
check.CheckData: self._check_ok_data,
check.CheckBinDataSize: self._check_ok_bindata_size,
check.CheckBinData: self._check_ok_bindata,
check.CheckTrigger: self._check_ok_trigger,
check.CheckList: self._check_ok_list,
}
try:
Expand Down Expand Up @@ -290,6 +295,18 @@ def _op_set_bindata(self, xact, op, dbnode):
else:
del dbnode.node.bindata[op.key]

def _op_add_trigger(self, xact, op, dbnode):
if dbnode.node is None:
raise ObjectGoneError()
# XXX
raise NotImplementedError

def _op_del_trigger(self, xact, op, dbnode):
if dbnode.node is None:
raise ObjectGoneError()
# XXX
raise NotImplementedError

def transaction(self, checks, ops):
if not self.checks_ok(checks):
raise PreconditionFailedError()
Expand All @@ -304,6 +321,8 @@ def transaction(self, checks, ops):
operation.OperationSetAttr: self._op_set_attr,
operation.OperationSetData: self._op_set_data,
operation.OperationSetBinData: self._op_set_bindata,
operation.OperationAddTrigger: self._op_add_trigger,
operation.OperationDelTrigger: self._op_del_trigger,
}
for op in ops:
dbnode = self.get_cached_node(op.node)
Expand Down
10 changes: 9 additions & 1 deletion python/veles/proto/check.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from __future__ import unicode_literals

from veles.schema import model, fields
from veles.proto.node import PosFilter
from veles.proto.node import TriggerState, PosFilter


class Check(model.PolymorphicModel):
Expand Down Expand Up @@ -92,6 +92,14 @@ class CheckBinData(Check):
data = fields.Binary()


class CheckTrigger(Check):
object_type = 'trigger'

node = fields.NodeID()
key = fields.String()
state = fields.Enum(TriggerState, optional=True)


class CheckList(Check):
object_type = 'list'

Expand Down
1 change: 1 addition & 0 deletions python/veles/proto/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ class MsgCreate(MsgpackMsg):
attr = fields.Map(fields.String(), fields.Any())
data = fields.Map(fields.String(), fields.Any())
bindata = fields.Map(fields.String(), fields.Binary())
triggers = fields.Set(fields.String())


class MsgDelete(MsgpackMsg):
Expand Down
12 changes: 11 additions & 1 deletion python/veles/proto/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from veles.schema import model, fields
from veles.schema import model, fields, enumeration
from veles.schema.nodeid import NodeID


class TriggerState(enumeration.EnumModel):
pending = 'pending'
done = 'done'
error = 'error'


class Node(model.Model):
"""
Stores the lightweight data associated with a node, and an index of heavy
Expand All @@ -34,6 +40,9 @@ class Node(model.Model):
this dict correspond to bindata keys, and the values are size of the
corresponding binary data in bytes. The actual bindata has to
be downloaded separately.
- ``triggers``: a dict containing active triggers for the given node.
The keys are trigger names, and they are mapped to the state of
the given trigger.
"""

id = fields.NodeID()
Expand All @@ -44,6 +53,7 @@ class Node(model.Model):
attr = fields.Map(fields.String(), fields.Any())
data = fields.Set(fields.String())
bindata = fields.Map(fields.String(), fields.SmallUnsignedInteger())
triggers = fields.Map(fields.String(), fields.Enum(TriggerState))


class PosFilter(model.Model):
Expand Down
12 changes: 12 additions & 0 deletions python/veles/proto/operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,15 @@ class OperationSetBinData(Operation):
start = fields.SmallUnsignedInteger(default=0)
data = fields.Binary()
truncate = fields.Boolean(default=False)


class OperationAddTrigger(Operation):
object_type = 'add_trigger'

trigger = fields.String()


class OperationDelTrigger(Operation):
object_type = 'del_trigger'

trigger = fields.String()

0 comments on commit 2cf24d7

Please sign in to comment.