diff --git a/.gitignore b/.gitignore index 07ef085..c743f91 100644 --- a/.gitignore +++ b/.gitignore @@ -61,11 +61,14 @@ diff.md # Lock file *.lock -# tutorial required files +# Tutorial required files !tutorial_dataset.h5 -# tutorial generated files +# Tutorial generated files configure_profile.yaml configure_computer.yaml configure_code.yaml .aiida_run + +# Development test sandbox +test_dev diff --git a/pyproject.toml b/pyproject.toml index d13396b..0c40f94 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,11 +23,15 @@ dependencies = [ ] # Entry Points -[project.entry-points] -"aiida.data" = { "fans" = "aiida_fans.data:FANSParameters" } -"aiida.calculations" = { "fans" = "aiida_fans.calculations:FANSCalculation" } -"aiida.parsers" = { "fans" = "aiida_fans.parsers:FANSParser" } -"aiida.cmdline.data" = { "fans" = "aiida_fans.cli:data_cli" } +[project.entry-points."aiida.data"] +"fans" = "aiida_fans.data:FANSParameters" +[project.entry-points."aiida.calculations"] +"fans.stashed" = "aiida_fans.calculations:FansStashedCalculation" +"fans.fragmented" = "aiida_fans.calculations:FansFragmentedCalculation" +[project.entry-points."aiida.parsers"] +"fans" = "aiida_fans.parsers:FansParser" +[project.entry-points."aiida.cmdline.data"] +"fans" = "aiida_fans.cli:data_cli" # Build System [build-system] @@ -56,7 +60,7 @@ platforms = ["linux-64"] [tool.pixi.feature.self] pypi-dependencies = {aiida-fans = { path = ".", editable = true }} [tool.pixi.feature.plugin] -dependencies = {aiida-fans = "0.1.5"} +dependencies = {aiida-fans = "==0.1.5"} # [tool.pixi.feature.aiida] # dependencies = {aiida-core = "2.6.*"} [tool.pixi.feature.fans] diff --git a/src/aiida_fans/calculations.py b/src/aiida_fans/calculations.py index 8f2575f..61c64bf 100644 --- a/src/aiida_fans/calculations.py +++ b/src/aiida_fans/calculations.py @@ -1,260 +1,183 @@ -"""Calculations provided by aiida_fans.""" +"""CalcJob subclasses for aiida-fans calculations.""" from json import dump -from typing import Any, 
Callable +from pathlib import Path +from shutil import copyfileobj -import h5py from aiida.common.datastructures import CalcInfo, CodeInfo from aiida.common.folders import Folder from aiida.engine import CalcJob from aiida.engine.processes.process_spec import CalcJobProcessSpec from aiida.orm import ArrayData, Dict, Float, Int, List, SinglefileData, Str -from plumpy.utils import AttributesFrozendict +from h5py import File as h5File -from aiida_fans.helpers import InputEncoder - -class FANSCalculation(CalcJob): - """AiiDA calculation plugin wrapping the FANS executable.""" - - @staticmethod - def __input_validator_selector(input: str, note: str) -> Callable[[Any, Any], str | None]: - validators: dict[str, Callable[[Any, Any], str | None]] = { - "microstructure.file": lambda _i, _p: None, - "microstructure.datasetname": lambda _i, _p: None, - "microstructure.L": lambda i, _p: note if len(i) != 3 else None, # TODO: check elements are numbers - "problem_type": lambda i, _p: note if i.value not in {"thermal", "mechanical"} else None, - "matmodel": lambda i, _p: note - if i.value - not in { - "LinearThermalIsotropic", - "LinearElasticIsotropic", - "PseudoPlasticLinearHardening", - "PseudoPlasticNonLinearHardening", - "J2ViscoPlastic_LinearIsotropicHardening", - "J2ViscoPlastic_NonLinearIsotropicHardening", - } - else None, - "material_properties": lambda _i, _p: None, # TODO: material properties - "method": lambda i, _p: note if i.value not in {"cg", "fp"} else None, - "error_parameters.measure": lambda i, _p: note if i.value not in {"Linfinity", "L1", "L2"} else None, - "error_parameters.type": lambda i, _p: note if i.value not in {"absolute", "relative"} else None, - "error_parameters.tolerance": lambda _i, _p: None, - "n_it": lambda _i, _p: None, - "macroscale_loading": lambda _i, _p: None, # TODO: macroscale loading - "results": lambda i, _p: note - if not set(i.get_list()) - <= { - "stress_average", - "strain_average", - "absolute_error", - "phase_stress_average", - 
"phase_strain_average", - "microstructure", - "displacement", - "stress", - "strain", - } - else None, - } - return validators[input] +class FansCalcBase(CalcJob): + """Base class of all calculations using FANS.""" @classmethod def define(cls, spec: CalcJobProcessSpec) -> None: - """Define inputs, outputs, and exit_codes of the calculation.""" + """Define inputs, outputs, and exit codes of the calculation.""" super().define(spec) # Metadata + spec.inputs["metadata"]["label"].default = "FANS" + # spec.inputs["metadata"]["dry_run"].default = True + ## Processing Power + spec.inputs["metadata"]["options"]["withmpi"].default = True spec.inputs["metadata"]["options"]["resources"].default = { "num_machines": 1, - "num_mpiprocs_per_machine": 4, + "num_mpiprocs_per_machine": 4 } - spec.inputs["metadata"]["options"]["withmpi"].default = True - spec.inputs["metadata"]["options"]["parser_name"].default = "fans" + ## Filenames spec.inputs["metadata"]["options"]["input_filename"].default = "input.json" spec.inputs["metadata"]["options"]["output_filename"].default = "output.h5" + ## Parser + spec.inputs["metadata"]["options"]["parser_name"].default = "fans" - # New Ports: - spec.input_namespace("microstructure", help=(note := "The microstructure definition.")) - spec.input( - (input := "microstructure.file"), - valid_type=SinglefileData, - validator=cls.__input_validator_selector(input, note), - help=(note := "This specifies the path to the HDF5 file that contains the microstructure data."), - ) - spec.input( - (input := "microstructure.datasetname"), - valid_type=Str, - validator=cls.__input_validator_selector(input, note), - help=( - note - := "This is the path within the HDF5 file to the specific dataset that represents the microstructure." 
- ), - ) - spec.input( - (input := "microstructure.L"), - valid_type=List, - validator=cls.__input_validator_selector(input, note), - help=( - note - := "Microstructure length defines the physical dimensions of the microstructure in the x, y, and z directions." # noqa: E501 - ), - ) - - spec.input( - (input := "problem_type"), - valid_type=Str, - validator=cls.__input_validator_selector(input, note), - help=( - note - := "This defines the type of physical problem you are solving. Common options include `thermal` problems and `mechanical` problems." # noqa: E501 - ), - ) - spec.input( - (input := "matmodel"), - valid_type=Str, - validator=cls.__input_validator_selector(input, note), - help=(note := "This specifies the material model to be used in the simulation."), - ) - spec.input( - (input := "material_properties"), - valid_type=Dict, - validator=cls.__input_validator_selector(input, note), - help=(note := "This provides the necessary material parameters for the chosen material model."), - ) - spec.input( - (input := "method"), - valid_type=Str, - validator=cls.__input_validator_selector(input, note), - help=( - note - := "This indicates the numerical method to be used for solving the system of equations. `cg` stands for the Conjugate Gradient method, and `fp` stands for the Fixed Point method." # noqa: E501 - ), - ) - - spec.input_namespace( - "error_parameters", - help=( - note - := "This section defines the error parameters for the solver. Error control is applied on the finite element nodal residual of the problem." # noqa: E501 - ), - ) - spec.input( - (input := "error_parameters.measure"), - valid_type=Str, - validator=cls.__input_validator_selector(input, note), - help=(note := "Specifies the norm used to measure the error. Options include `Linfinity`, `L1`, or `L2`."), - ) - spec.input( - (input := "error_parameters.type"), - valid_type=Str, - validator=cls.__input_validator_selector(input, note), - help=(note := "Defines the type of error measurement. 
Options are `absolute` or `relative`."), - ) - spec.input( - (input := "error_parameters.tolerance"), - valid_type=Float, - validator=cls.__input_validator_selector(input, note), - help=( - note - := "Sets the tolerance level for the solver, defining the convergence criterion based on the chosen error measure. The solver iterates until the solution meets this tolerance." # noqa: E501 - ), - ) - - spec.input( - (input := "n_it"), - valid_type=Int, - validator=cls.__input_validator_selector(input, note), - help=(note := "Specifies the maximum number of iterations allowed for the FANS solver."), - ) - spec.input( - (input := "macroscale_loading"), - valid_type=ArrayData, - validator=cls.__input_validator_selector(input, note), - help=( - note - := "This defines the external loading applied to the microstructure. It is an array of arrays, where each sub-array represents a loading condition applied to the system. The format of the loading array depends on the problem type." # noqa: E501 - ), - ) - spec.input( - (input := "results"), - valid_type=List, - validator=cls.__input_validator_selector(input, note), - help=( - note - := "This array lists the quantities that should be stored into the results HDF5 file during the simulation." 
# noqa: E501 - ), - ) - - spec.output("results", valid_type=SinglefileData) - - # Exit Codes: + # Input Ports + ## Microstructure Definition + spec.input_namespace("microstructure") + spec.input("microstructure.file", valid_type=SinglefileData) + spec.input("microstructure.datasetname", valid_type=Str) + spec.input("microstructure.L", valid_type=List) + ## Problem Type and Material Model + spec.input("problem_type", valid_type=Str) + spec.input("matmodel", valid_type=Str) + spec.input("material_properties", valid_type=Dict) + ## Solver Settings + spec.input("method", valid_type=Str) + spec.input("n_it", valid_type=Int) + spec.input_namespace("error_parameters") + spec.input("error_parameters.measure", valid_type=Str) + spec.input("error_parameters.type", valid_type=Str) + spec.input("error_parameters.tolerance", valid_type=Float) + ## Macroscale Loading Conditions + spec.input("macroscale_loading", valid_type=ArrayData) + ## Results Specification + spec.input("results", valid_type=List) + + # Output Ports + spec.output("output", valid_type=SinglefileData) + spec.output("results", valid_type=Dict, required=False) + + # Exit Codes spec.exit_code(400, "PLACEHOLDER", "This is an error code, yet to be implemented.") def prepare_for_submission(self, folder: Folder) -> CalcInfo: - """Creates the input file required by the calculation. 
- - Args: - folder (Folder): where the plugin should temporarily place all files needed by the calculation - - Returns: - CalcInfo: the data to be passed to the ExecManager - """ - # Write Microstructure Subset to Folder - datasetname : str = self.inputs.microstructure.datasetname.value - with folder.open("microstructure.h5","bw") as f_dest: - with h5py.File(f_dest,"w") as h5_dest: - with self.inputs.microstructure.file.open(mode="rb") as f_src: - with h5py.File(f_src,'r') as h5_src: - h5_src.copy(datasetname, h5_dest, name=datasetname) - - # Write input.json to Folder - json_to_be = dict(self.inputs) - del json_to_be["code"], json_to_be["metadata"] - to_fix = {} - for key, value in json_to_be.items(): - if isinstance(value, AttributesFrozendict): # can be moved to InputEncoder? - to_fix[key] = {} - for k, v in json_to_be[key].items(): - to_fix[key][k] = v - json_to_be = json_to_be | to_fix - - to_add = {} - for key, value in json_to_be.items(): - if key == "microstructure": - for k, v in value.items(): - if k == "file": - to_add[f"ms_{k}name"] = "microstructure.h5" - else: - to_add[f"ms_{k}"] = v - - json_to_be = to_add | json_to_be - del json_to_be["microstructure"] - - with folder.open(self.options.input_filename, "w", "utf8") as handle: - dump(json_to_be, handle, cls=InputEncoder, indent=4) - - # Specifying code info. + """Prepare the calculation for submission.""" + # Specifying the code info: codeinfo = CodeInfo() codeinfo.code_uuid = self.inputs.code.uuid - codeinfo.stdout_name = self.options.input_filename + ".log" - codeinfo.stderr_name = self.options.input_filename + ".err" + codeinfo.stdout_name = self.metadata.label + ".log" + codeinfo.stderr_name = self.metadata.label + ".err" codeinfo.cmdline_params = [self.options.input_filename, self.options.output_filename] - # Specifying calc info. 
+ # Specifying the calc info: calcinfo = CalcInfo() calcinfo.codes_info = [codeinfo] calcinfo.local_copy_list = [] calcinfo.remote_copy_list = [] - calcinfo.retrieve_list = [ - self.options.input_filename + ".log", - self.options.input_filename + ".err", - ] + calcinfo.retrieve_list = [codeinfo.stdout_name, codeinfo.stderr_name] calcinfo.retrieve_temporary_list = [ self.options.output_filename ] - calcinfo.provenance_exclude_list = [ - "microstructure.h5" - ] return calcinfo + + +class FansStashedCalculation(FansCalcBase): + """Calculations using FANS and the "Stashed" microstructure distribution strategy.""" + + @classmethod + def define(cls, spec: CalcJobProcessSpec) -> None: + """Define inputs, outputs, and exit codes of the calculation.""" + return super().define(spec) + + def prepare_for_submission(self, folder: Folder) -> CalcInfo: + """Prepare the calculation for submission.""" + ms_filepath: Path = Path(self.inputs.code.computer.get_workdir()) / \ + "stash/microstructures" / \ + self.inputs.microstructure.file.filename + # if microstructure does not exist in stash, make it + if not ms_filepath.is_file(): + ms_filepath.parent.mkdir(parents=True, exist_ok=True) + with self.inputs.microstructure.file.open(mode='rb') as source: + with ms_filepath.open(mode='wb') as target: + copyfileobj(source, target) + + # input.json as dict + input_dict = { + ## Microstructure Definition + "ms_filename": str(ms_filepath), # path to stashed microstructure + "ms_datasetname": self.inputs.microstructure.datasetname.value, + "ms_L": self.inputs.microstructure.L.get_list(), + ## Problem Type and Material Model + "problem_type": self.inputs.problem_type.value, + "matmodel": self.inputs.matmodel.value, + "material_properties": self.inputs.material_properties.get_dict(), + ## Solver Settings + "method": self.inputs.method.value, + "n_it": self.inputs.n_it.value, + "error_parameters": { + "measure": self.inputs.error_parameters.measure.value, + "type": 
self.inputs.error_parameters.type.value, + "tolerance": self.inputs.error_parameters.tolerance.value + }, + ## Macroscale Loading Conditions + "macroscale_loading": [a[1].tolist() for a in self.inputs.macroscale_loading.get_iterarrays()], + ## Results Specification + "results": self.inputs.results.get_list() + } + # write input.json to working directory + with folder.open(self.options.input_filename, "w", "utf8") as json: + dump(input_dict, json, indent=4) + + return super().prepare_for_submission(folder) + +class FansFragmentedCalculation(FansCalcBase): + """Calculations using FANS and the "Fragmented" microstructure distribution strategy.""" + + @classmethod + def define(cls, spec: CalcJobProcessSpec) -> None: + """Define inputs, outputs, and exit codes of the calculation.""" + return super().define(spec) + + def prepare_for_submission(self, folder: Folder) -> CalcInfo: + """Prepare the calculation for submission.""" + # Write Microstructure Subset to Folder + datasetname : str = self.inputs.microstructure.datasetname.value + with folder.open("microstructure.h5","bw") as f_dest: + with h5File(f_dest,"w") as h5_dest: + with self.inputs.microstructure.file.open(mode="rb") as f_src: + with h5File(f_src,'r') as h5_src: + h5_src.copy(datasetname, h5_dest, name=datasetname) + + # input.json as dict + input_dict = { + ## Microstructure Definition + "ms_filename": "microstructure.h5", # path to fragmented microstructure + "ms_datasetname": self.inputs.microstructure.datasetname.value, + "ms_L": self.inputs.microstructure.L.get_list(), + ## Problem Type and Material Model + "problem_type": self.inputs.problem_type.value, + "matmodel": self.inputs.matmodel.value, + "material_properties": self.inputs.material_properties.get_dict(), + ## Solver Settings + "method": self.inputs.method.value, + "n_it": self.inputs.n_it.value, + "error_parameters": { + "measure": self.inputs.error_parameters.measure.value, + "type": self.inputs.error_parameters.type.value, + "tolerance": 
self.inputs.error_parameters.tolerance.value + }, + ## Macroscale Loading Conditions + "macroscale_loading": [a[1].tolist() for a in self.inputs.macroscale_loading.get_iterarrays()], + ## Results Specification + "results": self.inputs.results.get_list() + } + # write input.json to working directory + with folder.open(self.options.input_filename, "w", "utf8") as json: + dump(input_dict, json, indent=4) + + return super().prepare_for_submission(folder) diff --git a/src/aiida_fans/helpers.py b/src/aiida_fans/helpers.py index 2dad749..6ab9fc4 100644 --- a/src/aiida_fans/helpers.py +++ b/src/aiida_fans/helpers.py @@ -1,8 +1,9 @@ -"""Tools and utilities required by aiida_fans.""" +"""Tools required by aiida_fans.""" import json from aiida.orm import ArrayData, Dict, Float, Int, List, SinglefileData, Str +from numpy import allclose, ndarray class InputEncoder(json.JSONEncoder): @@ -24,3 +25,9 @@ def default(self, obj): case _: # Let the base class default method raise the TypeError return super().default(obj) + +def arraydata_equal(first: dict[str, ndarray], second: dict[str, ndarray]) -> bool: + """Return whether two dicts of arrays are roughly equal.""" + if first.keys() != second.keys(): + return False + return all(allclose(first[key], second[key]) for key in first) diff --git a/src/aiida_fans/parsers.py b/src/aiida_fans/parsers.py index 4e01788..9d05479 100644 --- a/src/aiida_fans/parsers.py +++ b/src/aiida_fans/parsers.py @@ -1,43 +1,52 @@ -"""Parsers provided by aiida_fans.""" +"""Parser subclass for aiida-fans calculations.""" from pathlib import Path from aiida.engine import ExitCode -from aiida.orm import SinglefileData +from aiida.orm import CalcJobNode, Dict, SinglefileData from aiida.parsers.parser import Parser -from aiida.plugins import CalculationFactory +from h5py import Dataset, Group +from h5py import File as h5File -FANSCalculation = CalculationFactory("fans") +class FansParser(Parser): + """Extracts data from FANS results.""" -class FANSParser(Parser): 
- """Extracts valuable data from FANS results.""" + def __init__(self, node: CalcJobNode): + """Calls `super().__init__()` then defines `self.results_dict`.""" + super().__init__(node) + self.results_dict = dict() - def parse(self, **kwargs) -> ExitCode: - """Parse outputs, store results in database. - - Returns: - ExitCode: non-zero exit code, if parsing fails - """ - retrieved_temporary_folder = Path(kwargs["retrieved_temporary_folder"]) - output_filename = self.node.get_option("output_filename") - - # Check that output_filename is valid - if (type(output_filename) is not str) or (output_filename == ""): - return self.exit_codes.ERROR_INVALID_OUTPUT - - # Check that folder content is as expected. - files_retrieved = set(self.retrieved.list_object_names()) - files_expected = set()#{output_filename} - if not files_expected <= files_retrieved: - self.logger.error(f"Found files '{files_retrieved}', expected to find '{files_expected}'") + def parse(self, **kwargs) -> ExitCode | None: + """Parse outputs and store results as nodes.""" + output_path: Path = Path(kwargs["retrieved_temporary_folder"]) / self.node.get_option("output_filename") # type: ignore + if output_path.is_file(): + self.out("output", node=SinglefileData(output_path)) + else: return self.exit_codes.ERROR_MISSING_OUTPUT - # Add output HDF5 file to repository. 
- output_path = retrieved_temporary_folder / output_filename - self.logger.info(f"Parsing '{output_path}'") - with output_path.open("rb") as handle: - output_node = SinglefileData(file=handle) - self.out("results", output_node) - - return ExitCode(0) + with h5File(output_path) as h5: + results = h5[self.node.inputs.microstructure.datasetname.value] + results.visititems(self.parse_h5) + + if self.results_dict: + self.out("results", Dict(self.results_dict)) + + def parse_h5(self, name: str, object: Group | Dataset) -> None: + """Callable for the .visititems method of h5py Groups.""" + if isinstance(object, Group): + return + if "average" in name: + keys = name.split("/") + res = self.results_dict + data = list(object[:]) + self.nestle(res, keys, data) + + def nestle(self, bottom: dict, layers: list[str], top: list[float]) -> None: + """Recursive function to generate a nested results dictionary.""" + layer = layers.pop(0) + if len(layers) > 0: + bottom.setdefault(layer, dict()) + self.nestle(bottom[layer], layers, top) + else: + bottom[layer] = top diff --git a/src/aiida_fans/utils.py b/src/aiida_fans/utils.py new file mode 100644 index 0000000..f1be6d4 --- /dev/null +++ b/src/aiida_fans/utils.py @@ -0,0 +1,210 @@ +"""Utilities provided by aiida_fans.""" + +from typing import Any, Literal + +from aiida.engine import run, submit +from aiida.orm import CalcJobNode, Data, Node, QueryBuilder +from aiida.plugins import CalculationFactory, DataFactory +from numpy import ndarray + +from aiida_fans.helpers import arraydata_equal + + +def aiida_type(value : Any) -> type[Data]: + """Find the corresponding AiiDA datatype for a variable with pythonic type. 
+ + Args: + value (Any): a python variable + + Raises: + NotImplementedError: only certain mappings are supported + + Returns: + type[Data]: an AiiDA data type + """ + match value: + case str(): + return DataFactory("core.str") # Str + case int(): + return DataFactory("core.int") # Int + case float(): + return DataFactory("core.float") # Float + case list(): + return DataFactory("core.list") # List + case dict(): + if all(map(lambda t: isinstance(t, ndarray), value.values())): + return DataFactory("core.array") # ArrayData + else: + return DataFactory("core.dict") # Dict + case _: + raise NotImplementedError(f"Received an input of value: {value} with type: {type(value)}") + +def fetch(label : str, value : Any) -> list[Node]: + """Return a list of nodes matching the label and value provided. + + Args: + label (str): the label of the node to fetch + value (Any): the value of the node to fetch + + Returns: + list[Node]: the list of nodes matching the given criteria + """ + datatype = aiida_type(value) + nodes = QueryBuilder( + ).append(cls=datatype, tag="n" + ).add_filter("n", {"label": label} + ).add_filter("n", {"attributes": {"==": datatype(value).base.attributes.all}} + ).all(flat=True) + + if datatype != DataFactory("core.array"): + return nodes # type: ignore + else: + array_nodes = [] + for array_node in nodes: + array_value = {k:v for k, v in [(name, array_node.get_array(name)) for name in array_node.get_arraynames()]} + if arraydata_equal(value, array_value): + array_nodes.append(array_node) + return array_nodes + +def generate(label : str, value : Any) -> Node: + """Return a single node with the label and value provided. + + Uses an existing node when possible, but otherwise creates one instead. 
+ + Args: + label (str): the label of the node to generate + value (Any): the pythonic value of the node to generate + + Raises: + RuntimeError: panic if more than one node is found matching the criteria + + Returns: + Node: a stored node with label and value + """ + bone = fetch(label, value) + if len(bone) == 0: + return aiida_type(value)(value, label=label).store() + elif len(bone) == 1: + return bone.pop() + else: + raise RuntimeError + +def convert(ins : dict[str, Any], path : list[str] = []): + """Takes a dictionary of inputs and converts the values to their respective Nodes. + + Args: + ins (dict[str, Any]): a dictionary of inputs + path (list[str], optional): a list of predecessor keys for nested dictionaries. Defaults to []. + """ + for k, v in ins.items(): + if k == "metadata" or isinstance(v, Node): + continue + if k in ["microstructure", "error_parameters"]: + convert(v, path=[*path, k]) + else: + ins[k] = generate(".".join([*path, k]), v) + +def compile_query(ins : dict[str,Any], qb : QueryBuilder) -> None: + """Iterate over the converted input dictionary and append to the QueryBuilder for each node. + + Args: + ins (dict[str,Any]): a dictionary of converted inputs + qb (QueryBuilder): a CalcJobNode QueryBuilder with tag='calc' + """ + for k, v in ins.items(): + if k == "metadata": + continue + if k in ["microstructure", "error_parameters"] and isinstance(v, dict): + compile_query(v, qb) + else: + qb.append( + cls=type(v), + with_outgoing="calc", + filters={"pk": v.pk} + ) + + +def execute_fans( + mode: Literal["Submit", "Run"], + inputs: dict[str, Any], + strategy: Literal["Fragmented", "Stashed"] = "Fragmented", + ): + """This utility function simplifies the process of executing aiida-fans jobs. + + The only nodes you must provide are the `code` and `microstructure` inputs. + Other inputs can be given as standard python variables. Your repository will + be automatically scanned for equivalent nodes. 
These will be used whenever + possible, otherwise new nodes will be created. + + The `strategy` specifies which microstructure distribution method you wish to use. + It defaults to "Fragmented". + + You must load an AiiDA profile yourself before using this function. + + **Args:** + **mode** *(Literal["Submit", "Run"])* + **inputs** *(dict[str, Any])* + **strategy** *(Literal["Fragmented", "Stashed"]), optional* + + --- + + **Example:** + ``` + from aiida import load_profile + from aiida.orm import load_code, load_node + from aiida_fans.utils import execute_fans + load_profile() + inputs = { + "code": load_code("fans"), + "microstructure": load_node(label="microstructure"), + ... + "metadata": { + "label": "an example calculation" + } + } + execute_fans("Submit", inputs, "Stashed") + ``` + """ + # update inputs with metadata.options.stash if necessary: + match strategy: + case "Stashed": + calcjob = CalculationFactory("fans.stashed") + case "Fragmented": + calcjob = CalculationFactory("fans.fragmented") + case _: + print("ERROR: Calculation strategy must be either 'Fragmented' or 'Stashed'.") + raise ValueError + + # fetch the inputs if possible or otherwise create them + convert(inputs) + + # check if identical calculation already exists + qb = QueryBuilder().append(cls=CalcJobNode, tag="calc", project="id") + compile_query(inputs, qb) + results = qb.all(flat=True) + if (count := len(results)) != 0: + print(f"It seems this calculation has already been performed {count} time{"s" if count > 1 else ""}. {results}") + confirmation = input("Are you sure you want to rerun it? 
[y/N] ").strip().lower() in ["y", "yes"] + else: + confirmation = True + + if confirmation: + match mode: + case "Run": + run(calcjob, inputs) # type: ignore + case "Submit": + submit(calcjob, inputs) # type: ignore + +def submit_fans( + inputs: dict[str, Any], + strategy: Literal["Fragmented", "Stashed"] = "Fragmented", +): + """See `execute_fans` for implementation and usage details.""" + execute_fans("Submit", inputs, strategy) + +def run_fans( + inputs: dict[str, Any], + strategy: Literal["Fragmented", "Stashed"] = "Fragmented", +): + """See `execute_fans` for implementation and usage details.""" + execute_fans("Run", inputs, strategy)