Source code for roxieapi.tool_adapter.RoxieToolAdapter
import datetime
import fnmatch
import shutil
import tempfile
from abc import ABC, abstractmethod
from pathlib import Path
from subprocess import PIPE, Popen
from tempfile import TemporaryDirectory
from typing import Dict, List, Optional, Set
from urllib.parse import urljoin
import requests
from roxieapi.input.builder import RoxieInputBuilder
[docs]
class RoxieToolAdapter(ABC):
"""A RoxieToolAdapter class with methods to execute a ROXIE model from
command line, read figures of merit table and prepare a Lorentz force
file."""
def __init__(
self,
input_files: List[Path],
) -> None:
"""A constructor of a RoxieToolAdapter instance.
:param input_files: A list of input files for the execution
(data file, iron file, cadata file, bhdata file)
:param output_file_path: a name of an output file, .output
:param xml_output_file_path: a name of an xml output file with
plots, .xml
"""
self.input_files: Dict[str, Path] = {}
self.needs_datafile_update = False
self._collect_input_files(input_files)
self.input_builder = self._sync_file_deps()
self._check_file_deps()
self.output: str = ""
self.errors: str = ""
def _collect_input_files(self, input_files: List[Path]) -> None:
for file in input_files:
ending = file.suffix
if ending == ".hmo":
ending = ".iron"
if ending in self.input_files:
raise ValueError(
f"File {file} would replace existing file: {self.input_files[ending]}"
)
self.input_files[ending] = file.absolute()
if ".data" not in self.input_files:
raise ValueError("Datafile is missing in input files")
def _sync_file_deps(self) -> RoxieInputBuilder:
"""Synchronize file paths between input files and datafile inputs.
Iterates through input files and datafile inputs and
synchronizes both. Input files in the constructor have priority
over files mentioned in the datafile.
"""
needs_change = False
if not self.input_files[".data"].is_file():
raise ValueError(f"Cannot read .data file: {self.input_files['.data']}")
rib = RoxieInputBuilder.from_datafile(self.input_files[".data"])
for file_type, file_path in [
(".cadata", "cadata_path"),
(".bhdata", "bhdata_path"),
(".iron", "iron_path"),
]:
if file_type in self.input_files:
setattr(rib, file_path, str(self.input_files[file_type]))
needs_change = True
elif fp := getattr(rib, file_path).strip():
if fp == "none":
continue
path = Path(fp)
if path.is_absolute():
self.input_files[file_type] = path
else:
self.input_files[file_type] = (
self.input_files[".data"].parent / path
)
self.needs_datafile_update = needs_change
return rib
def _check_file_deps(self) -> None:
"""Check if all files are accessible and can be used.
:raises ValueError: In case one file cannot be accessed
"""
for ft, file in self.input_files.items():
if not file.is_file():
if ft == ".bhdata" and not (
self.input_builder.flags["LBEMFEM"]
or self.input_builder.flags["LPSI"]
):
continue # OK, not needed
if ft == ".iron" and not (
self.input_builder.flags["LIRON"]
or self.input_builder.flags["LHMO"]
):
continue # OK, not needed
raise ValueError(f"Cannot read {ft} file: {file}")
[docs]
def download_artefacts(self, output_directory: Path, *patterns) -> None:
"""Downloads artefacts based on the given patterns to the specified
output directory.
Args:
output_directory (Path): The directory where the artefacts will be downloaded.
*patterns: Variable length argument list of patterns to filter artefacts.
Returns:
None
"""
artefacts = self.get_artefact_list()
artefacts_filtered: Set[str] = set()
if not patterns:
artefacts_filtered.update(artefacts)
else:
for pat in patterns:
matching = fnmatch.filter(artefacts, pat)
artefacts_filtered.update(matching)
for artefact in artefacts_filtered:
self.download_artefact(output_directory, artefact)
[docs]
@abstractmethod
def get_artefact_list(self) -> list[str]:
raise NotImplementedError("This method is not implemented for this class")
[docs]
@abstractmethod
def run(self) -> int:
"""Execute a ROXIE simulation."""
raise NotImplementedError("This method is not implemented for this class")
[docs]
@abstractmethod
def download_artefact(self, output_directory: Path, artefact: str) -> None:
"""A method to download an artefact to the specified output directory.
Args:
output_directory (Path): The directory where the artefact will be downloaded.
artefact (str): The name of the artefact to be downloaded.
Returns:
None
"""
raise NotImplementedError("This method is not implemented for this class")
[docs]
class RestRoxieToolAdapter(RoxieToolAdapter):
def __init__(
self,
service_url: str,
input_files: List[Path],
session: Optional[requests.Session] = None,
) -> None:
"""Initializes the object with the provided service URL and input
files.
:param service_url: The URL of the service to connect to. (str)
:param input_files: The input files to use. (List[Path])
:param session: An optional existing session to use
"""
super().__init__(input_files=input_files)
self.base_url = service_url
self.model_name: Optional[str] = None
self.timestamp: Optional[str] = None
if session:
self.session = session
else:
self.session = requests.Session()
def _update_datafile(self):
"""Update datafile to define all paths relative."""
if cadata := self.input_files.get("cadata"):
self.input_builder.cadata_path = cadata.name
if bhdata := self.input_files.get("bhdata"):
self.input_builder.bhdata_path = bhdata.name
if iron := self.input_files.get("iron"):
self.input_builder.iron_path = iron.name
# Create temporary file
tmp_data_filename = Path(tempfile.gettempdir()) / self.input_files[".data"].name
self.input_builder.build(tmp_data_filename)
self.input_files[".data"] = tmp_data_filename
[docs]
def run(self) -> int:
self._update_datafile()
self.model_name = self.input_files[".data"].stem
# get timestamp
response = self.session.post(
urljoin(self.base_url, f"/model/{self.model_name}")
)
response.raise_for_status()
self.timestamp = response.json()["timestamp"]
# upload input files
data = {"model_name": self.model_name, "timestamp": self.timestamp}
response = self.session.post(
urljoin(self.base_url, "/model"),
data=data,
files=[("files", open(fp, "rb")) for _, fp in self.input_files.items()],
)
response.raise_for_status()
# Run
response = self.session.post(
urljoin(self.base_url, f"/model/{self.model_name}/{self.timestamp}/run")
)
response.raise_for_status()
response_json = response.json()
self.output = response_json["output"]
self.errors = response_json.get("errors", "")
self.model_name = response_json["model_name"]
self.timestamp = response_json["timestamp"]
status = response_json["status"]
return 0 if status else 1
[docs]
def get_artefact_list(self) -> List[str]:
"""Return a list of artefacts available to download.
:return: The list of artefacts
"""
if self.model_name is None or self.timestamp is None:
raise RuntimeError(
"No execution results avaible, Model needs to run sucessfully first"
)
artefactlist_url = urljoin(
self.base_url, f"/artefacts/{self.model_name}/{self.timestamp}"
)
response = self.session.get(artefactlist_url)
response.raise_for_status()
return response.json()["artefacts"]
[docs]
def download_artefact(self, output_directory: Path, artefact: str) -> None:
if self.model_name is None or self.timestamp is None:
raise RuntimeError(
"No execution results avaible, Model needs to run sucessfully first"
)
if not output_directory.is_dir():
raise RuntimeError(f"Output path {output_directory} is not a directory")
artefact_url = urljoin(
self.base_url, f"artefact/{self.model_name}/{self.timestamp}/{artefact}"
)
response_artefact = self.session.get(artefact_url)
response_artefact.raise_for_status()
with open(Path(output_directory) / artefact, "wb") as file:
file.write(response_artefact.content)
[docs]
class TerminalRoxieToolAdapter(RoxieToolAdapter):
def __init__(
self,
input_files: List[Path],
run_in_tmp_folder: bool = True,
tmp_path_parent: Optional[str] = None,
executable_name: str = "runroxie",
overwrite_arguments: Optional[List[str]] = None,
):
"""Initialize the RoxieTooladapter with the given input files and
configuration options.
:param input_files: List of input files for the
RoxieTooladapter. (List[Path])
:param run_in_tmp_folder: Flag to run the RoxieTooladapter in a
temporary folder. Defaults to True. (bool)
:param tmp_path: Parent Path to the temporary folder. Temporary
folder will be created here Defaults to None.
(Optional[str])
:param executable_name: Name of the executable. Defaults to
"runroxie". (str)
:param overwrite_arguments: List of arguments to overwrite.
Defaults to None. (Optional[List[str]])
:raises RuntimeError: If run_in_tmp_folder is False and datafile
needs to be updated.
:return: None
:rtype: None
"""
super().__init__(input_files)
self.executable_name = executable_name
self.artefact_list: Set[Path] = set()
self.tmp_folder: Optional[TemporaryDirectory]
if run_in_tmp_folder:
self.tmp_folder = TemporaryDirectory(dir=tmp_path_parent)
self.exec_folder = Path(self.tmp_folder.name)
datafile_abspath = self.input_files[".data"].parent.absolute()
if p := Path(self.input_builder.cadata_path.strip()):
if not p.is_absolute():
self.input_builder.cadata_path = str(datafile_abspath / p)
self.needs_datafile_update = True
if p := Path(self.input_builder.bhdata_path.strip()):
if not p.is_absolute():
self.input_builder.bhdata_path = str(datafile_abspath / p)
self.needs_datafile_update = True
if p := Path(self.input_builder.iron_path.strip()):
if not p.is_absolute():
self.input_builder.iron_path = str(datafile_abspath / p)
self.needs_datafile_update = True
else:
self.tmp_folder = None
self.exec_folder = self.input_files[".data"].parent
self.overwrite_arguments = overwrite_arguments
def _get_process_command(self) -> List[str]:
if self.overwrite_arguments:
args = self.overwrite_arguments
else:
args = [self.input_files[".data"].name]
return [self.executable_name, *args]
[docs]
def run(self) -> int:
# Different execution modes depending on the operating system
p = self.start_process()
byte_output, byte_error = p.communicate(
b"input data that is passed to subprocess' stdin"
)
byte_output_decode = byte_output.decode()
byte_error_decode = byte_error.decode()
self.output = byte_output_decode
self.errors = byte_error_decode
retcode = p.returncode
self.collect_outputs()
return retcode
[docs]
def collect_outputs(self):
modified_files = {
f
for f in Path(self.exec_folder).iterdir()
if datetime.datetime.fromtimestamp(f.stat().st_mtime) > self.exec_timestamp
}
self.artefact_list = modified_files
[docs]
def start_process(self):
if self.needs_datafile_update:
if not self.tmp_folder:
raise RuntimeError(
"Datafile needs to be rewritten but run in tmp folder is deactivated. "
"RoxieTooladapter does not overwrite existing datafiles. Change execution to run int tmp_folder "
"or fix paths in data file."
)
tmp_data_file = Path(self.tmp_folder.name) / self.input_files[".data"].name
self.input_builder.build(tmp_data_file)
self.input_files[".data"] = tmp_data_file
command = self._get_process_command()
self.exec_timestamp = datetime.datetime.now()
p = Popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE, cwd=self.exec_folder)
return p
[docs]
def download_artefact(self, output_directory: Path, artefact: str) -> None:
fn = self.exec_folder / artefact
if not fn.exists():
raise RuntimeError(f"Artefact {artefact} does not exist in output")
if not output_directory.is_dir():
raise RuntimeError(f"Output path {output_directory} is not a directory")
shutil.copy(fn, output_directory)