diff --git a/kmultiversx/pyproject.toml b/kmultiversx/pyproject.toml index e4df84f3..7ed81a66 100644 --- a/kmultiversx/pyproject.toml +++ b/kmultiversx/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "kmultiversx" -version = "0.1.86" +version = "0.1.87" description = "Python tools for Elrond semantics" authors = [ "Runtime Verification, Inc. ", diff --git a/kmultiversx/src/kmultiversx/kasmer.py b/kmultiversx/src/kmultiversx/kasmer.py index 2e9704f0..d2375fd0 100644 --- a/kmultiversx/src/kmultiversx/kasmer.py +++ b/kmultiversx/src/kmultiversx/kasmer.py @@ -1,9 +1,8 @@ from __future__ import annotations -import glob import json import sys -from os.path import join +from pathlib import Path from subprocess import SubprocessError from typing import TYPE_CHECKING, cast @@ -42,7 +41,6 @@ if TYPE_CHECKING: from collections.abc import Iterable, Mapping - from pathlib import Path from hypothesis.strategies import SearchStrategy from pyk.kast.inner import KInner @@ -60,25 +58,8 @@ REC_LIMIT = 4000 -def load_input_json(test_dir: str) -> dict: - try: - with open(join(test_dir, INPUT_FILE_NAME)) as f: - return json.load(f) - except FileNotFoundError: - raise FileNotFoundError(f'{INPUT_FILE_NAME!r} not found in "{test_dir!r}"') from None - - -def find_test_wasm_path(test_dir: str) -> str: - test_wasm_path = glob.glob(test_dir + '/output/*.wasm') - # TODO this loads the first wasm file in the directory. what if there are multiple wasm files? - if test_wasm_path: - return test_wasm_path[0] - else: - raise ValueError(f'WASM file not found: {test_dir}/output/?.wasm') - - -def load_contract_wasms(contract_wasm_paths: Iterable[str]) -> dict[bytes, KInner]: - contract_wasm_modules = {bytes(f, 'ascii'): load_wasm(f) for f in contract_wasm_paths} +def load_contract_wasms(contract_wasm_paths: Iterable[Path]) -> dict[bytes, KInner]: + contract_wasm_modules = {bytes(str(f), 'ascii'): load_wasm(f) for f in contract_wasm_paths} return contract_wasm_modules @@ -167,15 +148,15 @@ def run_config_and_check_empty( # Test metadata -def get_test_endpoints(test_dir: str) -> Mapping[str, tuple[str, ...]]: - abi_paths = glob.glob(test_dir + '/output/*.abi.json') - # TODO this loads the first wasm file in the directory. what if there are multiple wasm files? +def get_test_endpoints(test_dir: Path) -> Mapping[str, tuple[str, ...]]: + abi_paths = list(test_dir.glob('./output/*.abi.json')) + # Test contracts are not supposed to be multi-contract, there should be only 1 abi file if abi_paths: - abi_path = abi_paths[0] + abi_path = Path(abi_paths[0]) else: raise ValueError(f'ABI file not found: {test_dir}/output/?.abi.json') - with open(abi_path) as f: + with abi_path.open() as f: abi_json = json.load(f) endpoints = {} diff --git a/kmultiversx/src/kmultiversx/runtime.py b/kmultiversx/src/kmultiversx/runtime.py index 9e15a088..85d00f0a 100644 --- a/kmultiversx/src/kmultiversx/runtime.py +++ b/kmultiversx/src/kmultiversx/runtime.py @@ -45,7 +45,7 @@ def main() -> None: ) for definition_dir, output_path in targets: - esdt_wasm = load_wasm(str(args.esdt_wasm_path)) + esdt_wasm = load_wasm(args.esdt_wasm_path) krun = KRun(definition_dir) config = load_runtime(krun, esdt_wasm) diff --git a/kmultiversx/src/kmultiversx/scenario.py b/kmultiversx/src/kmultiversx/scenario.py index 023b5f58..5e6e498f 100644 --- a/kmultiversx/src/kmultiversx/scenario.py +++ b/kmultiversx/src/kmultiversx/scenario.py @@ -2,18 +2,20 @@ import argparse import json -import os import subprocess import sys import tempfile +from pathlib import Path from typing import TYPE_CHECKING from Cryptodome.Hash import keccak +from pyk.cli.utils import file_path from pyk.kast.inner import KApply, KSequence, KToken, Subst from pyk.kast.manip import split_config_from from pyk.kdist import kdist from pyk.ktool.krun import KRun from pyk.prelude.collections import set_of +from pyk.utils import abs_or_rel_to, check_file_path from pykwasm.kwasm_ast import KBytes, KInt, KString from kmultiversx.utils import ( @@ -294,7 +296,7 @@ def mandos_arguments_to_klist(arguments: list[str]) -> KInner: return ListBytes(wrapped) -def mandos_to_set_account(address: str, sections: dict, filename: str, output_dir: str) -> list[KApply]: +def mandos_to_set_account(address: str, sections: dict, filename: Path, output_dir: Path) -> list[KApply]: """Creates a K account cell from a Mandos account description.""" address_value = mandos_argument_to_kbytes(address) nonce_value = mandos_int_to_kint(sections.get('nonce', '0')) @@ -404,7 +406,7 @@ def str_to_kast(s: str) -> KInner: return [str_to_kast(r) for r in v['roles']] -def mandos_to_check_account(address: str, sections: dict, filename: str) -> list: +def mandos_to_check_account(address: str, sections: dict, filename: Path) -> list: k_steps: list[KInner] = [] address_value = mandos_argument_to_kbytes(address) if ('nonce' in sections) and (sections['nonce'] != '*'): @@ -424,9 +426,8 @@ def mandos_to_check_account(address: str, sections: dict, filename: str) -> list storage_value = KMapBytesToBytes(wrapped_pairs) k_steps.append(KApply('checkAccountStorage', [address_value, storage_value])) if ('code' in sections) and (sections['code'] != '*'): - code_path = get_contract_code(sections['code'], filename) - if code_path is None: - code_path = '' + code_path_ = get_contract_code(sections['code'], filename) + code_path = '' if code_path_ is None else str(code_path_) k_code_path = KString(code_path) k_steps.append(KApply('checkAccountCode', [address_value, k_code_path])) if ('esdt' in sections) and (sections['esdt'] != '*'): @@ -447,7 +448,7 @@ def mandos_to_check_account(address: str, sections: dict, filename: str) -> list return k_steps -def mandos_to_deploy_tx(tx: dict, filename: str, output_dir: str) -> KInner: +def mandos_to_deploy_tx(tx: dict, filename: Path, output_dir: Path) -> KInner: sender = mandos_argument_to_kbytes(tx['from']) value = mandos_int_to_kint(get_egld_value(tx)) arguments = mandos_arguments_to_klist(tx['arguments']) @@ -455,7 +456,7 @@ def mandos_to_deploy_tx(tx: dict, filename: str, output_dir: str) -> KInner: gas_price = mandos_int_to_kint(tx.get('gasPrice', '0'), default_when_empty=0) code = get_contract_code(tx['contractCode'], filename) - assert isinstance(code, str) + assert code is not None module = file_to_module_decl(code, output_dir) return KApply('deployTx', [sender, value, module, arguments, gas_limit, gas_price]) @@ -576,24 +577,25 @@ def register(with_name: str) -> KInner: return KApply('register', [KString(with_name)]) -def file_to_module_decl(filename: str, output_dir: str) -> KInner: - if filename[-5:] == '.wasm': +def file_to_module_decl(filename: Path, output_dir: Path) -> KInner: + if filename.suffix == '.wasm': return load_wasm(filename) - if filename[-5:] == '.wast' or filename[-4:] == '.wat': + if filename.suffix == '.wast' or filename.suffix == '.wat': return wat_file_to_module_decl(filename, output_dir) - if filename[-10:] == '.mxsc.json': + if filename.name.endswith('.mxsc.json'): return load_wasm_from_mxsc(filename) raise ValueError(f'Filetype not yet supported: {filename}') -def wat_file_to_module_decl(filename: str, output_dir: str) -> KInner: - if not os.path.exists(filename): - raise Exception(f'file {filename} does not exist') +def wat_file_to_module_decl(file_path: Path, output_dir: Path) -> KInner: + file_path = file_path.resolve() + check_file_path(file_path) + + new_wasm_file_path = output_dir / file_path.with_suffix('.wasm').name - new_wasm_filename = os.path.join(output_dir, os.path.basename(filename) + '.wasm') try: - subprocess.check_output(f'wat2wasm {filename} --output={new_wasm_filename}', shell=True) + subprocess.check_output(f'wat2wasm {file_path} --output={new_wasm_file_path}', shell=True) except subprocess.CalledProcessError as e: print('Failed: %s' % e.cmd) print('return code: %d' % e.returncode) @@ -602,24 +604,22 @@ def wat_file_to_module_decl(filename: str, output_dir: str) -> KInner: print('stderr:') print(e.stderr) raise e - return load_wasm(new_wasm_filename) + return load_wasm(new_wasm_file_path) -def get_external_file_path(test_file: str, rel_path_to_new_file: str) -> str: - test_file_path = os.path.dirname(test_file) - ext_file = os.path.normpath(os.path.join(test_file_path, rel_path_to_new_file)) - return ext_file +def get_external_file_path(test_file: Path, rel_path_to_new_file: Path) -> Path: + return abs_or_rel_to(rel_path_to_new_file, test_file.parent).resolve() -def get_contract_code(code: str, filename: str) -> str | None: +def get_contract_code(code: str, filename: Path) -> Path | None: if code[0:5] in ('file:', 'mxsc:'): - return get_external_file_path(filename, code[5:]) + return get_external_file_path(filename, Path(code[5:])) if code == '': return None raise Exception('Currently only support getting code from file, or empty code.') -def get_steps_sc_deploy(step: dict, filename: str, output_dir: str) -> list: +def get_steps_sc_deploy(step: dict, filename: Path, output_dir: Path) -> list: k_steps = [] tx = mandos_to_deploy_tx(step['tx'], filename, output_dir) k_steps.append(tx) @@ -682,7 +682,7 @@ def get_steps_new_addresses(new_addresses: dict | None) -> list[KApply]: return ret -def get_steps_set_state(step: dict, filename: str, output_dir: str) -> list[KApply]: +def get_steps_set_state(step: dict, filename: Path, output_dir: Path) -> list[KApply]: k_steps: list[KApply] = [] if 'accounts' in step: set_accounts = [ @@ -706,7 +706,7 @@ def get_steps_set_state(step: dict, filename: str, output_dir: str) -> list[KApp return k_steps -def get_steps_check_state(step: dict, filename: str) -> list: +def get_steps_check_state(step: dict, filename: Path) -> list: k_steps = [] if 'accounts' in step: for address, sections in step['accounts'].items(): @@ -720,8 +720,8 @@ def get_steps_check_state(step: dict, filename: str) -> list: return k_steps -def get_steps_as_kseq(filename: str, output_dir: str, args: argparse.Namespace) -> list: - with open(filename) as f: +def get_steps_as_kseq(filename: Path, output_dir: Path, args: argparse.Namespace) -> list: + with filename.open() as f: mandos_test = json.loads(f.read()) if 'name' in mandos_test: if args.verbose: @@ -743,7 +743,7 @@ def get_steps_as_kseq(filename: str, output_dir: str, args: argparse.Namespace) elif step['step'] == 'checkState': k_steps.append((step['step'], get_steps_check_state(step, filename))) elif step['step'] == 'externalSteps': - steps_file = get_external_file_path(filename, step['path']) + steps_file = get_external_file_path(filename, Path(step['path'])) print('Load external: %s' % steps_file) k_steps = k_steps + get_steps_as_kseq(steps_file, output_dir, args) elif step['step'] == 'transfer': @@ -758,11 +758,11 @@ def get_steps_as_kseq(filename: str, output_dir: str, args: argparse.Namespace) def run_test_file( krun: KRun, template_wasm_config: KInner, - test_file_path: str, - output_dir: str, + test_file_path: Path, + output_dir: Path, cmd_args: argparse.Namespace, ) -> KInner: - test_name = os.path.basename(test_file_path) + test_name = test_file_path.stem k_steps = get_steps_as_kseq(test_file_path, output_dir, cmd_args) final_config = template_wasm_config @@ -805,17 +805,17 @@ def run_test_file( # ... Setup Elrond Wasm -def log_intermediate_state(krun: KRun, name: str, config: KInner, output_dir: str) -> None: - with open(f'{output_dir}/{name}', 'w') as f: - f.write(kast_to_json_str(config)) - with open(f'{output_dir}/{name}.pretty.k', 'w') as f: - pretty = krun.pretty_print(config) - f.write(pretty) +def log_intermediate_state(krun: KRun, name: str, config: KInner, output_dir: Path) -> None: + json_path = output_dir / name + json_path.write_text(kast_to_json_str(config)) + + kore_path = output_dir / (name + '.pretty.k') + kore_path.write_text(krun.pretty_print(config)) def run_tests() -> None: test_args = argparse.ArgumentParser(description='') - test_args.add_argument('files', metavar='N', type=str, nargs='+', help='') + test_args.add_argument('files', metavar='N', type=file_path, nargs='+', help='') test_args.add_argument('--log-level', choices=['none', 'per-file', 'per-step'], default='per-file') test_args.add_argument('--verbose', action='store_true', help='') test_args.add_argument( @@ -838,14 +838,13 @@ def run_tests() -> None: for test in tests: if args.verbose: print('Running test %s' % test) - tmpdir = tempfile.mkdtemp(prefix='mandos_') + tmpdir = Path(tempfile.mkdtemp(prefix='mandos_')) if args.verbose: print('Intermediate test outputs stored in:\n%s' % tmpdir) - initial_name = '0000_initial_config' - with open(f'{tmpdir}/{initial_name}', 'w') as f: - f.write(kast_to_json_str(template_wasm_config)) - + initial_path = tmpdir / '0000_initial_config' + initial_path.write_text(kast_to_json_str(template_wasm_config)) + assert isinstance(test, Path) run_test_file(krun, template_wasm_config, test, tmpdir, args) if args.verbose: diff --git a/kmultiversx/src/kmultiversx/utils.py b/kmultiversx/src/kmultiversx/utils.py index a8f10799..836125fe 100644 --- a/kmultiversx/src/kmultiversx/utils.py +++ b/kmultiversx/src/kmultiversx/utils.py @@ -33,7 +33,7 @@ def read_kasmer_runtime() -> KInner: def read_kinner_json(path: Path) -> KInner: - with open(str(path)) as f: + with path.open() as f: config_json = json.load(f) return KInner.from_dict(config_json['term']) @@ -50,17 +50,17 @@ def flatten(l: list[list[T]]) -> list[T]: return [item for sublist in l for item in sublist] -def load_wasm(filename: str) -> KInner: - with open(filename, 'rb') as f: - return wasm2kast.wasm2kast(f, filename) +def load_wasm(file_path: Path) -> KInner: + with file_path.open(mode='rb') as f: + return wasm2kast.wasm2kast(f, str(file_path)) -def load_wasm_from_mxsc(filename: str) -> KInner: - with open(filename) as f: +def load_wasm_from_mxsc(file_path: Path) -> KInner: + with file_path.open() as f: contract_json = json.load(f) code_hex = contract_json['code'] code_bytes = bytes.fromhex(code_hex) - return wasm2kast.wasm2kast(BytesIO(code_bytes), filename) + return wasm2kast.wasm2kast(BytesIO(code_bytes), str(file_path)) def krun_config(krun: KRun, conf: KInner, pipe_stderr: bool = False) -> KInner: diff --git a/package/version b/package/version index 93c6c476..ef2f8c81 100644 --- a/package/version +++ b/package/version @@ -1 +1 @@ -0.1.86 +0.1.87