ToolFinder to adopt lookup of tools from batch import code
raulikak committed May 15, 2024
1 parent 415abb3 commit d4ddca4
Showing 3 changed files with 135 additions and 121 deletions.
152 changes: 36 additions & 116 deletions tcsfw/batch_import.py
@@ -6,31 +6,14 @@
import logging
import pathlib
import io
from typing import Dict, List, Optional, Type
from enum import StrEnum
from typing import Dict, List

from tcsfw.address import Addresses
from tcsfw.android_manifest_scan import AndroidManifestScan
from tcsfw.basics import ExternalActivity
from tcsfw.censys_scan import CensysScan
from tcsfw.event_interface import EventInterface
from tcsfw.har_scan import HARScan
from tcsfw.mitm_log_reader import MITMLogReader
from tcsfw.model import EvidenceNetworkSource, IoTSystem
from tcsfw.nmap_scan import NMAPScan
from tcsfw.pcap_reader import PCAPReader
from tcsfw.ping_command import PingCommand
from tcsfw.releases import ReleaseReader
from tcsfw.spdx_reader import SPDXReader
from tcsfw.ssh_audit_scan import SSHAuditScan
from tcsfw.testsslsh_scan import TestSSLScan
from tcsfw.tools import CheckTool, SimpleFlowTool
from tcsfw.tool_finder import ToolDepiction, ToolFinder
from tcsfw.traffic import EvidenceSource
from tcsfw.tshark_reader import TSharkReader
from tcsfw.vulnerability_reader import VulnerabilityReader
from tcsfw.web_checker import WebChecker

from tcsfw.zed_reader import ZEDReader


class BatchImporter:
@@ -41,18 +24,6 @@ def __init__(self, interface: EventInterface, label_filter: 'LabelFilter' = None
self.label_filter = label_filter or LabelFilter()
self.logger = logging.getLogger("batch_importer")

# map file types into batch tools
self.batch_tools: Dict[BatchFileType, Type[CheckTool]] = {
BatchFileType.APK: AndroidManifestScan,
BatchFileType.CENSYS: CensysScan,
BatchFileType.HAR: HARScan,
BatchFileType.RELEASES: ReleaseReader,
BatchFileType.SPDX: SPDXReader,
BatchFileType.SSH_AUDIT: SSHAuditScan,
BatchFileType.TESTSSL: TestSSLScan,
BatchFileType.VULNERABILITIES: VulnerabilityReader,
}

# collect evidence sources from visited tools
self.evidence: Dict[str, List[EvidenceSource]] = {}

@@ -83,18 +54,21 @@ def _import_batch(self, file: pathlib.Path):
else:
info = FileMetaInfo()

# get tool info by file type
tool_dep = ToolFinder.by_file_type(info.file_type)

# list files/directories to process
proc_list = []
for child in file.iterdir():
if child == meta_file:
for a_file in file.iterdir():
if a_file == meta_file:
continue
prefix = child.name[:1]
prefix = a_file.name[:1]
if prefix in {".", "_"}:
continue
postfix = child.name[-1:]
postfix = a_file.name[-1:]
if postfix in {"~"}:
continue
proc_list.append(child)
proc_list.append(a_file)

# sort files to specified order, if any
if info.file_load_order:
@@ -103,66 +77,40 @@
# filter by label
skip_processing = not self.label_filter.filter(info.label)

# process the files in a batch?
as_batch = info.file_type in self.batch_tools
if as_batch:
self._do_process_files(proc_list, info, skip_processing)
# give all files to the tool
all_files = tool_dep.filter_files_itself()
if all_files:
# process all files by one tool
self._do_process_files(proc_list, info, tool_dep, skip_processing)

if not info.label:
self.logger.info("skipping all files as no 00meta.json")

# recursively scan the directory
for child in proc_list:
if info and child.is_file():
if as_batch or not info.label:
for a_file in proc_list:
if info and a_file.is_file():
if all_files or not info.label:
continue
# process the files individually
if not info.default_include and info.label not in self.label_filter.included:
self.logger.debug("skipping (default=False) %s", child.as_posix())
self.logger.debug("skipping (default=False) %s", a_file.as_posix())
continue # skip file if not explicitly included
with child.open("rb") as f:
self._do_process(f, child, info, skip_processing)
with a_file.open("rb") as f:
self._do_process(f, a_file, info, tool_dep, skip_processing)
else:
self._import_batch(child)
self._import_batch(a_file)

def _do_process(self, stream: io.BytesIO, file_path: pathlib.Path, info: 'FileMetaInfo', skip_processing: bool):
"""Process the file as stream"""
def _do_process(self, stream: io.BytesIO, file_path: pathlib.Path, info: 'FileMetaInfo', tool: ToolDepiction,
skip_processing: bool):
"""Process a file """
if not skip_processing:
self.logger.info("processing (%s) %s", info.label, file_path.as_posix())

file_name = file_path.name
file_ext = file_path.suffix.lower()

def ext(value: str) -> bool:
"""Check extension"""
return info.from_pipe or file_ext == value
reader = tool.create_tool(self.system, "" if info.from_pipe else file_ext)

try:
reader = None
if ext(".json") and info.file_type == BatchFileType.CAPTURE:
reader = SimpleFlowTool(self.interface.get_system())
elif ext(".pcap") and info.file_type in {BatchFileType.UNSPECIFIED, BatchFileType.CAPTURE}:
# read flows from pcap
reader = PCAPReader(self.interface.get_system())
elif ext(".json") and info.file_type == BatchFileType.CAPTURE_JSON:
# read flows from JSON pcap
reader = TSharkReader(self.interface.get_system())
elif ext(".log") and info.file_type == BatchFileType.MITMPROXY:
# read MITM from textual log
reader = MITMLogReader(self.interface.get_system())
elif ext(".xml") and info.file_type == BatchFileType.NMAP:
# read NMAP from xml
reader = NMAPScan(self.interface.get_system())
elif ext(".log") and info.file_type == BatchFileType.PING:
# read Ping output
reader = PingCommand(self.interface.get_system())
elif ext(".http") and info.file_type == BatchFileType.HTTP_MESSAGE:
# read messages from http content file
reader = WebChecker(self.interface.get_system())
elif ext(".json") and info.file_type == BatchFileType.ZAP:
# read ZAP from json
reader = ZEDReader(self.interface.get_system())

if reader:
ev = info.source.rename(name=reader.tool.name, base_ref=file_path.as_posix(),
label=info.label)
@@ -180,27 +128,28 @@ def ext(value: str) -> bool:
raise ValueError(f"Error in {file_name}") from e
self.logger.info("skipping unsupported '%s' type %s", file_name, info.file_type)

def _do_process_files(self, files: List[pathlib.Path], info: 'FileMetaInfo', skip_processing: bool):
def _do_process_files(self, files: List[pathlib.Path], info: 'FileMetaInfo', tool: ToolDepiction,
skip_processing: bool):
"""Process files"""
tool = self.batch_tools[info.file_type](self.interface.get_system())
tool.load_baseline = info.load_baseline
reader = tool.create_tool(self.system)
reader.load_baseline = info.load_baseline

if skip_processing:
self.logger.info("skipping (%s) data files", info.label)
ev = info.source.rename(name=tool.tool.name)
ev = info.source.rename(name=reader.tool.name)
self.evidence.setdefault(info.label, []).append(ev)
return

unmapped = set(tool.file_name_map.keys())
unmapped = set(reader.file_name_map.keys())
for fn in files:
if not fn.is_file():
continue # directories called later
ev = info.source.rename(name=tool.tool.name, base_ref=fn.as_posix(), label=info.label)
ev = info.source.rename(name=reader.tool.name, base_ref=fn.as_posix(), label=info.label)
self.evidence.setdefault(info.label, []).append(ev)
with fn.open("rb") as f:
# tool-specific code can override, if knows better
ev.timestamp = datetime.fromtimestamp(fn.stat().st_mtime)
done = tool.process_file(f, fn.name, self.interface, ev)
done = reader.process_file(f, fn.name, self.interface, ev)
if done:
unmapped.remove(fn.name)
else:
@@ -209,38 +158,9 @@ def _do_process_files(self, files: List[pathlib.Path], info: 'FileMetaInfo', ski
self.logger.debug("no files for %s", sorted(unmapped))


class BatchFileType(StrEnum):
"""Batch file type"""
UNSPECIFIED = "unspecified"
APK = "apk"
CAPTURE = "capture"
CAPTURE_JSON = "capture-json"
CENSYS = "censys"
HAR = "har"
MITMPROXY = "mitmproxy"
NMAP = "nmap"
PING = "ping"
RELEASES = "github-releases" # Github format
SPDX = "spdx"
SSH_AUDIT = "ssh-audit"
TESTSSL = "testssl"
VULNERABILITIES = "blackduck-vulnerabilities" # BlackDuck csv output
HTTP_MESSAGE = "http"
ZAP = "zap" # ZED Attack Proxy

@classmethod
def parse(cls, value: Optional[str]):
"""Parse from string"""
if not value:
return cls.UNSPECIFIED
for t in cls:
if t.value == value:
return t
raise ValueError(f"Unknown batch file type: {value}")

class FileMetaInfo:
"""Batch file information."""
def __init__(self, label="", file_type=BatchFileType.UNSPECIFIED):
def __init__(self, label="", file_type=""):
self.label = label
self.file_load_order: List[str] = []
self.file_type = file_type
Expand All @@ -258,7 +178,7 @@ def parse_from_stream(cls, stream: io.BytesIO, directory_name: str, system: IoTS
def parse_from_json(cls, json_data: Dict, directory_name: str, system: IoTSystem) -> 'FileMetaInfo':
"""Parse from JSON"""
label = str(json_data.get("label", directory_name))
file_type = BatchFileType.parse(json_data.get("file_type")).value
file_type = json_data.get("file_type", "")
r = cls(label, file_type)
r.from_pipe = bool(json_data.get("from_pipe", False))
r.load_baseline = bool(json_data.get("load_baseline", False))
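A minimal sketch of how the changed FileMetaInfo is now driven by a plain-string file_type and resolved through the new ToolFinder, rather than the removed BatchFileType enum. This is not code from the commit: the 00meta.json content shown and the no-argument IoTSystem() construction are illustrative assumptions (in BatchImporter the system comes from the event interface).

# Sketch only; assumptions marked in comments
import json

from tcsfw.batch_import import FileMetaInfo
from tcsfw.model import IoTSystem
from tcsfw.tool_finder import ToolFinder

# Illustrative 00meta.json content; only keys visible in this diff are used
meta_json = json.loads('{"label": "pcap-x", "file_type": "capture", "from_pipe": false}')

system = IoTSystem()  # assumption: constructible without arguments
info = FileMetaInfo.parse_from_json(meta_json, "pcap-x", system)

tool_dep = ToolFinder.by_file_type(info.file_type)  # raises ValueError for unknown types
reader = tool_dep.create_tool(system, file_extension=".pcap")
print(info.label, reader.tool.name if reader else None)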
96 changes: 96 additions & 0 deletions tcsfw/tool_finder.py
@@ -0,0 +1,96 @@
"""Tool factory"""

from typing import Dict, List, Optional, Type, Union
from tcsfw.android_manifest_scan import AndroidManifestScan
from tcsfw.censys_scan import CensysScan
from tcsfw.har_scan import HARScan
from tcsfw.mitm_log_reader import MITMLogReader
from tcsfw.model import IoTSystem
from tcsfw.nmap_scan import NMAPScan
from tcsfw.pcap_reader import PCAPReader
from tcsfw.ping_command import PingCommand
from tcsfw.releases import ReleaseReader
from tcsfw.spdx_reader import SPDXReader
from tcsfw.ssh_audit_scan import SSHAuditScan
from tcsfw.testsslsh_scan import TestSSLScan
from tcsfw.tools import CheckTool, SimpleFlowTool
from tcsfw.tshark_reader import TSharkReader
from tcsfw.vulnerability_reader import VulnerabilityReader
from tcsfw.web_checker import WebChecker
from tcsfw.zed_reader import ZEDReader


class ToolDepiction:
"""Tool depiction"""
def __init__(self, file_type: Union[str|List[str]], tool_class: Union[Type[CheckTool], Dict[str, Type[CheckTool]]],
extension=""):
file_types = file_type if isinstance(file_type, list) else [file_type]
self.file_type = file_types[0] # primary
self.tools: Dict[str, Type[CheckTool]] = {}
if isinstance(tool_class, dict):
assert not extension
self.tools = tool_class
else:
assert isinstance(tool_class, type)
self.tools[extension] = tool_class
for ft in file_types:
assert ft not in self.ToolsByType, f"Two tools for file type '{ft}'"
self.ToolsByType[ft] = self

def filter_files_itself(self) -> bool:
"""Does the tool filter files itself?"""
return len(self.tools) == 1 and "" in self.tools

def create_tool(self, system: IoTSystem, file_extension="") -> Optional[CheckTool]:
"""Create tool, optionally by data file extension"""
if file_extension:
file_extension = file_extension.lower()
file_extension = file_extension[1:] if file_extension.startswith(".") else file_extension
tc = self.tools.get(file_extension)
else:
tc = next(iter(self.tools.values()), None)
if tc is None:
return None
return tc(system)

def __repr__(self) -> str:
return self.file_type

ToolsByType: Dict[str, 'ToolDepiction'] = {}


class ToolFinderImplementation:
"""Tool finder implementation"""
def __init__(self):
assert not ToolDepiction.ToolsByType, "Only one instance of ToolFinder should be created"

# NOTE: Tools without given file extension and given all files from directory.
# They are expected to only use those which make sense for them.

self.apk = ToolDepiction("apk", AndroidManifestScan, extension="xml")
self.censys = ToolDepiction("censys", CensysScan)
self.har = ToolDepiction("har", HARScan, extension="json")
self.http = ToolDepiction("http", WebChecker, extension="http")
self.mitm_proxy = ToolDepiction("mitmproxy", MITMLogReader, extension="log")
self.nmap = ToolDepiction("nmap", NMAPScan, extension="xml")
self.releases = ToolDepiction("github-releases", ReleaseReader)
self.ping = ToolDepiction("ping", PingCommand, extension="log")
self.pcap = ToolDepiction(["capture", ""], PCAPReader, extension="pcap") # Default tool - file_type ""
self.pcap = ToolDepiction("capture-json", TSharkReader, extension="json")
self.pcap_flow = ToolDepiction("pcap-flow", SimpleFlowTool, extension="json")
self.sdpx = ToolDepiction("spdx", SPDXReader)
self.ssh_audit = ToolDepiction("ssh-audit", SSHAuditScan)
self.testssl = ToolDepiction("testssl", TestSSLScan)
self.vulnerabilities = ToolDepiction("blackduck-vulnerabilities", VulnerabilityReader)
self.zap = ToolDepiction("zap", ZEDReader, extension="json")

def by_file_type(self, file_type: str) -> ToolDepiction:
"""Get tool by name"""
cl = ToolDepiction.ToolsByType.get(file_type)
if cl is None:
raise ValueError(f"Unknown file_type '{file_type}'")
return cl


# The tool finder singleton
ToolFinder = ToolFinderImplementation()
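A short usage sketch of the ToolFinder API introduced in this file, limited to the calls visible above (by_file_type, filter_files_itself, create_tool). Constructing a bare IoTSystem() here is an assumption; in BatchImporter the system is obtained from the EventInterface.

# Usage sketch (assumptions noted in comments), not part of the commit
from tcsfw.model import IoTSystem
from tcsfw.tool_finder import ToolFinder

system = IoTSystem()  # assumption: constructible without arguments

tool_dep = ToolFinder.by_file_type("capture")  # registered above for PCAPReader

if tool_dep.filter_files_itself():
    # Single reader with no extension key: it is handed every file in the batch
    reader = tool_dep.create_tool(system)
else:
    # Otherwise the reader is chosen per data file, keyed by its extension
    reader = tool_dep.create_tool(system, file_extension=".pcap")

print(reader.tool.name if reader else "no reader for this extension")

The class-level ToolsByType registry together with the module-level ToolFinder singleton keeps the lookup table in one place, which is why ToolFinderImplementation asserts it is instantiated only once.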
8 changes: 3 additions & 5 deletions tests/test_batch_import.py
@@ -1,9 +1,7 @@
import pathlib
from tcsfw.address import HWAddress, IPAddress
from tcsfw.batch_import import BatchFileType, BatchImporter, FileMetaInfo
from tcsfw.batch_import import BatchImporter, FileMetaInfo
from tcsfw.inspector import Inspector
from tcsfw.matcher import SystemMatcher
from tcsfw.model import IoTSystem
from tests.test_model import simple_setup_1


@@ -29,8 +27,8 @@ def test_parse_from_json():
result = FileMetaInfo.parse_from_json(json_data, "pcap-x", system)

assert result.label == "pcap-x"
assert result.file_type == BatchFileType.CAPTURE
assert result.default_include == True
assert result.file_type == "capture"
assert result.default_include is True
assert len(result.source.address_map) == 2
assert result.source.address_map[IPAddress.new("1.2.3.4")] == system.get_entity("Device 1")
assert result.source.address_map[HWAddress.new("1:2:3:4:5:6")] == system.get_entity("Device 2")
