Skip to content

Commit

Permalink
Shell command adaptor ss implementation started
Browse files Browse the repository at this point in the history
  • Loading branch information
raulikak committed May 20, 2024
1 parent 992eacd commit ab4c092
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 3 deletions.
68 changes: 66 additions & 2 deletions tcsfw/shell_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@

from io import BytesIO, TextIOWrapper
import re
from typing import Dict
from typing import Dict, Tuple
from tcsfw.address import Addresses, EndpointAddress, IPAddress
from tcsfw.components import OperatingSystem
from tcsfw.event_interface import EventInterface, PropertyEvent
from tcsfw.model import IoTSystem, NetworkNode
from tcsfw.property import PropertyKey
from tcsfw.tools import NetworkNodeTool
from tcsfw.traffic import Evidence, EvidenceSource
from tcsfw.traffic import Evidence, EvidenceSource, Protocol
from tcsfw.verdict import Verdict


Expand Down Expand Up @@ -79,3 +80,66 @@ def process_node(self, node: NetworkNode, data_file: BytesIO, interface: EventIn
exp = ""
ev = PropertyEvent(evidence, os, key.verdict(ver, explanation=exp))
interface.property_update(ev)


class ShellCommandSs(NetworkNodeTool):
"""Shell command 'ss' tool adapter"""
def __init__(self, system: IoTSystem):
super().__init__("shell-ss", ".txt", system)

def _parse_address(self, addr: str) -> Tuple[str, str, int]:
"""Parse address into IP, interface, port"""
ad_inf, _, port = addr.rpartition(":")
ad, _, inf = ad_inf.partition("%")
return ad if ad not in {"", "*", "0.0.0.0"} else "", inf, int(port) if port not in {"", "*"} else -1

LOCAL_ADDRESS = "Local_Address"
PEER_ADDRESS = "Peer_Address"

def process_node(self, node: NetworkNode, data_file: BytesIO, interface: EventInterface, source: EvidenceSource):
columns: Dict[str, int] = {}
services = set()
conns = set()

with TextIOWrapper(data_file) as f:
while True:
line = f.readline()
if not line:
break
if not columns:
# header line, use first two characters (headers are truncated for narrow data)
line = line.replace("Local Address:Port", self.LOCAL_ADDRESS)
line = line.replace("Peer Address:Port", self.PEER_ADDRESS)
columns = {name: idx for idx, name in enumerate(line.split())}
assert self.LOCAL_ADDRESS in columns, "Local address not found"
assert self.PEER_ADDRESS in columns, "Peer address not found"
continue
cols = line.split()
if len(cols) <= columns[self.PEER_ADDRESS]:
continue # bad line
net_id = cols[columns["Netid"]]
state = cols[columns["State"]]
local_ip, local_inf, local_port = self._parse_address(cols[columns[self.LOCAL_ADDRESS]])
peer_ip, _, peer_port = self._parse_address(cols[columns[self.PEER_ADDRESS]])
self.logger.debug("Local %s:%d Peer %s:%d", local_ip, local_port, peer_ip, peer_port)
local_add = IPAddress.new(local_ip) if local_ip else None
peer_add = IPAddress.new(peer_ip) if peer_ip else None
if local_inf == "lo" or (local_add and local_add.is_loopback()):
continue # loopback is not external
if net_id == "udp" and state == "UNCONN":
# listening UDP port
add = EndpointAddress(local_add or Addresses.ANY, Protocol.UDP, local_port)
services.add(add)
continue
if net_id == "tcp" and state == "LISTEN":
# listening TCP port
add = EndpointAddress(local_add or Addresses.ANY, Protocol.TCP, local_port)
services.add(add)
continue
if net_id in {"udp", "tcp"} and state != "LISTEN" and local_add and peer_add:
# UDP or TCP connection
proto = Protocol.UDP if net_id == "udp" else Protocol.TCP
local = EndpointAddress(local_add, proto, local_port)
peer = EndpointAddress(peer_add, proto, peer_port)
conns.add((local, peer))
continue
3 changes: 2 additions & 1 deletion tcsfw/tool_finder.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from tcsfw.pcap_reader import PCAPReader
from tcsfw.ping_command import PingCommand
from tcsfw.releases import ReleaseReader
from tcsfw.shell_commands import ShellCommandPs
from tcsfw.shell_commands import ShellCommandPs, ShellCommandSs
from tcsfw.spdx_reader import SPDXReader
from tcsfw.ssh_audit_scan import SSHAuditScan
from tcsfw.testsslsh_scan import TestSSLScan
Expand Down Expand Up @@ -81,6 +81,7 @@ def __init__(self):
self.pcap = ToolDepiction("capture-json", TSharkReader, extension="json")
self.pcap_flow = ToolDepiction("pcap-flow", SimpleFlowTool, extension="json")
self.shell_ps = ToolDepiction("shell-ps", ShellCommandPs)
self.shell_ss = ToolDepiction("shell-ss", ShellCommandSs)
self.sdpx = ToolDepiction("spdx", SPDXReader)
self.ssh_audit = ToolDepiction("ssh-audit", SSHAuditScan)
self.testssl = ToolDepiction("testssl", TestSSLScan)
Expand Down
3 changes: 3 additions & 0 deletions tests/samples/shell-ss/00meta.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"file_type": "shell-ss"
}
19 changes: 19 additions & 0 deletions tests/samples/shell-ss/Device.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
Netid State Recv-Q Send-Q Local Address:Port Peer Address:Port Process
udp UNCONN 0 0 127.0.0.53%lo:53 0.0.0.0:*
udp UNCONN 0 0 65.21.253.97%eth0:68 0.0.0.0:*
udp UNCONN 0 0 0.0.0.0:123 0.0.0.0:*
udp UNCONN 0 0 127.0.0.1:323 0.0.0.0:*
udp UNCONN 0 0 *:1194 *:*
udp UNCONN 0 0 [::1]:323 [::]:*
tcp LISTEN 0 4096 127.0.0.53%lo:53 0.0.0.0:*
tcp LISTEN 0 128 0.0.0.0:22 0.0.0.0:*
tcp LISTEN 0 511 0.0.0.0:41337 0.0.0.0:*
tcp LISTEN 0 511 0.0.0.0:443 0.0.0.0:*
tcp LISTEN 0 128 169.254.255.255:51337 0.0.0.0:*
tcp LISTEN 0 4096 127.0.0.1:27017 0.0.0.0:*
tcp LISTEN 0 511 0.0.0.0:80 0.0.0.0:*
tcp ESTAB 0 0 169.254.255.255:51337 169.254.10.1:39236
tcp ESTAB 0 0 169.254.255.255:51337 169.254.10.2:56164
tcp SYN-RECV 0 0 65.21.253.97:80 104.223.42.139:10446
tcp ESTAB 0 192 65.21.253.97:22 81.175.152.166:49224
tcp LISTEN 0 128 [::]:22 [::]:*
21 changes: 21 additions & 0 deletions tests/test_shell_ss.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
"""Test shell command output parsing"""

import pathlib

from tcsfw.batch_import import BatchImporter
from tcsfw.components import OperatingSystem
from tcsfw.property import PropertyKey
from tcsfw.verdict import Verdict
from tests.test_model import Setup


class Setup_1(Setup):
"""Setup for tests here"""
def __init__(self):
super().__init__()
self.device1 = self.system.device().hw("1:0:0:0:0:1")


def test_shell_ss_pass():
su = Setup_1()
BatchImporter(su.get_inspector()).import_batch(pathlib.Path("tests/samples/shell-ss"))

0 comments on commit ab4c092

Please sign in to comment.