Skip to content

Commit

Permalink
Add support for running on Windows to the Filesystem component and fi…
Browse files Browse the repository at this point in the history
…x path handling on Windows (#521)
  • Loading branch information
alchzh authored Dec 5, 2024
1 parent 54c2956 commit 280b81c
Show file tree
Hide file tree
Showing 8 changed files with 179 additions and 105 deletions.
2 changes: 2 additions & 0 deletions ofrak_core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Add recursive identify functionality in the GUI. ([#435](https://github.com/redballoonsecurity/ofrak/pull/435))
- Add generic DecompilationAnalysis classes. ([#453](https://github.com/redballoonsecurity/ofrak/pull/453))
- `PatchFromSourceModifier` bundles src and header files into same temporary directory with BOM and FEM ([#517](https://github.com/redballoonsecurity/ofrak/pull/517))
- Add support for running on Windows to the `Filesystem` component. ([#521](https://github.com/redballoonsecurity/ofrak/pull/521))

### Fixed
- Improved flushing of filesystem entries (including symbolic links and other types) to disk. ([#373](https://github.com/redballoonsecurity/ofrak/pull/373))
Expand All @@ -32,6 +33,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Fix bug where PJSON deserializer fails to deserialze `ComponentConfig` dataclasses have a field with a default value of `None`. ([#506](https://github.com/redballoonsecurity/ofrak/pull/506))
- Fix bug where calling `Resource.remove_tag` on both a tag class and a class that inherits from that class causes a `KeyError` on resource save. ([#510](https://github.com/redballoonsecurity/ofrak/pull/510))
- Use PyPI version of `bincopy`, upgrade to version 20.0.0 ([#528](https://github.com/redballoonsecurity/ofrak/pull/528))
- Fix bugs on Windows arising from using `os.path` methods when only forward-slashes are acceptable ([#521](https://github.com/redballoonsecurity/ofrak/pull/521))

### Changed
- By default, the ofrak log is now `ofrak-YYYYMMDDhhmmss.log` rather than just `ofrak.log` and the name can be specified on the command line ([#480](https://github.com/redballoonsecurity/ofrak/pull/480))
Expand Down
8 changes: 4 additions & 4 deletions ofrak_core/ofrak/core/dtb.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
For more information see: https://devicetree-specification.readthedocs.io/en/stable/flattened-format.html
"""

import os
import posixpath
import struct
from dataclasses import dataclass
from enum import Enum
Expand Down Expand Up @@ -55,7 +55,7 @@ async def get_path(self) -> str:
return self.name

parent_node = await self.resource.get_parent_as_view(v_type=DtbNode)
return os.path.join(await parent_node.get_path(), self.name)
return posixpath.join(await parent_node.get_path(), self.name)


class DeviceTreeBlob(GenericBinary):
Expand Down Expand Up @@ -183,7 +183,7 @@ def caption(cls, attributes) -> str:

async def get_path(self):
parent_node = await self.resource.get_parent_as_view(v_type=DtbNode)
return os.path.join(await parent_node.get_path(), self.name)
return posixpath.join(await parent_node.get_path(), self.name)

async def get_value(self) -> Union[str, List[str], int, List[int], bytes, bytearray, None]:
if self.p_type is DtbPropertyType.DtbPropNoValue:
Expand Down Expand Up @@ -321,7 +321,7 @@ async def pack(self, resource: Resource, config: ComponentConfig = None):
):
# By default, add_item adds the missing nodes to complete the path of a previous node
if not dtb.exist_node(await node.get_path()):
dtb.add_item(fdt.Node(node.name), os.path.dirname(await node.get_path()))
dtb.add_item(fdt.Node(node.name), posixpath.dirname(await node.get_path()))
for prop in await node.resource.get_children_as_view(
v_type=DtbProperty,
r_filter=ResourceFilter(tags=[DtbProperty]),
Expand Down
135 changes: 96 additions & 39 deletions ofrak_core/ofrak/core/filesystem.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import os
import stat
import sys
import warnings
import tempfile
import posixpath
from pathlib import PurePath
from dataclasses import dataclass
from typing import Dict, Iterable, Optional, Type, Union

Expand All @@ -22,6 +26,14 @@
)


def _warn_chmod_chown_windows(): # pragma: no cover
# warnings.warn instead of logging.warning prevents duplicating this static warning message
warnings.warn(
"os.chown and os.chmod do not work on Windows platforms. Unix-like file ownership and "
"permissions will not be properly handled while using OFRAK on this platform."
)


@dataclass
class FilesystemEntry(ResourceView):
"""
Expand Down Expand Up @@ -108,7 +120,8 @@ async def get_path(self) -> str:
"""
Get a folder's path, with the `FilesystemRoot` as the path root.
:return: The full path name, with the `FilesystemRoot` ancestor as the path root
:return: The full path name, with the `FilesystemRoot` ancestor as the path root;
always POSIX style paths with forward slashes
"""
path = [self.get_name()]

Expand All @@ -122,9 +135,8 @@ async def get_path(self) -> str:
break
a_view = await a.view_as(FilesystemEntry)
path.append(a_view.get_name())
path.reverse()

return os.path.join(*path)
return posixpath.join(*reversed(path))

def apply_stat_attrs(self, path: str):
"""
Expand All @@ -134,8 +146,11 @@ def apply_stat_attrs(self, path: str):
:param path: Path on disk to set attributes of.
"""
if self.stat:
os.chown(path, self.stat.st_uid, self.stat.st_gid)
os.chmod(path, self.stat.st_mode)
if sys.platform == "win32":
_warn_chmod_chown_windows()
else:
os.chown(path, self.stat.st_uid, self.stat.st_gid)
os.chmod(path, self.stat.st_mode)
os.utime(path, (self.stat.st_atime, self.stat.st_mtime))
if self.xattrs:
for attr, value in self.xattrs.items():
Expand Down Expand Up @@ -173,11 +188,16 @@ async def flush_to_disk(self, root_path: str = ".", filename: Optional[str] = No
os.symlink(link_view.source_path, link_name)
assert len(list(await self.resource.get_children())) == 0
if self.stat:
# https://docs.python.org/3/library/os.html#os.supports_follow_symlinks
if os.chown in os.supports_follow_symlinks:
os.chown(link_name, self.stat.st_uid, self.stat.st_gid, follow_symlinks=False)
if os.chmod in os.supports_follow_symlinks:
os.chmod(link_name, self.stat.st_mode, follow_symlinks=False)
if sys.platform == "win32":
_warn_chmod_chown_windows()
else:
# https://docs.python.org/3/library/os.html#os.supports_follow_symlinks
if os.chown in os.supports_follow_symlinks:
os.chown(
link_name, self.stat.st_uid, self.stat.st_gid, follow_symlinks=False
)
if os.chmod in os.supports_follow_symlinks:
os.chmod(link_name, self.stat.st_mode, follow_symlinks=False)
if os.utime in os.supports_follow_symlinks:
os.utime(
link_name,
Expand All @@ -198,20 +218,41 @@ async def flush_to_disk(self, root_path: str = ".", filename: Optional[str] = No
self.apply_stat_attrs(file_name)
elif self.is_device():
device_name = os.path.join(root_path, entry_path)
if self.stat is None:
raise ValueError(
if sys.platform == "win32" or not hasattr(os, "mknod"):
warnings.warn(
f"Cannot create a device {entry_path} for a "
f"BlockDevice or CharacterDevice resource with no stat!"
f"BlockDevice or CharacterDevice resource on platform {sys.platform}! "
f"Creating an empty regular file instead."
)
os.mknod(device_name, self.stat.st_mode, self.stat.st_rdev)
with open(device_name, "w") as f:
pass
else:
if self.stat is None:
raise ValueError(
f"Cannot create a device {entry_path} for a "
f"BlockDevice or CharacterDevice resource with no stat!"
)
os.mknod(device_name, self.stat.st_mode, self.stat.st_rdev)

self.apply_stat_attrs(device_name)
elif self.is_fifo_pipe():
fifo_name = os.path.join(root_path, entry_path)
if self.stat is None:
raise ValueError(
f"Cannot create a fifo {entry_path} for a FIFOPipe resource " "with no stat!"
if sys.platform == "win32" or not hasattr(os, "mkfifo"):
warnings.warn(
f"Cannot create a fifo {entry_path} for a "
f"FIFOPipe resource on platform {sys.platform}! "
f"Creating an empty regular file instead."
)
os.mkfifo(fifo_name, self.stat.st_mode)
with open(fifo_name, "w") as f:
pass
else:
if self.stat is None:
raise ValueError(
f"Cannot create a fifo {entry_path} for a FIFOPipe resource "
"with no stat!"
)
os.mkfifo(fifo_name, self.stat.st_mode)

self.apply_stat_attrs(fifo_name)
else:
entry_info = f"Stat: {stat.S_IFMT(self.stat.st_mode):o}" if self.stat else ""
Expand Down Expand Up @@ -245,7 +286,7 @@ async def get_entry(self, path: str) -> Optional[FilesystemEntry]:
:return: The child `FilesystemEntry` resource that was found. If nothing was found, `None`
is returned
"""
basename = os.path.basename(path)
basename = posixpath.basename(path)
# only searching paths with the same base name should reduce the search space by quite a lot
descendants = await self.resource.get_descendants_as_view(
FilesystemEntry,
Expand Down Expand Up @@ -313,6 +354,13 @@ class CharacterDevice(SpecialFileType):
"""


def _path_to_posixpath(path: str) -> str:
"""
Converts an OS-specific path to a POSIX style path
"""
return str(PurePath(path).as_posix())


@dataclass
class FilesystemRoot(ResourceView):
"""
Expand Down Expand Up @@ -341,7 +389,10 @@ async def initialize_from_disk(
for root, dirs, files in os.walk(root_path):
for d in sorted(dirs):
absolute_path = os.path.join(root, d)
relative_path = os.path.join(os.path.relpath(root, root_path), d)
relative_path_posix = posixpath.join(
_path_to_posixpath(os.path.relpath(root, root_path)), d
)

folder_attributes_stat = os.lstat(absolute_path)

mode = folder_attributes_stat.st_mode
Expand All @@ -365,23 +416,25 @@ async def initialize_from_disk(
folder_attributes_xattr = self._get_xattr_map(absolute_path)
if os.path.islink(absolute_path):
await self.add_special_file_entry(
relative_path,
relative_path_posix,
SymbolicLink(
relative_path,
relative_path_posix,
folder_attributes_stat,
folder_attributes_xattr,
os.readlink(absolute_path),
),
)
else:
await self.add_folder(
relative_path,
relative_path_posix,
folder_attributes_stat,
folder_attributes_xattr,
)
for f in sorted(files):
absolute_path = os.path.join(root, f)
relative_path = os.path.normpath(os.path.join(os.path.relpath(root, root_path), f))
relative_path_posix = _path_to_posixpath(
os.path.normpath(os.path.join(os.path.relpath(root, root_path), f))
)
file_attributes_stat = os.lstat(absolute_path)

mode = file_attributes_stat.st_mode
Expand All @@ -402,9 +455,9 @@ async def initialize_from_disk(
file_attributes_xattr = self._get_xattr_map(absolute_path)
if os.path.islink(absolute_path):
await self.add_special_file_entry(
relative_path,
relative_path_posix,
SymbolicLink(
relative_path,
relative_path_posix,
file_attributes_stat,
file_attributes_xattr,
os.readlink(absolute_path),
Expand All @@ -413,25 +466,29 @@ async def initialize_from_disk(
elif os.path.isfile(absolute_path):
with open(absolute_path, "rb") as fh:
await self.add_file(
relative_path,
relative_path_posix,
fh.read(),
file_attributes_stat,
file_attributes_xattr,
)
elif stat.S_ISFIFO(mode):
await self.add_special_file_entry(
relative_path,
FIFOPipe(relative_path, file_attributes_stat, file_attributes_xattr),
relative_path_posix,
FIFOPipe(relative_path_posix, file_attributes_stat, file_attributes_xattr),
)
elif stat.S_ISBLK(mode):
await self.add_special_file_entry(
relative_path,
BlockDevice(relative_path, file_attributes_stat, file_attributes_xattr),
relative_path_posix,
BlockDevice(
relative_path_posix, file_attributes_stat, file_attributes_xattr
),
)
elif stat.S_ISCHR(mode):
await self.add_special_file_entry(
relative_path,
CharacterDevice(relative_path, file_attributes_stat, file_attributes_xattr),
relative_path_posix,
CharacterDevice(
relative_path_posix, file_attributes_stat, file_attributes_xattr
),
)
else:
raise NotImplementedError(
Expand Down Expand Up @@ -482,7 +539,7 @@ async def get_entry(self, path: str):
:return: the descendant `FilesystemEntry`, if found; otherwise, returns `None`
"""
basename = os.path.basename(path)
basename = posixpath.basename(path)
# only searching paths with the same base name should reduce the search space by quite a lot
descendants = await self.resource.get_descendants_as_view(
FilesystemEntry,
Expand All @@ -492,7 +549,7 @@ async def get_entry(self, path: str):
)

for d in descendants:
if await d.get_path() == os.path.normpath(path):
if await d.get_path() == posixpath.normpath(path):
return d
return None

Expand Down Expand Up @@ -533,7 +590,7 @@ async def add_folder(
"""
# Normalizes and cleans up paths beginning with "./" and containing "./../" as well as
# other extraneous separators
split_dir = os.path.normpath(path).rstrip("/").lstrip("/").split("/")
split_dir = posixpath.normpath(path).strip("/").split("/")

parent: Union[FilesystemRoot, Folder] = self
for directory in split_dir:
Expand Down Expand Up @@ -585,8 +642,8 @@ async def add_file(
:return: the `File` resource that was added to the `FilesystemRoot`
"""
dirname = os.path.dirname(path)
filename = os.path.basename(path)
dirname = posixpath.dirname(path)
filename = posixpath.basename(path)

if dirname == "":
parent_folder = self
Expand Down Expand Up @@ -639,7 +696,7 @@ async def add_special_file_entry(
:return: The special `FilesystemEntry` resource that was added to the `FilesystemRoot`
"""
dirname = os.path.dirname(path)
dirname = posixpath.dirname(path)

if dirname == "":
parent_folder = self
Expand Down
6 changes: 3 additions & 3 deletions ofrak_core/ofrak/core/iso9660.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import asyncio
import logging
import os
import posixpath
import tempfile
from dataclasses import dataclass
from io import BytesIO
Expand Down Expand Up @@ -195,7 +195,7 @@ async def unpack(self, resource: Resource, config=None):

for root, dirs, files in iso.walk(**{path_var: "/"}):
for d in dirs:
path = os.path.join(root, d)
path = posixpath.join(root, d)
folder_tags = (ISO9660Entry, Folder)
entry = ISO9660Entry(
name=d,
Expand All @@ -211,7 +211,7 @@ async def unpack(self, resource: Resource, config=None):
path, None, None, folder_tags, entry.get_attributes_instances().values()
)
for f in files:
path = os.path.join(root, f)
path = posixpath.join(root, f)
file_tags = (ISO9660Entry, File)
fp = BytesIO()

Expand Down
2 changes: 1 addition & 1 deletion ofrak_core/ofrak/core/tar.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ async def unpack(self, resource: Resource, config: ComponentConfig = None) -> No
for filename in stdout.decode().splitlines():
# Handles relative parent paths and rooted paths, and normalizes paths like "./../"
rel_filename = os.path.relpath(filename)
if rel_filename.startswith("../"):
if rel_filename.startswith(".." + os.sep):
raise UnpackerError(
f"Tar archive contains a file {filename} that would extract to a parent "
f"directory {rel_filename}."
Expand Down
Loading

0 comments on commit 280b81c

Please sign in to comment.