Skip to content

Commit

Permalink
Fix Type warnings discovered with pyright LSP
Browse files Browse the repository at this point in the history
  • Loading branch information
JustinMeimar committed Oct 27, 2024
1 parent c8e04b9 commit 6b23d3f
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 33 deletions.
2 changes: 1 addition & 1 deletion dragon_runner/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def parse_cli_args() -> CLIArgs:
parser.add_argument("-t", "--time", action="store_true", help="Include the timings (seconds) of each test in the output.")
parser.add_argument("-v", "--verbosity", action="count", default=0, help="Increase verbosity level")

args: CLIArgs = parser.parse_args()
args = parser.parse_args()
if not os.path.isfile(args.config_file):
parser.error(f"The config file {args.config_file} does not exist.")
if bool(args.grade_file) != bool(args.failure_log):
Expand Down
2 changes: 1 addition & 1 deletion dragon_runner/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ def to_dict(self) -> Dict:
def __repr__(self) -> str:
return json.dumps(self.to_dict(), indent=2)

def load_config(config_path: str, args: CLIArgs=None) -> Optional[Config]:
def load_config(config_path: str, args: Optional[CLIArgs]=None) -> Optional[Config]:
"""
Load and parse the JSON configuration file.
"""
Expand Down
3 changes: 1 addition & 2 deletions dragon_runner/main.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import os
from colorama import init, Fore
from dragon_runner.cli import parse_cli_args, CLIArgs
from dragon_runner.config import load_config
Expand Down Expand Up @@ -51,4 +50,4 @@ def main():

if __name__ == "__main__":
main()


70 changes: 46 additions & 24 deletions dragon_runner/testfile.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os
from io import BytesIO
from typing import Optional, Union
from dragon_runner.utils import str_to_bytes, bytes_to_str
from dragon_runner.utils import str_to_bytes, bytes_to_str, file_to_bytes
from dragon_runner.errors import Verifiable, ErrorCollection, TestFileError

class TestFile(Verifiable):
Expand All @@ -15,42 +15,55 @@ def __init__(self, test_path, input_dir="input", input_stream_dir="input-stream"
self.input_stream_dir = input_stream_dir
self.output_dir = output_dir
self.comment_syntax = comment_syntax # default C99 //
self.verify()

self.expected_out = self.get_content("CHECK:", "CHECK_FILE:")
self.input_stream = self.get_content("INPUT:", "INPUT_FILE:")

def verify(self) -> ErrorCollection:
"""
Ensure the paths supplied in CHECK_FILE and INPUT_FILE exist
"""
collection = ErrorCollection()
self.expected_out = self._get_content("CHECK:", "CHECK_FILE:")
self.input_stream = self._get_content("INPUT:", "INPUT_FILE:")

# If a parse and read of a tests input or output fails, propagate here
if isinstance(self.expected_out, TestFileError):
collection.add(self.expected_out)
if isinstance(self.input_stream, TestFileError):
collection.add(self.input_stream)
if collection.has_errors():
return collection
return collection

def _get_content(self, inline_directive: str, file_directive: str) -> Union[bytes, TestFileError]:
def get_content(self, inline_directive: str, file_directive: str) -> Union[bytes, TestFileError]:
"""
Generic method to get content based on directives
"""
content = self._get_directive_contents(inline_directive)
if content:
return content
inline_contents = self._get_directive_contents(inline_directive)
file_contents = self._get_directive_contents(file_directive)

if inline_contents and file_contents:
return TestFileError(f"Directive Conflict: Supplied both\
{inline_directive} and {file_directive}")

elif inline_contents:
return inline_contents

file_path = self._get_directive_contents(file_directive)
if file_path:
test_dir = os.path.dirname(self.path)
full_path = os.path.join(test_dir, bytes_to_str(file_path))
elif file_contents:
if isinstance(file_contents, TestFileError):
return file_contents

file_str = file_contents.decode()

full_path = os.path.join(os.path.dirname(self.path), file_str)
if not os.path.exists(full_path):
return TestFileError(f"Failed to locate path supplied to {file_directive}: {full_path}")
return self._get_file_bytes(full_path)
return TestFileError(f"Failed to locate path supplied to\
{file_directive}: {full_path}")

file_bytes = file_to_bytes(full_path)
if file_bytes is None:
return TestFileError(f"Failed to convert file {full_path} to bytes")

return file_bytes

return file_path # default to empty content

else:
return b''

def _get_file_bytes(self, file_path: str) -> Optional[bytes]:
"""
Get file contents in bytes
Expand All @@ -63,7 +76,7 @@ def _get_file_bytes(self, file_path: str) -> Optional[bytes]:
except FileNotFoundError:
return None

def _get_directive_contents(self, directive_prefix: str) -> Union[bytes, TestFileError]:
def _get_directive_contents(self, directive_prefix: str) -> Optional[Union[bytes, TestFileError]]:
"""
Look into the testfile itself for contents defined in directives.
Directives can appear anywhere in a line, as long as they're preceded by a comment syntax.
Expand All @@ -75,7 +88,8 @@ def _get_directive_contents(self, directive_prefix: str) -> Union[bytes, TestFil
for line in test_file:
comment_index = line.find(self.comment_syntax)
directive_index = line.find(directive_prefix)
if comment_index == -1 or directive_index == -1 or comment_index > directive_index:
if comment_index == -1 or directive_index == -1 or\
comment_index > directive_index:
continue

rhs_line = line.split(directive_prefix, 1)[1]
Expand All @@ -100,6 +114,14 @@ def __repr__(self):
if len(test_name) > max_test_name_length:
test_name = test_name[:max_test_name_length - 3] + "..."

expected_out = b''
if isinstance(self.expected_out, bytes):
expected_out = self.expected_out

input_stream = b''
if isinstance(self.input_stream, bytes):
input_stream = self.input_stream

return (f"{test_name:<{max_test_name_length}}"
f"{len(self.expected_out.getvalue()):>4}\t"
f"{len(self.input_stream.getvalue()):>4}")
f"{len(expected_out):>4}\t"
f"{len(input_stream):>4}")
12 changes: 7 additions & 5 deletions dragon_runner/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def resolve_relative(relative_dir: str, abs_path: str) -> str:
abs_path = os.path.dirname(abs_path)
return os.path.join(abs_path, relative_dir)

def make_tmp_file(content: bytes) -> str:
def make_tmp_file(content: bytes) -> Optional[str]:
"""
Create a file in tmp with the bytes from content.
"""
Expand All @@ -26,6 +26,7 @@ def make_tmp_file(content: bytes) -> str:
return tmp.name
except Exception as e:
print(f"Failed to make temporary file with error: {e}", file=sys.stderr)
return None

def str_to_bytes(string: str, chop_newline: bool=False) -> Optional[bytes]:
"""
Expand Down Expand Up @@ -78,7 +79,7 @@ def truncated_bytes(data: bytes, max_bytes: int = 1024) -> bytes:

return truncated

def file_to_str(file: str, max_bytes=1024) -> str:
def file_to_str(file: str, max_bytes=1024) -> Optional[str]:
"""
return file in string form, with middle contents trucated if
size exceeds max_bytes
Expand All @@ -103,9 +104,10 @@ def bytes_to_file(file: str, data: bytes) -> Optional[str]:
"""
assert isinstance(data, bytes), "Supplied bytes that are not of type bytes."
try:
with open(file, 'w') as f:
f.write(bytes_to_str(data))
with open(file, 'wb') as f:
f.write(data)
return file
except Exception as e:
print(f"Writting bytes to file failed with: {e}")
return None
return None

0 comments on commit 6b23d3f

Please sign in to comment.