Skip to content

Commit

Permalink
Allowing network definition in many methods. BUT, I think it is not t…
Browse files Browse the repository at this point in the history
…he way to go
  • Loading branch information
raulikak committed May 27, 2024
1 parent 535839d commit ae51dde
Show file tree
Hide file tree
Showing 8 changed files with 41 additions and 32 deletions.
38 changes: 21 additions & 17 deletions tcsfw/address.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ class Network:
def __init__(self, name: str) -> None:
self.name = name

def resolve_by_ip(self, _address: Union[IPv4Address, IPv6Address]) -> 'Network':
"""Resolve sub-network by IP-address"""
return Networks.Internet # FIXME: Not really resolved now

def __eq__(self, other) -> bool:
return isinstance(other, Network) and self.name == other.name

Expand Down Expand Up @@ -237,26 +241,26 @@ def get_tag(cls, addresses: Iterable[AnyAddress]) -> Optional[EntityTag]:
return None

@classmethod
def parse_address(cls, address: str) -> AnyAddress:
def parse_address(cls, address: str, network=Networks.Default) -> AnyAddress:
"""Parse any address type from string, type given as 'type|address'"""
v, _, t = address.rpartition("|")
if v == "":
t, v = "ip", t # default is IP
if t == "ip":
return IPAddress.new(v)
return IPAddress.new(v, network)
if t == "hw":
return HWAddress.new(v)
return HWAddress.new(v, network)
if t == "tag":
return EntityTag(v)
return EntityTag(v, network)
if t == "name":
return DNSName(v)
return DNSName(v, network)
raise ValueError(f"Unknown address type '{t}', allowed are 'ip', 'hw', and 'name'")

@classmethod
def parse_endpoint(cls, value: str) -> AnyAddress:
def parse_endpoint(cls, value: str, network=Networks.Default) -> AnyAddress:
"""Parse address or endpoint"""
a, _, p = value.partition("/")
addr = cls.parse_address(a)
addr = cls.parse_address(a, network)
if p == "":
return addr
prot, _, port = p.partition(":")
Expand All @@ -273,15 +277,15 @@ def __init__(self, data: str, network=Networks.Default):
assert len(self.data) == 17, f"Expecting HW address syntax dd:dd:dd:dd:dd:dd, got {data}"

@classmethod
def new(cls, data: str) -> 'HWAddress':
def new(cls, data: str, network=Networks.Default) -> 'HWAddress':
"""New address, check something about the format"""
p = list(data.split(":"))
if len(p) != 6:
raise ValueError(f"Bad HW address '{data}'")
for i in range(6):
if len(p[i]) != 2:
p[i] = f"0{p[i]}" # zero-prefix
return HWAddress(":".join(p))
return HWAddress(":".join(p), network)

@classmethod
def from_ip(cls, address: 'IPAddress') -> 'HWAddress':
Expand Down Expand Up @@ -330,24 +334,24 @@ class HWAddresses:
class IPAddress(AnyAddress):
"""IP address, either IPv4 or IPv6"""
def __init__(self, data: Union[IPv4Address, IPv6Address], network=Networks.Internet):
AnyAddress.__init__(self, network)
AnyAddress.__init__(self, network.resolve_by_ip(data))
self.data = data

def get_ip_address(self) -> Optional['IPAddress']:
return self

@classmethod
def new(cls, address: str) -> 'IPAddress':
def new(cls, address: str, network=Networks.Internet) -> 'IPAddress':
"""Create new IP address"""
if address.startswith("[") and address.endswith("]"):
address = address[1:-1] # IPv6 address in brackets
return IPAddress(ipaddress.ip_address(address))
return IPAddress(ipaddress.ip_address(address), network)

@classmethod
def parse_with_port(cls, address: str, default_port=0) -> Tuple['IPAddress', int]:
def parse_with_port(cls, address: str, default_port=0, network=Networks.Internet) -> Tuple['IPAddress', int]:
"""Parse IPv4 address, possibly with port"""
ad, _, p = address.partition(":")
return cls.new(ad), default_port if p == "" else int(p)
return cls.new(ad, network), default_port if p == "" else int(p)

def is_null(self) -> bool:
return self.data == IPAddresses.NULL.data
Expand Down Expand Up @@ -417,12 +421,12 @@ def __repr__(self):
return self.name

@classmethod
def name_or_ip(cls, value: str) -> Union[IPAddress, 'DNSName']:
def name_or_ip(cls, value: str, network: Network) -> Union[IPAddress, 'DNSName']:
"""Get value as DNS name or IP address"""
try:
return IPAddress.new(value)
return IPAddress.new(value, network)
except ValueError:
return DNSName(value)
return DNSName(value, network)

@classmethod
def looks_like(cls, name: str) -> bool:
Expand Down
7 changes: 4 additions & 3 deletions tcsfw/address_resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import os
from typing import List
from tcsfw.address import DNSName
from tcsfw.address import DNSName, Network
from tcsfw.command_basics import read_env_file
from tcsfw.entity import SafeNameMap
from tcsfw.main import ConfigurationException
Expand All @@ -11,7 +11,8 @@

class AddressResolver:
"""Address resolver"""
def __init__(self):
def __init__(self, network: Network):
self.network = network
self.safe_names = SafeNameMap(prefix="SUT_")
self.addresses_for: List[Addressable] = []

Expand All @@ -23,5 +24,5 @@ def require(self):
value = os.environ.get(env_name) or env.get(env_name)
if not value:
raise ConfigurationException(f"Environment variable {env_name} not defined for {nb.entity.long_name()}")
address = DNSName.name_or_ip(value)
address = DNSName.name_or_ip(value, network=self.network)
nb.entity.addresses.add(address)
2 changes: 1 addition & 1 deletion tcsfw/batch_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ def parse_from_json(cls, json_data: Dict, directory_name: str, system: IoTSystem

# read batch-specific addresses
for add, ent in json_data.get("addresses", {}).items():
address = Addresses.parse_address(add)
address = Addresses.parse_address(add, network=system.network)
entity = system.get_entity(ent)
if not entity:
raise ValueError(f"Unknown entity {ent}")
Expand Down
2 changes: 1 addition & 1 deletion tcsfw/builder_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -1071,7 +1071,7 @@ def run(self):
for set_ip in args.set_ip or []:
name, _, ips = set_ip.partition("=")
h = self.system.get_entity(name) or self.system.get_endpoint(
DNSName.name_or_ip(name))
DNSName.name_or_ip(name, network=self.system.network))
if not isinstance(h, Host) or not h.is_relevant():
raise ValueError(f"No such host '{name}'")
for ip in ips.split(","):
Expand Down
3 changes: 2 additions & 1 deletion tcsfw/har_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ def decode(s: str) -> str:
if red_url.startswith("https:") and req_url.startswith("http:"):
# redirection to HTTPS, someone may be interested
ru = urllib.parse.urlparse(req_url)
ep = EndpointAddress(DNSName.name_or_ip(str(ru.hostname)), Protocol.TCP, ru.port or 80)
ep = EndpointAddress(
DNSName.name_or_ip(str(ru.hostname), network=self.system.network), Protocol.TCP, ru.port or 80)
txt = f"{response.get('status', '?')} {response.get('statusText', '?')}"
ev = PropertyAddressEvent(evidence, ep, Properties.HTTP_REDIRECT.verdict(Verdict.PASS, txt))
interface.property_address_update(ev)
Expand Down
7 changes: 5 additions & 2 deletions tcsfw/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from typing import List, Set, Optional, Tuple, TypeVar, Callable, Dict, Any, Self, Iterable, Iterator, Union
from urllib.parse import urlparse

from tcsfw.address import AnyAddress, Addresses, EndpointAddress, EntityTag, Protocol, IPAddress, HWAddress, DNSName
from tcsfw.address import AnyAddress, Addresses, EndpointAddress, EntityTag, Networks, Protocol, IPAddress, HWAddress, DNSName
from tcsfw.basics import ConnectionType, ExternalActivity, HostType, Status
from tcsfw.entity import Entity
from tcsfw.property import PropertyKey
Expand Down Expand Up @@ -442,8 +442,11 @@ def __init__(self, name="IoT system"):
super().__init__(name)
self.concept_name = "system"
self.status = Status.EXPECTED
# network mask(s)

# network mask(s) - FIXME: moving to self.network
self.ip_networks = [ipaddress.ip_network("192.168.0.0/16")] # reasonable default
self.network = Networks.Default

# online resources
self.online_resources: Dict[str, str] = {}
# original entities and connections
Expand Down
2 changes: 1 addition & 1 deletion tcsfw/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,6 @@ def __init__(self, system: IoTSystem):
def process_file(self, data: BytesIO, file_name: str, interface: EventInterface, source: EvidenceSource) -> bool:
raw_json = json.load(data)
for raw_flow in raw_json.get("flows", []):
flow = IPFlow.parse_from_json(raw_flow)
flow = IPFlow.parse_from_json(raw_flow, network=self.system.network)
flow.evidence = Evidence(source)
interface.connection(flow)
12 changes: 6 additions & 6 deletions tcsfw/traffic.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import datetime
from typing import Any, Callable, Tuple, Set, Optional, Self, Dict

from tcsfw.address import HWAddress, IPAddress, HWAddresses, IPAddresses, Protocol, EndpointAddress, AnyAddress, \
from tcsfw.address import HWAddress, IPAddress, HWAddresses, IPAddresses, Network, Protocol, EndpointAddress, AnyAddress, \
Addresses
from tcsfw.property import PropertyKey

Expand Down Expand Up @@ -408,7 +408,7 @@ def __eq__(self, other):
return self.source == other.source and self.target == other.target and super().__eq__(other)

@classmethod
def parse_from_json(cls, value: Dict) -> 'IPFlow':
def parse_from_json(cls, value: Dict, network: Network) -> 'IPFlow':
"""Parse event from a string"""
# Form 1
protocol = "udp" if "udp" in value else "tcp" if "tcp" in value else None
Expand All @@ -420,10 +420,10 @@ def parse_from_json(cls, value: Dict) -> 'IPFlow':
(HWAddress.new(t_hw), IPAddress.new(t_ip), t_port), protocol=Protocol.get_protocol(protocol))
# Form 2
protocol = Protocol.get_protocol(value["protocol"])
s_ip, s_port = IPAddress.parse_with_port(value["source"])
t_ip, t_port = IPAddress.parse_with_port(value["target"])
s_hw = HWAddress.new(value["source_hw"]) if "source_hw" in value else HWAddress.from_ip(s_ip)
t_hw = HWAddress.new(value["target_hw"]) if "target_hw" in value else HWAddress.from_ip(t_ip)
s_ip, s_port = IPAddress.parse_with_port(value["source"], network)
t_ip, t_port = IPAddress.parse_with_port(value["target"], network)
s_hw = HWAddress.new(value["source_hw"], network) if "source_hw" in value else HWAddress.from_ip(s_ip, network)
t_hw = HWAddress.new(value["target_hw"], network) if "target_hw" in value else HWAddress.from_ip(t_ip, network)
return IPFlow(NO_EVIDENCE, (s_hw, s_ip, s_port), (t_hw, t_ip, t_port), protocol=protocol)


Expand Down

0 comments on commit ae51dde

Please sign in to comment.