diff --git a/tcsfw/shell_commands.py b/tcsfw/shell_commands.py index def6430..4f45496 100644 --- a/tcsfw/shell_commands.py +++ b/tcsfw/shell_commands.py @@ -2,7 +2,7 @@ from io import BytesIO, TextIOWrapper import re -from typing import Dict, Tuple +from typing import Any, Dict, List, Set, Tuple from tcsfw.address import Addresses, AnyAddress, EndpointAddress, HWAddresses, IPAddress from tcsfw.components import OperatingSystem from tcsfw.event_interface import EventInterface, PropertyEvent @@ -23,7 +23,6 @@ def process_endpoint(self, endpoint: AnyAddress, stream: BytesIO, interface: Eve node = self.system.get_endpoint(endpoint) columns: Dict[str, int] = {} - unexpected = {} os = OperatingSystem.get_os(node, add=self.load_baseline) # expected processes as regexps @@ -31,6 +30,8 @@ def process_endpoint(self, endpoint: AnyAddress, stream: BytesIO, interface: Eve for user, ps_list in os.process_map.items(): regexp_map[user] = [re.compile(ps) for ps in ps_list] + properties: Dict[str, List[Tuple[PropertyKey, Any]]] = {} + unseen: Dict[str, Set[str]] = {u: set(ps) for u, ps in os.process_map.items()} with TextIOWrapper(stream) as f: while True: line = f.readline().split(maxsplit=len(columns) -1 if columns else -1) @@ -44,6 +45,7 @@ def process_endpoint(self, endpoint: AnyAddress, stream: BytesIO, interface: Eve continue # bad line user = line[columns["US"]].strip() cmd = line[columns["CM"]].strip() + pid = line[columns["PI"]].strip() # using pid as unique identifier if cmd.startswith("[") and cmd.endswith("]"): continue # kernel thread cmd_0 = cmd.split()[0] @@ -59,30 +61,38 @@ def process_endpoint(self, endpoint: AnyAddress, stream: BytesIO, interface: Eve if exp_ps is None: self.logger.debug("User %s not in process map", user) continue + key = PropertyKey("process", user, str(pid)) + exp = f"{user} process: {cmd}" + user_ps = properties.setdefault(user, []) for ps in exp_ps: if ps.match(cmd): + user_ps.append(key.verdict(Verdict.PASS, explanation=exp)) + unseen[user].discard(ps.pattern) break else: self.logger.debug("Command %s not expected process for %s", cmd, user) - unexpected.setdefault(user, []).append(cmd) + user_ps.append(key.verdict(Verdict.FAIL, explanation=exp)) continue self.logger.debug("Command %s expected process for %s", cmd, user) if self.send_events: - # send pass or fail verdicts + # send pass or fail properties per process and set-value per user evidence = Evidence(source) - all_procs = set(os.process_map.keys()) - all_procs.update(unexpected.keys()) - for user in sorted(all_procs): - key = PropertyKey("process", user) - if user in unexpected: - ver = Verdict.FAIL - exp = f"Unexpected {user} processes: " + ", ".join(unexpected[user]) - else: - ver = Verdict.PASS - exp = "" - ev = PropertyEvent(evidence, os, key.verdict(ver, explanation=exp)) + for user, ps in sorted(properties.items()): + # seen processes, expected or not + for prop in ps: + ev = PropertyEvent(evidence, os, prop) + interface.property_update(ev) + set_p = PropertyKey("process", user).value_set(set(p[0] for p in ps)) + ev = PropertyEvent(evidence, os, set_p) interface.property_update(ev) + # unseen processes + unseen_ps = unseen.get(user, set()) + for n, ps_name in enumerate(sorted(unseen_ps)): + key = PropertyKey("process", user, f"-{n}") + ev = PropertyEvent( + evidence, os, key.verdict(Verdict.INCON, explanation=f"{user} process {ps_name} not seen")) + interface.property_update(ev) class ShellCommandSs(EndpointTool): diff --git a/tests/test_shell_ps_command.py b/tests/test_shell_ps_command.py index 759dd9b..80cf7f1 100644 --- a/tests/test_shell_ps_command.py +++ b/tests/test_shell_ps_command.py @@ -35,8 +35,7 @@ def test_shell_ps_pass(): }) BatchImporter(su.get_inspector()).import_batch(pathlib.Path("tests/samples/shell-ps")) os = OperatingSystem.get_os(su.device1.entity, add=False) - prop = os.properties.get(PropertyKey("process", "root")) - assert prop.verdict == Verdict.PASS + assert PropertyKey("process", "root").get_verdict(os.properties) == Verdict.PASS def test_shell_ps_fail(): @@ -46,5 +45,4 @@ def test_shell_ps_fail(): }) BatchImporter(su.get_inspector()).import_batch(pathlib.Path("tests/samples/shell-ps")) os = OperatingSystem.get_os(su.device1.entity, add=False) - prop = os.properties.get(PropertyKey("process", "root")) - assert prop.verdict == Verdict.FAIL + assert PropertyKey("process", "root").get_verdict(os.properties) == Verdict.FAIL