Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .github/workflows/tests-qs.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# CI workflow: runs the test subset selected by `pytest -k qs` for pushes to,
# and pull requests against, the `quantumstrand` branch.
name: quantumstrand tests

on:
  push:
    branches: [ quantumstrand ]
  pull_request:
    branches: [ quantumstrand ]

jobs:
  qs_tests:
    name: quantumstrand tests
    runs-on: ubuntu-22.04
    steps:
    # actions are pinned to full commit SHAs; the trailing comment records the tag
    - name: Checkout FLOSS with submodule
      uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1
      with:
        submodules: true
    - name: Set up Python 3.11
      uses: actions/setup-python@0a5c61591373683505ea898e09a3ea4f39ef2b9c # v5.0.0
      with:
        python-version: '3.11'
    - name: Install FLOSS
      run: |
        pip install -r requirements.txt
        # quoted so the shell never treats the extras list as a glob pattern
        pip install -e ".[dev,qs]"
    - name: Run tests
      run: pytest -k qs
184 changes: 150 additions & 34 deletions floss/qs/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -482,18 +482,90 @@ def check_is_xor(xor_key: int | None):
return ()


def check_is_reloc(reloc_offsets: Set[int], string: ExtractedString):
for addr in string.slice.range:
if addr in reloc_offsets:
return ("#reloc",)
class OffsetRanges:
    """
    A set of integer offsets stored as sorted, disjoint, inclusive (start, end) ranges.

    Supports membership tests (`offset in ranges`) and range-overlap tests
    (`ranges.overlaps(start, end)`) via binary search over the ranges,
    rather than keeping one set entry per offset.
    """

    def __init__(self, offsets: Optional[Set[int]] = None, *, _merged_ranges: Optional[List[Tuple[int, int]]] = None):
        """
        Build the ranges from individual offsets.

        Args:
          offsets: individual offsets; runs of consecutive values are collapsed into one range.
            None or an empty set yields an empty instance.
          _merged_ranges: private, used by `from_merged_ranges`. Inclusive (start, end) pairs
            that must already be sorted and non-overlapping (this is not verified).
            When given, `offsets` is ignored.
        """
        self._ranges: List[Tuple[int, int]] = []

        if _merged_ranges is not None:
            # copy, so later changes to the caller's list cannot break the ordering
            # the bisect lookups rely on; normalize each pair to a tuple, because the
            # lookups compare entries against tuple probes.
            self._ranges = [(start, end) for start, end in _merged_ranges]
            return

        if not offsets:
            return

        sorted_offsets = sorted(offsets)

        start = end = sorted_offsets[0]
        for offset in sorted_offsets[1:]:
            if offset == end + 1:
                # extends the current run
                end = offset
            else:
                self._ranges.append((start, end))
                start = end = offset
        self._ranges.append((start, end))

    def __repr__(self) -> str:
        return f"{type(self).__name__}.from_merged_ranges({self._ranges!r})"

    def __contains__(self, offset: int) -> bool:
        """Return True if `offset` lies within any stored range."""
        if not self._ranges:
            return False

        # Find the index where the offset would be inserted to maintain order.
        index = bisect.bisect_left(self._ranges, (offset, 0))

        # Check the range at the insertion index.
        # This handles cases where the offset is the start of a range.
        if index < len(self._ranges):
            start, _ = self._ranges[index]
            if start == offset:
                return True

        # Check the range just before the insertion index.
        # This handles cases where the offset is within or at the end of a range.
        if index > 0:
            start, end = self._ranges[index - 1]
            if start <= offset <= end:
                return True

        return False

    def overlaps(self, start: int, end: int) -> bool:
        """
        Return True if the inclusive range [start, end] shares an offset with any stored range.

        A query with end < start never overlaps anything.
        """
        if not self._ranges:
            return False

        # Find the index where the start of the given range would be inserted.
        # Because stored ranges are sorted and disjoint, only the neighbors on
        # either side of this position can be the first to intersect the query.
        index = bisect.bisect_right(self._ranges, (start, 0))

        # Check the range at index-1 for overlap
        if index > 0:
            prev_start, prev_end = self._ranges[index - 1]
            if max(start, prev_start) <= min(end, prev_end):
                return True

        # Check the range at index for overlap
        if index < len(self._ranges):
            next_start, next_end = self._ranges[index]
            if max(start, next_start) <= min(end, next_end):
                return True

        return False

    @classmethod
    def from_merged_ranges(cls, merged_ranges: List[Tuple[int, int]]):
        """
        Alternate constructor from inclusive (start, end) pairs.

        The pairs must already be sorted and non-overlapping; this is not verified.
        """
        return cls(_merged_ranges=merged_ranges)


def check_is_reloc(reloc_offsets: OffsetRanges, string: ExtractedString):
    """
    Tag a string that shares at least one offset with the relocation ranges.

    Returns ("#reloc",) on overlap, otherwise an empty tuple.
    """
    span = string.slice.range
    # overlaps() takes an inclusive end; `end - 1` presumably converts an exclusive end (confirm against Range)
    first, last = span.offset, span.end - 1
    return ("#reloc",) if reloc_offsets.overlaps(first, last) else ()


def check_is_code(code_offsets: Set[int], string: ExtractedString):
for addr in string.slice.range:
if addr in code_offsets:
return ("#code",)
def check_is_code(code_offsets: OffsetRanges, string: ExtractedString):
    """
    Tag a string that shares at least one offset with the recognized code ranges.

    Returns ("#code",) on overlap, otherwise an empty tuple.
    """
    span = string.slice.range
    # overlaps() takes an inclusive end; `end - 1` presumably converts an exclusive end (confirm against Range)
    first, last = span.offset, span.end - 1
    return ("#code",) if code_offsets.overlaps(first, last) else ()

Expand Down Expand Up @@ -803,14 +875,16 @@ class SegmentLayout(Layout):


class PELayout(Layout):
model_config = ConfigDict(arbitrary_types_allowed=True)

# xor key if the file was xor decoded
xor_key: Optional[int]

# file offsets of bytes that are part of the relocation table
reloc_offsets: Set[int]
reloc_offsets: OffsetRanges

# file offsets of bytes that are recognized as code
code_offsets: Set[int]
code_offsets: OffsetRanges

structures_by_address: Dict[int, Structure]

Expand Down Expand Up @@ -848,6 +922,68 @@ class ResourceLayout(Layout):
pass


def _merge_overlapping_ranges(ranges: List[Tuple[int, int]]) -> List[Tuple[int, int]]:
"""
Merge a list of (start, end) tuples into a list of contiguous ranges.
"""
if not ranges:
return []

sorted_ranges = sorted(ranges)
merged_ranges: List[Tuple[int, int]] = []
for higher in sorted_ranges:
if not merged_ranges:
merged_ranges.append(higher)
else:
lower = merged_ranges[-1]
lower_start, lower_end = lower
higher_start, higher_end = higher

# test for intersection between lower and higher:
# we know via sorting that lower_start <= higher_start
if higher_start <= lower_end + 1:
upper_bound = max(lower_end, higher_end)
merged_ranges[-1] = (lower_start, upper_bound)
else:
merged_ranges.append(higher)
return merged_ranges


def _get_code_ranges(ws: lancelot.Workspace, pe: pefile.PE, slice_: Slice) -> List[Tuple[int, int]]:
    """
    Collect one inclusive (first, last) file-offset pair per basic block found by lancelot.

    Walks every function in the workspace, builds its CFG, and translates each
    basic block address to a file offset via the PE. Blocks whose RVA cannot be
    translated, or that fall outside `slice_`, are skipped with a warning.
    The result is neither sorted nor merged.
    """
    image_base = ws.base_address

    # memoized because getting the offset is slow
    @functools.lru_cache(maxsize=None)
    def rva_to_offset(rva):
        try:
            return pe.get_offset_from_rva(rva)
        except pefile.PEFormatError as e:
            logger.warning("%s", str(e))
            return None

    spans: List[Tuple[int, int]] = []
    for function in ws.get_functions():
        for block in ws.build_cfg(function).basic_blocks.values():
            rva = block.address - image_base
            file_offset = rva_to_offset(rva)
            if file_offset is None:
                continue

            length = block.length
            if slice_.contains_range(file_offset, length):
                spans.append((file_offset, file_offset + length - 1))
            else:
                logger.warning("lancelot identified code at an invalid location, skipping basic block at 0x%x", rva)
    return spans


def compute_pe_layout(slice: Slice, xor_key: int | None) -> Layout:
data = slice.data

Expand All @@ -857,7 +993,7 @@ def compute_pe_layout(slice: Slice, xor_key: int | None) -> Layout:
raise ValueError("pefile failed to load workspace") from e

structures = collect_pe_structures(slice, pe)
reloc_offsets = get_reloc_offsets(slice, pe)
reloc_offsets = OffsetRanges(get_reloc_offsets(slice, pe))

structures_by_address = {}
for structure in structures:
Expand All @@ -872,30 +1008,10 @@ def compute_pe_layout(slice: Slice, xor_key: int | None) -> Layout:
raise ValueError("lancelot failed to load workspace") from e

# contains the file offsets of bytes that are part of recognized instructions.
code_offsets = set()
with timing("lancelot: find code"):
base_address = ws.base_address
for function in ws.get_functions():
cfg = ws.build_cfg(function)
for bb in cfg.basic_blocks.values():
va = bb.address
rva = va - base_address
try:
offset = pe.get_offset_from_rva(rva)
except pefile.PEFormatError as e:
logger.warning("%s", str(e))
continue

size = bb.length

if not slice.contains_range(offset, size):
logger.warning(
"lancelot identified code at an invalid location, skipping basic block at 0x%x", rva
)
continue

for fo in slice.range.slice(offset, size):
code_offsets.add(fo)
code_ranges = _get_code_ranges(ws, pe, slice)
merged_code_ranges = _merge_overlapping_ranges(code_ranges)
code_offsets = OffsetRanges.from_merged_ranges(merged_code_ranges)

layout = PELayout(
slice=slice,
Expand Down
2 changes: 1 addition & 1 deletion tests/test_qs.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

@pytest.fixture
def pma_binary_path():
    """Return the path of the "Practical Malware Analysis Lab 03-03" sample under CD/data/pma."""
    return CD / "data" / "pma" / "Practical Malware Analysis Lab 03-03.exe_"


@pytest.fixture
Expand Down
Loading