Skip to content

Commit

Permalink
Add ABNF for LVS
Browse files Browse the repository at this point in the history
  • Loading branch information
zjkmxy committed Oct 11, 2024
1 parent d78bdc4 commit 177844e
Show file tree
Hide file tree
Showing 2 changed files with 160 additions and 51 deletions.
119 changes: 96 additions & 23 deletions src/ndn/app_support/light_versec/binary.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# But this code is implemented independently without using any line of the
# original one, and released under Apache License.
#
# Copyright (C) 2019-2022 The python-ndn authors
# Copyright (C) 2019-2024 The python-ndn authors
#
# This file is part of python-ndn.
#
Expand All @@ -23,31 +23,43 @@
import ndn.encoding as enc


__all__ = ['VERSION', 'TypeNumber', 'UserFnArg', 'UserFnCall', 'ConstraintOption', 'PatternConstraint',
'PatternEdge', 'ValueEdge', 'Node', 'TagSymbol', 'LvsModel']
__all__ = [
"VERSION",
"TypeNumber",
"UserFnArg",
"UserFnCall",
"ConstraintOption",
"PatternConstraint",
"PatternEdge",
"ValueEdge",
"Node",
"TagSymbol",
"LvsModel",
]


VERSION = 0x00010000
MIN_SUPPORTED_VERSION = 0x00011000
VERSION = 0x00011000


class TypeNumber:
COMPONENT_VALUE = 0x01
PATTERN_TAG = 0x02
NODE_ID = 0x03
USER_FN_ID = 0x04
IDENTIFIER = 0x05
USER_FN_CALL = 0x11
FN_ARGS = 0x12
CONS_OPTION = 0x21
CONSTRAINT = 0x22
VALUE_EDGE = 0x31
PATTERN_EDGE = 0x32
KEY_NODE_ID = 0x33
PARENT_ID = 0x34
VERSION = 0x40
NODE = 0x41
TAG_SYMBOL = 0x42
NAMED_PATTERN_NUM = 0x43
COMPONENT_VALUE = 0x21
PATTERN_TAG = 0x23
NODE_ID = 0x25
USER_FN_ID = 0x27
IDENTIFIER = 0x29
USER_FN_CALL = 0x31
FN_ARGS = 0x33
CONS_OPTION = 0x41
CONSTRAINT = 0x43
VALUE_EDGE = 0x51
PATTERN_EDGE = 0x53
KEY_NODE_ID = 0x55
PARENT_ID = 0x57
VERSION = 0x61
NODE = 0x63
TAG_SYMBOL = 0x67
NAMED_PATTERN_NUM = 0x69


class UserFnArg(enc.TlvModel):
Expand All @@ -72,13 +84,17 @@ class ConstraintOption(enc.TlvModel):


class PatternConstraint(enc.TlvModel):
options = enc.RepeatedField(enc.ModelField(TypeNumber.CONS_OPTION, ConstraintOption))
options = enc.RepeatedField(
enc.ModelField(TypeNumber.CONS_OPTION, ConstraintOption)
)


class PatternEdge(enc.TlvModel):
dest = enc.UintField(TypeNumber.NODE_ID)
tag = enc.UintField(TypeNumber.PATTERN_TAG)
cons_sets = enc.RepeatedField(enc.ModelField(TypeNumber.CONSTRAINT, PatternConstraint))
cons_sets = enc.RepeatedField(
enc.ModelField(TypeNumber.CONSTRAINT, PatternConstraint)
)


class ValueEdge(enc.TlvModel):
Expand Down Expand Up @@ -106,3 +122,60 @@ class LvsModel(enc.TlvModel):
named_pattern_cnt = enc.UintField(TypeNumber.NAMED_PATTERN_NUM)
nodes = enc.RepeatedField(enc.ModelField(TypeNumber.NODE, Node))
symbols = enc.RepeatedField(enc.ModelField(TypeNumber.TAG_SYMBOL, TagSymbol))


# The following are for reference only and there is no commitment to implement them exactly as described.
# The implemented format is specified by the class LvsModel above.
__ABNF_FOR_REFERENCE__ = r"""
LvsModel = LVS-MODEL-TYPE TLV-LENGTH
Version
StartId
NamedPatternCnt
*Node
*TagSymbol
Version = VERSION-TYPE TLV-LENGTH NonNegativeInteger
StartId = NODE-ID-TYPE TLV-LENGTH NonNegativeInteger
NamedPatternCnt = NAMED-PATTERN-NUM-TYPE TLV-LENGTH NonNegativeInteger
Node = NODE-TYPE TLV-LENGTH
NodeId
[Parent]
*RuleName
*ValueEdge
*PatternEdge
*SignConstraint
NodeId = NODE-TYPE TLV-LENGTH NonNegativeInteger
Parent = NODE-TYPE TLV-LENGTH NonNegativeInteger
SignConstraint = KEY-NODE-ID-TYPE TLV-LENGTH NonNegativeInteger
RuleName = IDENTIFIER-TYPE TLV-LENGTH CNAME
CNAME = ("_" / ALPHA) *("_" / ALPHA / DIGIT)
ValueEdge = VALUE-EDGE-TYPE TLV-LENGTH
Destination
Value
Destination = NodeId
Value = COMPONENT-VALUE-TYPE TLV-LENGTH NameComponent
PatternEdge = PATTERN-EDGE-TYPE TLV-LENGTH
Destination
Tag
*Constraint
Tag = PATTERN-TAG-TYPE TLV-LENGTH NonNegativeInteger
Constraint = CONSTRAINT-TYPE TLV-LENGTH *ConstraintOption
ConstraintOption = CONS-OPTION-TYPE TLV-LENGTH (Value / Tag / UserFnCall)
UserFnCall = USER-FN-CALL-TYPE TLV-LENGTH
FnId
*UserFnArg
FnId = USER-FN-ID-TYPE TLV-LENGTH CNAME
UserFnArg = USER-FN-ARG-TYPE TLV-LENGTH (Value / Tag)
TagSymbol = TAG-SYMBOL-TYPE TLV-LENGTH
Tag
Identifier
Identifier = IDENTIFIER-TYPE TLV-LENGTH CNAME
"""
92 changes: 64 additions & 28 deletions src/ndn/app_support/light_versec/checker.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,17 @@
# limitations under the License.
# -----------------------------------------------------------------------------
from __future__ import annotations

from typing import Callable, Iterator
from ...encoding import Name, BinaryStr, FormalName, NonStrictName, Component

from ...encoding import BinaryStr, Component, FormalName, Name, NonStrictName
from ...security import Keychain
from ..security_v2 import parse_certificate
from . import binary as bny
from .compiler import top_order


__all__ = ['UserFn', 'LvsModelError', 'Checker', 'DEFAULT_USER_FNS']
__all__ = ["UserFn", "LvsModelError", "Checker", "DEFAULT_USER_FNS"]


UserFn = Callable[[BinaryStr, list[BinaryStr]], bool]
Expand All @@ -44,6 +46,7 @@ class LvsModelError(Exception):
"""
Raised when the input LVS model is malformed.
"""

pass


Expand Down Expand Up @@ -71,8 +74,11 @@ def __init__(self, model: bny.LvsModel, user_fns: dict[str, UserFn]):

def _sanity_check(self):
"""Basic sanity check. Also collect info for other testing."""
if self.model.version > bny.VERSION:
raise LvsModelError(f'Unrecognized LVS model version {self.model.version}')
if (
self.model.version is None
or not bny.MIN_SUPPORTED_VERSION <= self.model.version <= bny.VERSION
):
raise LvsModelError(f"Unsupported LVS model version {self.model.version}")
self._model_fns = set()
self._trust_roots = set()
in_deg_nodes = set()
Expand All @@ -81,39 +87,50 @@ def _sanity_check(self):

def dfs(cur, par):
if cur >= len(self.model.nodes):
raise LvsModelError(f'Non-existing node id {cur}')
raise LvsModelError(f"Non-existing node id {cur}")
node = self.model.nodes[cur]
if node.id != cur:
raise LvsModelError(f'Malformed node id {cur}')
raise LvsModelError(f"Malformed node id {cur}")
if par and node.parent != par:
raise LvsModelError(f'Node {cur} has a wrong parent')
raise LvsModelError(f"Node {cur} has a wrong parent")
for ve in node.v_edges:
if ve.dest is None or not ve.value:
raise LvsModelError(f'Node {cur} has a malformed edge')
raise LvsModelError(f"Node {cur} has a malformed edge")
dfs(ve.dest, cur)
for pe in node.p_edges:
if pe.dest is None or pe.tag is None:
raise LvsModelError(f'Node {cur} has a malformed edge')
raise LvsModelError(f"Node {cur} has a malformed edge")
dfs(pe.dest, cur)
for cons in pe.cons_sets:
for op in cons.options:
branch = [not not op.value, op.tag is not None, op.fn is not None].count(True)
branch = [
not not op.value,
op.tag is not None,
op.fn is not None,
].count(True)
if branch != 1:
raise LvsModelError(f'Edge {cur}->{pe.dest} has a malformed condition')
raise LvsModelError(
f"Edge {cur}->{pe.dest} has a malformed condition"
)
if op.fn is not None:
if not op.fn.fn_id:
raise LvsModelError(f'Edge {cur}->{pe.dest} has a malformed condition')
raise LvsModelError(
f"Edge {cur}->{pe.dest} has a malformed condition"
)
self._model_fns.add(op.fn.fn_id)
for key_node_id in node.sign_cons:
if key_node_id >= len(self.model.nodes):
raise LvsModelError(f'Node {cur} is signed by a non-existing key {key_node_id}')
raise LvsModelError(
f"Node {cur} is signed by a non-existing key {key_node_id}"
)
in_deg_nodes.add(key_node_id)
adj_lst[cur].append(key_node_id)

dfs(self.model.start_id, None)
top_order(nodes_id_lst, adj_lst)
self._trust_roots = {n for n in in_deg_nodes
if not self.model.nodes[n].sign_cons}
self._trust_roots = {
n for n in in_deg_nodes if not self.model.nodes[n].sign_cons
}

def validate_user_fns(self) -> bool:
"""Check if all user functions required by the model is defined."""
Expand All @@ -131,7 +148,7 @@ def root_of_trust(self) -> set[str]:
if node.rule_name:
ret = ret | set(node.rule_name)
else:
ret = ret | {'#_' + str(cur)}
ret = ret | {"#_" + str(cur)}
return ret

def save(self) -> bytes:
Expand All @@ -152,12 +169,22 @@ def load(binary_model: BinaryStr, user_fns: dict[str, UserFn]):
return Checker(model, user_fns)

def _context_to_name(self, context: dict[int, BinaryStr]) -> dict[str, BinaryStr]:
named_tag = {self._symbols[tag]: val for tag, val in context.items() if tag in self._symbols}
annon_tag = {str(tag): val for tag, val in context.items() if tag not in self._symbols}
named_tag = {
self._symbols[tag]: val
for tag, val in context.items()
if tag in self._symbols
}
annon_tag = {
str(tag): val for tag, val in context.items() if tag not in self._symbols
}
return named_tag | annon_tag

def _check_cons(self, value: BinaryStr, context: dict[int, BinaryStr],
cons_set: list[bny.PatternConstraint]) -> bool:
def _check_cons(
self,
value: BinaryStr,
context: dict[int, BinaryStr],
cons_set: list[bny.PatternConstraint],
) -> bool:
for cons in cons_set:
satisfied = False
for op in cons.options:
Expand All @@ -172,7 +199,7 @@ def _check_cons(self, value: BinaryStr, context: dict[int, BinaryStr],
else:
fn_id = op.fn.fn_id
if fn_id not in self.user_fns:
raise LvsModelError(f'User function {fn_id} is undefined')
raise LvsModelError(f"User function {fn_id} is undefined")
args = [context.get(arg.tag, arg.value) for arg in op.fn.args]
if self.user_fns[fn_id](value, args):
satisfied = True
Expand All @@ -181,7 +208,9 @@ def _check_cons(self, value: BinaryStr, context: dict[int, BinaryStr],
return False
return True

def _match(self, name: FormalName, context: dict[int, BinaryStr]) -> Iterator[tuple[int, dict[int, BinaryStr]]]:
def _match(
self, name: FormalName, context: dict[int, BinaryStr]
) -> Iterator[tuple[int, dict[int, BinaryStr]]]:
cur = self.model.start_id
edge_index = -1
edge_indices = []
Expand Down Expand Up @@ -239,7 +268,9 @@ def _match(self, name: FormalName, context: dict[int, BinaryStr]) -> Iterator[tu
del context[last_tag]
cur = node.parent

def match(self, name: NonStrictName) -> Iterator[tuple[list[str], dict[str, BinaryStr]]]:
def match(
self, name: NonStrictName
) -> Iterator[tuple[list[str], dict[str, BinaryStr]]]:
"""
Iterate all matches of a given name.
Expand All @@ -257,7 +288,7 @@ def match(self, name: NonStrictName) -> Iterator[tuple[list[str], dict[str, Bina
if node.rule_name:
rule_name = node.rule_name
else:
rule_name = ['#_' + str(node_id)]
rule_name = ["#_" + str(node_id)]
yield rule_name, self._context_to_name(context)

def check(self, pkt_name: NonStrictName, key_name: NonStrictName) -> bool:
Expand Down Expand Up @@ -302,14 +333,19 @@ def suggest(self, pkt_name: NonStrictName, keychain: Keychain) -> FormalName:
if self.check(pkt_name, cert_name):
cert = parse_certificate(key[cert_name].data)
# This is to avoid self-signed certificate
if (not cert.signature_info or not cert.signature_info.key_locator
or not cert.signature_info.key_locator.name):
if (
not cert.signature_info
or not cert.signature_info.key_locator
or not cert.signature_info.key_locator.name
):
continue
if self.check(cert_name, cert.signature_info.key_locator.name):
return cert_name


DEFAULT_USER_FNS = {
'$eq': lambda c, args: all(x == c for x in args),
'$eq_type': lambda c, args: all(Component.get_type(x) == Component.get_type(c) for x in args),
"$eq": lambda c, args: all(x == c for x in args),
"$eq_type": lambda c, args: all(
Component.get_type(x) == Component.get_type(c) for x in args
),
}

0 comments on commit 177844e

Please sign in to comment.