Source code for roxieapi.output.parser

import logging
import string
from dataclasses import dataclass
from typing import Dict, List, Optional

import numpy as np
import pandas as pd

from roxieapi.commons.types import (
    BlockGeometry,
    BlockTopology,
    Brick3DGeometry,
    Coil3DGeometry,
    CoilGeometry,
    DesignVariableResult,
    Geometry,
    GraphPlot,
    HarmonicCoil,
    ObjectiveResult,
    Plot2D,
    Plot3D,
    WedgeGeometry,
)


[docs] class EddyCurrentsData: """Data for eddy currents calculations for every time_step""" def __init__(self, xroot) -> None: self._xroot = xroot
[docs] def get_time_step_count(self) -> int: """ Get the number of <time_step> elements in the XML file. Returns ------- int The number of <time_step> elements found. """ time_steps = self._xroot.findall(".//time_step") return len(time_steps)
[docs] def get_info_on_timestep(self, time_step_index: int) -> dict: """ Parse the information for the specified <time_step> to extract step_number and absolute_time. Parameters ---------- time_step_index : int The index of the <time_step> element to parse. Returns ------- dict A dictionary containing: - "step_number" (int): The time step number. - "absolute_time" (float): The absolute time for the present time step. """ # Find all <time_step> elements time_steps = self._xroot.findall(".//time_step") # Handle the case where no <time_step> elements are found if time_steps is None or len(time_steps) == 0: raise ValueError("No <time_step> elements found in the XML file.") # Check if the time_step_index is within range if time_step_index >= len(time_steps) or time_step_index < 0: raise IndexError("time_step_index is out of range.") # Get the text content of the specified <time_step> element time_step_elem = time_steps[time_step_index] line = time_step_elem.text.strip() if time_step_elem.text else "" # Split the line by whitespace and parse the values try: _, step_number, absolute_time = line.split() step_number = int(step_number) absolute_time = float(absolute_time) except ValueError as err: raise ValueError("The <time_step> line format is incorrect.") from err return {"step_number": step_number, "absolute_time": absolute_time}
[docs] def get_elements_info(self) -> dict: """ Parse the <element_data> line to extract tot_ele and max_nodes. Returns ------- dict A dictionary containing: - "tot_ele" (int): The total number of elements. - "max_nodes" (int): The maximum number of nodes. """ # Find the <element_data> element element_data_elem = self._xroot.find(".//element_data") # Check if <element_data> element exists if element_data_elem is None or not element_data_elem.text: raise ValueError("<element_data> element is missing or empty.") # Extract text, strip any surrounding whitespace, and split by whitespace try: _, _, tot_ele, max_nodes = element_data_elem.text.strip().split() except ValueError as err: raise ValueError( "<element_data> is missing or has the wrong format." ) from err return {"tot_ele": int(tot_ele), "max_nodes": int(max_nodes)}
[docs] def get_elements(self) -> pd.DataFrame: """ Parse the <elements> data to extract details for each element. Returns ------- pd.DataFrame A DataFrame with columns: - ele_num: Element number - hmo_ele: HMO element - ele_type: Element type - ele_collector: Element collector - nodes: List of node numbers (up to max_nodes) """ # Find the <elements> element elements_elem = self._xroot.find(".//elements") # Handle the case where no <time_step> elements are found if elements_elem is None: raise ValueError("No <elements> found in the XML file.") # Check if <elements> element exists if elements_elem is None or not elements_elem.text: raise ValueError("<elements> element is missing or empty.") # Extract text content, strip whitespace, and split by lines lines = elements_elem.text.strip().split("\n") # Initialize a list to store element data element_data = [] # Process each line in the <elements> data for line in lines[1:]: # Split line by whitespace columns = line.split() print(columns) # Parse the main attributes try: ele_num = int(columns[0]) hmo_ele = int(columns[1]) ele_type = int(columns[2]) ele_collector = int(columns[3]) except (ValueError, IndexError) as e: raise ValueError( f"Wrong formatting of the <elements> line: {line}" ) from e # The remaining numbers are nodes (with padding zeros to max_nodes) nodes = list(map(int, columns[4:])) # Append to the data list element_data.append( { "ele_num": ele_num, "hmo_ele": hmo_ele, "ele_type": ele_type, "ele_collector": ele_collector, "nodes": nodes, } ) # Convert the list of dictionaries to a DataFrame return pd.DataFrame(element_data)
[docs] def get_eddy_currents_on_timestep(self, time_step_index=0) -> pd.DataFrame: """ Parse the eddy currents data from a specific time_step in the post.xml file. Parameters ---------- time_step_index : int, optional The index of the <time_step> element from which to retrieve the <eddy_currents_data>. Defaults to 0, which is the first time_step. Returns ------- pd.DataFrame A DataFrame with the eddy currents data. The columns are - node: node number - elem: element number - jx: x component of the current density - jy: y component of the current density - jz: z component of the current density - j2/sigma: squared current density divided by the conductivity """ # Find all <time_step> elements time_steps = self._xroot.findall(".//time_step") # Check if the time_step_index is within range if time_step_index >= len(time_steps) or time_step_index < 0: raise IndexError("time_step_index is out of range.") # Get the <eddy_currents_data> element for the specified time_step eddy_currents_elem = time_steps[time_step_index].find(".//eddy_currents_data") if eddy_currents_elem is None or not eddy_currents_elem.text: return ( pd.DataFrame() ) # Return an empty DataFrame if the element is missing or empty # Extract and parse the text content data = eddy_currents_elem.text.strip() lines = data.split("\n") data_lines = lines[1:] # Skip the header line # Parse the data into rows rows = [line.split() for line in data_lines] df = pd.DataFrame(rows, columns=["node", "elem", "jx", "jy", "jz", "j2/sigma"]) # Convert columns to numeric and handle NaN df = df.apply(pd.to_numeric, errors="coerce") return df
[docs] def get_eddy_currents_all_timesteps(self) -> pd.DataFrame: """ Retrieve eddy currents data for all time steps and store it in a pandas DataFrame. Returns ------- pd.DataFrame A DataFrame containing eddy currents data for all time steps. The columns are: - time_step: index of the time step - node: node number - elem: element number - jx: x component of the current density - jy: y component of the current density - jz: z component of the current density - j2/sigma: squared current density divided by the conductivity If no data exists, returns an empty DataFrame with the above columns. """ # Find all <time_step> elements to get the total number of time steps time_steps = self._xroot.findall(".//time_step") time_steps_num = len(time_steps) # Initialize an empty list to store DataFrames for each time step all_data = [] for i in range(time_steps_num): # Get the eddy currents data for the current time step df = self.get_eddy_currents_on_timestep(time_step_index=i) if not df.empty: # Add the time_step index as a new column df["time_step"] = i # Append the data for the current time step all_data.append(df) if all_data: # Concatenate all DataFrames into one eddy_currents_df = pd.concat(all_data, ignore_index=True) else: # Return an empty DataFrame with the appropriate column names eddy_currents_df = pd.DataFrame( columns=["time_step", "node", "elem", "jx", "jy", "jz", "j2/sigma"] ) return eddy_currents_df
[docs] def get_potential_on_timestep(self, time_step_index: int) -> pd.DataFrame: """ Parse the potential data from a specific time_step in the post.xml file. Parameters ---------- time_step_index : int The index of the <time_step> element from which to retrieve the <potential_data>. Returns ------- pd.DataFrame A DataFrame with the potential data. The columns are - node: node number - az: potential value - norm_deriv_az: normalized derivative of az """ # Find all <time_step> elements time_steps = self._xroot.findall(".//time_step") # Check if the time_step_index is within range if time_step_index >= len(time_steps) or time_step_index < 0: raise IndexError("time_step_index is out of range.") # Get the <potential_data> element for the specified time_step potential_data_elem = time_steps[time_step_index].find(".//potential_data") if potential_data_elem is None or not potential_data_elem.text: return ( pd.DataFrame() ) # Return an empty DataFrame if the element is missing or empty # Extract and parse the text content data = potential_data_elem.text.strip() lines = data.split("\n") data_lines = lines[1:] # Skip the header line # Parse the data into rows rows = [line.split() for line in data_lines] df = pd.DataFrame(rows, columns=["node", "az", "norm_deriv_az"]) # Convert columns to numeric and handle NaN df = df.apply(pd.to_numeric, errors="coerce") return df
[docs] def get_potential_all_timesteps(self) -> pd.DataFrame: """ Retrieve potential data for all time steps and store it in a pandas DataFrame. Returns ------- pd.DataFrame A DataFrame containing potential data for all time steps. The columns are: - time_step: index of the time step - node: node number - az: potential value - norm_deriv_az: normalized derivative of az If no data exists, returns an empty DataFrame with the above columns. """ # Find all <time_step> elements to get the total number of time steps time_steps = self._xroot.findall(".//time_step") time_steps_num = len(time_steps) # Initialize an empty list to store DataFrames for each time step all_data = [] for i in range(time_steps_num): # Get the potential data for the current time step df = self.get_potential_on_timestep(time_step_index=i) if not df.empty: # Add the time_step index as a new column df["time_step"] = i # Append the data for the current time step all_data.append(df) if all_data: # Concatenate all DataFrames into one potential_df = pd.concat(all_data, ignore_index=True) else: # Return an empty DataFrame with the appropriate column names potential_df = pd.DataFrame( columns=["time_step", "node", "az", "norm_deriv_az"] ) return potential_df
[docs] def get_mesh_elements(self) -> Optional[pd.DataFrame]: """ Extract mesh elements from the XML root element and return them as a pandas DataFrame. Returns ------- Optional[pd.DataFrame] A DataFrame with mesh elements. Returns None if the "elements" tag is not found. """ # Check if the "elements" tag is present if eData := self._xroot.find(".//meshGeom/elements"): results = [] # Iterate over "fe" elements within "elements" for elem in eData.findall("fe"): # Collect element data, including 'id' and 'cnt' attributes, and element indices data = { "id": int(elem.attrib["id"]), "cnt": int(elem.attrib["cnt"]), } # Add the attributes a, b, c, ..., h for x in string.ascii_lowercase[: data["cnt"]]: data[x] = int(elem.attrib[x]) results.append(data) # Convert the results to a pandas DataFrame and return return pd.DataFrame(results) else: # Return None if "elements" is not found return None
[docs] def get_nodal_coords(self) -> Optional[np.ndarray]: """ Extract node data from the XML root element and return it as a NumPy array. Returns ------- Optional[np.ndarray] A NumPy array of node data. Returns None if the "nodes" tag is not found. """ # Locate the "nodes" element nData = self._xroot.find(".//meshGeom/nodes") # Raise an exception if "nodes" element is missing if nData is None: raise Exception("Error in meshGeometry: Nodes missing") # Parse node data into a DataFrame dicts = [x.attrib for x in nData.findall("p")] df = pd.DataFrame(dicts) # Convert columns to numeric where possible for c in df.columns: df[c] = pd.to_numeric(df[c], errors="coerce") # Convert the DataFrame to a NumPy array and return nodes = df.to_numpy() return nodes
[docs] def get_magnetic_induction_on_timestep(self, time_step_index: int) -> pd.DataFrame: """ Parse the magnetic induction data from a specific time_step in the post.xml file. Parameters ---------- time_step_index : int The index of the <time_step> element from which to retrieve the <magnetic_induction_data>. Returns ------- pd.DataFrame A DataFrame with the magnetic induction data. The columns are - node: node number - elem: element number - Bx: x component of the magnetic induction - By: y component of the magnetic induction - Bz: z component of the magnetic induction """ # Find all <time_step> elements time_steps = self._xroot.findall(".//time_step") # Check if the time_step_index is within range if time_step_index >= len(time_steps) or time_step_index < 0: raise IndexError("time_step_index is out of range.") # Get the <magnetic_induction_data> element for the specified time_step magnetic_induction_elem = time_steps[time_step_index].find( ".//magnetic_induction_data" ) if magnetic_induction_elem is None or not magnetic_induction_elem.text: return ( pd.DataFrame() ) # Return an empty DataFrame if the element is missing or empty # Extract and parse the text content data = magnetic_induction_elem.text.strip() lines = data.split("\n") data_lines = lines[1:] # Skip the header line # Parse the data into rows rows = [line.split() for line in data_lines] df = pd.DataFrame(rows, columns=["node", "elem", "Bx", "By", "Bz"]) # Convert columns to numeric and handle NaN df = df.apply(pd.to_numeric, errors="coerce") return df
[docs] def get_magnetic_field_on_timestep(self, time_step_index: int) -> pd.DataFrame: """ Parse the magnetic field data from a specific time_step in the post.xml file. Parameters ---------- time_step_index : int The index of the <time_step> element from which to retrieve the <magnetic_field_data>. Returns ------- pd.DataFrame A DataFrame with the magnetic field data. The columns are - node: node number - elem: element number - Hx: x component of the magnetic field - Hy: y component of the magnetic field - Hz: z component of the magnetic field """ # Find all <time_step> elements time_steps = self._xroot.findall(".//time_step") # Handle the case where no <time_step> elements are found if time_steps is None or len(time_steps) == 0: raise ValueError("No <time_step> elements found in the XML file.") # Check if the time_step_index is within range if time_step_index >= len(time_steps) or time_step_index < 0: raise IndexError("time_step_index is out of range.") # Get the <magnetic_field_data> element for the specified time_step magnetic_field_elem = time_steps[time_step_index].find(".//magnetic_field_data") if magnetic_field_elem is None or not magnetic_field_elem.text: return ( pd.DataFrame() ) # Return an empty DataFrame if the element is missing or empty # Extract and parse the text content data = magnetic_field_elem.text.strip() lines = data.split("\n") data_lines = lines[1:] # Skip the header line # Parse the data into rows rows = [line.split() for line in data_lines] df = pd.DataFrame(rows, columns=["node", "elem", "Hx", "Hy", "Hz"]) # Convert columns to numeric and handle NaN df = df.apply(pd.to_numeric, errors="coerce") return df
[docs] def get_magnetic_induction_all_timesteps(self) -> pd.DataFrame: """ Retrieve magnetic induction data for all time steps and store it in a pandas DataFrame. Returns ------- pd.DataFrame A DataFrame containing magnetic induction data for all time steps. The columns are: - time_step: index of the time step - node: node number - elem: element number - Bx: x component of the magnetic induction - By: y component of the magnetic induction - Bz: z component of the magnetic induction If no data exists, returns an empty DataFrame with the above columns. """ # Find all <time_step> elements to get the total number of time steps time_steps = self._xroot.findall(".//time_step") time_steps_num = len(time_steps) # Initialize an empty list to store DataFrames for each time step all_data = [] for i in range(time_steps_num): # Retrieve magnetic induction data for the current time step df = self.get_magnetic_induction_on_timestep(time_step_index=i) if not df.empty: # Add a column for the time_step index df["time_step"] = i # Append the data for this time step all_data.append(df) if all_data: # Combine all DataFrames into one magnetic_induction_df = pd.concat(all_data, ignore_index=True) else: # Return an empty DataFrame with the appropriate columns magnetic_induction_df = pd.DataFrame( columns=["time_step", "node", "elem", "Bx", "By", "Bz"] ) return magnetic_induction_df
[docs] def get_magnetic_field_all_timesteps(self) -> pd.DataFrame: """ Retrieve magnetic field data for all time steps and store it in a pandas DataFrame. Returns ------- pd.DataFrame A DataFrame containing magnetic field data for all time steps. The columns are: - time_step: index of the time step - node: node number - elem: element number - Hx: x component of the magnetic field - Hy: y component of the magnetic field - Hz: z component of the magnetic field If no data exists, returns an empty DataFrame with the above columns. """ # Find all <time_step> elements to get the total number of time steps time_steps = self._xroot.findall(".//time_step") time_steps_num = len(time_steps) # Initialize an empty list to store DataFrames for each time step all_data = [] for i in range(time_steps_num): # Get the magnetic field data for the current time step df = self.get_magnetic_field_on_timestep(time_step_index=i) if not df.empty: # Add the time_step index as a new column df["time_step"] = i # Append the data for the current time step all_data.append(df) if all_data: # Concatenate all DataFrames into one magnetic_field_df = pd.concat(all_data, ignore_index=True) else: # Return an empty DataFrame with the appropriate column names magnetic_field_df = pd.DataFrame( columns=["time_step", "node", "elem", "Hx", "Hy", "Hz"] ) return magnetic_field_df
[docs] def get_iron_nodal_data_info(self) -> dict: """ Parse the <iron_nodal_data> line to extract tot_nodes, nodf, and dim. Returns ------- dict A dictionary containing: - "tot_nodes" (int): Total number of nodes. - "nodf" (int): Number of degrees of freedom. - "dim" (int): Dimensionality of the nodes. """ # Find the <iron_nodal_data> element iron_nodal_data_elem = self._xroot.find(".//iron_nodal_data") # Check if <iron_nodal_data> element exists if iron_nodal_data_elem is None or not iron_nodal_data_elem.text: raise ValueError("<iron_nodal_data> element is missing or empty.") # Extract text, strip any surrounding whitespace, and split by whitespace try: _, _, _, tot_nodes, nodf, dim = iron_nodal_data_elem.text.strip().split() except ValueError as err: raise ValueError( "<iron_nodal_data> is missing or has the wrong format." ) from err return {"tot_nodes": int(tot_nodes), "nodf": int(nodf), "dim": int(dim)}
[docs] def parse_nodal_coord_info(self) -> pd.DataFrame: """ Parse the <nodal_coord> data to extract details for each node. Returns ------- pd.DataFrame A DataFrame with columns: - node_num: Node number - hmo_node: HMO node - x: X coordinate - y: Y coordinate - z: Z coordinate - frame: Frame value - bound: Boundary condition - nodf: Degrees of freedom """ # Find the <nodal_coord> element nodal_coord_elem = self._xroot.find(".//nodal_coord") # Check if <nodal_coord> element exists if nodal_coord_elem is None or not nodal_coord_elem.text: raise ValueError("<nodal_coord> element is missing or empty.") # Extract text content, strip whitespace, and split by lines lines = nodal_coord_elem.text.strip().split("\n") # Initialize a list to store node data node_data = [] # Process each line in the <nodal_coord> data for line in lines[1:]: # Split line by whitespace columns = line.split() try: node_num = int(columns[0]) hmo_node = int(columns[1]) x = float(columns[2]) y = float(columns[3]) z = float(columns[4]) frame = int(columns[5]) bound = int(columns[6]) nodf = int(columns[7]) except (ValueError, IndexError) as e: raise ValueError( f"Wrong formatting of the <nodal_coord> line: {line}" ) from e # Append to the data list node_data.append( { "node_num": node_num, "hmo_node": hmo_node, "x": x, "y": y, "z": z, "frame": frame, "bound": bound, "nodf": nodf, } ) # Convert the list of dictionaries to a DataFrame return pd.DataFrame(node_data)
[docs] class EddyStepData: """ Data of an eddy time step """ def __init__(self, id: int, time: float): """ Initialize an EddyStepData object Parameters ---------- id : int index of the eddy time step time : float time at which the eddy time step is defined """ self._id = id self._time = time self._potentialData = pd.DataFrame() self._magneticInductionData = pd.DataFrame() self._eddyCurrentsData = pd.DataFrame() self._magneticFieldData = pd.DataFrame() self._meshData = pd.DataFrame() @property def id(self) -> int: """ Read-only - retrieve the index of the eddy time step Returns ------- int index of the eddy time step """ return self._id @property def time(self) -> float: """ Read-only - retrieve the time at which the eddy time step is defined Returns ------- float time at which the eddy time step is defined """ return self._time @property def meshData(self) -> pd.DataFrame: """ Read-only - Retrieve the iron mesh data for the eddy time step, which includes the potential data, magnetic induction data, eddy currents data, and magnetic field data for the coresponding eddy step. TODO: This should already be cleaned in the xml file Returns ------- pd.DataFrame DataFrame with the potential data """ if self._meshData.empty: ecd = self.eddyCurrentsData.sort_values(by="node") ecd = ecd.groupby("node", as_index=False).mean() pda = self.potentialData.sort_values(by="node") mfd = self.magneticFieldData.sort_values(by="node") mfd = mfd.groupby("node", as_index=False).mean() mid = self.magneticInductionData.sort_values(by="node") mid = mid.groupby("node", as_index=False).mean() # KD: Keep the nodes, drop the elems and concat all the data self._meshData = pd.concat([ecd, pda, mfd, mid], axis=1) self._meshData = self._meshData.loc[ :, ~self._meshData.columns.duplicated() ].copy() self._meshData = self._meshData.drop( columns=[col for col in self._meshData.columns if "elem" in col] ) return self._meshData @property def potentialData(self) -> pd.DataFrame: """ Read-only - retrieve the potential data for the eddy time step Returns ------- pd.DataFrame DataFrame with the potential data """ return self._potentialData @potentialData.setter def potentialData(self, value: pd.DataFrame): """ Write-only - set the potential data for the eddy time step Parameters ---------- value : pd.DataFrame DataFrame with the potential data """ self._potentialData = value @property def magneticInductionData(self) -> pd.DataFrame: """ Read-only - retrieve the magnetic induction data for the eddy time step Returns ------- pd.DataFrame DataFrame with the magnetic induction data """ return self._magneticInductionData @magneticInductionData.setter def magneticInductionData(self, value: pd.DataFrame): """ Write-only - set the magnetic induction data for the eddy time step Parameters ---------- value : pd.DataFrame DataFrame with the magnetic induction data """ self._magneticInductionData = value @property def eddyCurrentsData(self) -> pd.DataFrame: """ Read-only - retrieve the eddy currents data for the eddy time step Returns ------- pd.DataFrame DataFrame with the eddy currents data """ return self._eddyCurrentsData @eddyCurrentsData.setter def eddyCurrentsData(self, value: pd.DataFrame): """ Write-only - set the eddy currents data for the eddy time step Parameters ---------- value : pd.DataFrame DataFrame with the eddy currents data """ self._eddyCurrentsData = value @property def magneticFieldData(self) -> pd.DataFrame: """ Read-only - retrieve the magnetic field data for the eddy time step Returns ------- pd.DataFrame DataFrame with the magnetic field data """ return self._magneticFieldData @magneticFieldData.setter def magneticFieldData(self, value: pd.DataFrame): """ Write-only - set the magnetic field data for the eddy time step Parameters ---------- value : pd.DataFrame DataFrame with the magnetic field data """ self._magneticFieldData = value
[docs] class TransStepData: """Data of a transient step""" def __init__(self, id: int, name: str) -> None: self._id: int = id self._name: str = name self.coilData = pd.DataFrame() self.meshData = pd.DataFrame() self.matrixData = pd.DataFrame() # self.irisData = pd.DataFrame() self.coilData3D = pd.DataFrame() self.brickData3D = pd.DataFrame() self.meshData3D = pd.DataFrame() self.deviceGraphs: Dict[int, pd.DataFrame] = {} self.harmonicCoils: Dict[int, HarmonicCoil] = {} self.conductorForces: Optional[pd.DataFrame] = None self._eddyTimeSteps: Dict[int, EddyStepData] = {} @property def eddyTimeSteps(self) -> Dict[int, EddyStepData]: """ Read-only - retrieve the dictionary of eddy time steps associated with this transient step. Returns ------- Dict[int, EddyStepData] A dictionary where keys are the step indices and values are EddyStepData objects representing the data for each eddy_time_step. """ return self._eddyTimeSteps @property def id(self) -> int: """Read-only - retrieve the unique identifier of this transient step.""" return self._id @property def name(self) -> str: """Read-only - retrieve the name of this transient step.""" return self._name @property def eddy_steps_number(self) -> int: """Read-only - retrieve the number of eddy time steps associated with this transient step.""" return len(self._eddyTimeSteps)
[docs] @dataclass class CoilGeomDfs: conductors: pd.DataFrame strands: pd.DataFrame
[docs] @dataclass class MeshGeomDfs: nodes: pd.DataFrame elements: pd.DataFrame boundaries: pd.DataFrame
[docs] class OptData: """Data Of an optimization Step""" def __init__(self, id: int, name: str) -> None: self.id = id self.name: str = name self.transientGraphs: Dict[int, pd.DataFrame] = {} self.step: Dict[int, TransStepData] = {} self.designVariables: Dict[int, DesignVariableResult] = {} self.objectiveResults: Dict[int, ObjectiveResult] = {} self._coilGeometries: Dict[int, CoilGeometry] = {} self._coilGeometries3D: Dict[int, Coil3DGeometry] = {} self._brickGeometries3D: Dict[int, Brick3DGeometry] = {} self._wedgeGeometries3D: Dict[int, WedgeGeometry] = {} self._blockGeometries3D: dict[int, BlockGeometry] = {} self._meshGeometries: Optional[Geometry] = None self._meshGeometries3D: Optional[Geometry] = None self._coilGeomdf: CoilGeomDfs = CoilGeomDfs(pd.DataFrame(), pd.DataFrame()) self._coilGeom3ddf: pd.DataFrame = pd.DataFrame() self._brickGeom3ddf: pd.DataFrame = pd.DataFrame() self._topologydf: pd.DataFrame = pd.DataFrame() self._meshGeomdf: MeshGeomDfs = MeshGeomDfs( pd.DataFrame(), pd.DataFrame(), pd.DataFrame() ) self._meshGeom3ddf: MeshGeomDfs = MeshGeomDfs( pd.DataFrame(), pd.DataFrame(), pd.DataFrame() ) @property def blockTopologies(self) -> Dict[int, BlockTopology]: """ Get block topologies """ return { int(block.block_nr): BlockTopology.from_namedtuple(block) for block in self._topologydf.itertuples() } @property def coilGeometries(self) -> Dict[int, CoilGeometry]: if not self._coilGeometries: self._create_coils_from_df() return self._coilGeometries @coilGeometries.setter def coilGeometries(self, value: Dict[int, CoilGeometry]): self._coilGeometries = value @property def coilGeometries3D(self) -> Dict[int, Coil3DGeometry]: if not self._coilGeometries3D: self._create_coils3d_from_df() return self._coilGeometries3D @coilGeometries3D.setter def coilGeometries3D(self, value: Dict[int, Coil3DGeometry]): self._coilGeometries3D = value @property def brickGeometries3D(self) -> Dict[int, Brick3DGeometry]: if not self._brickGeometries3D: self._create_bricks_from_df() return self._brickGeometries3D @brickGeometries3D.setter def brickGeometries3D(self, value: Dict[int, Brick3DGeometry]): self._brickGeometries3D = value @property def wedgeGeometries3D(self) -> Dict[int, WedgeGeometry]: if not self._wedgeGeometries3D: self._blocks_to_wedges() return self._wedgeGeometries3D @wedgeGeometries3D.setter def wedgeGeometries3D(self, value: Dict[int, WedgeGeometry]): self._wedgeGeometries3D = value @property def blockGeometries3D(self) -> Dict[int, BlockGeometry]: return self._blockGeometries3D @blockGeometries3D.setter def blockGeometries3D(self, value: Dict[int, BlockGeometry]): self._blockGeometries3D = value @property def meshGeometries(self) -> Optional[Geometry]: if not self._meshGeometries: self._create_mesh_from_df() pass return self._meshGeometries @meshGeometries.setter def meshGeometries(self, value: Optional[Geometry]): self._meshGeometries = value @property def meshGeometries3D(self) -> Optional[Geometry]: if self._meshGeometries3D is None: self._create_mesh3d_from_df() pass return self._meshGeometries3D @meshGeometries3D.setter def meshGeometries3D(self, value: Optional[Geometry]): self._meshGeometries3D = value @property def transient_steps_number(self) -> int: """Read-only - retrieve the number of transient steps associated with this optimization step.""" return len(self.step) def _blocks_to_wedges(self) -> None: # From given block geometry, generate wedges if self._topologydf.empty: return # Iterate through blocks to establish the blockorder and nr of grouped blocks (by layerid and original blockid) block_order: dict[int, dict[int, list[int]]] = {} for _, row in self._topologydf.iterrows(): layer = int(row["layer_nr"]) block_orig = int(row["block_origin"]) block_nr = int(row["block_nr"]) if layer not in block_order: block_order[layer] = {} if block_orig not in block_order[layer]: block_order[layer][block_orig] = [] block_order[layer][block_orig].append(block_nr) # From the generated order extract the list of unique blocklists (each for generating one set of wedges) block_ids: dict[int, list[list[int]]] = {} for layer in block_order: max_len = max(len(blocks) for blocks in block_order[layer].values()) block_ids[layer] = [[] for _ in range(max_len)] for block_orig in block_order[layer]: for idx, block_nr in enumerate(block_order[layer][block_orig]): block_ids[layer][idx].append(block_nr) wedge_nr = 1 wedges: dict[int, WedgeGeometry] = {} for layer, block_list_list in block_ids.items(): for block_list in block_list_list: # endspacer wedges[wedge_nr] = WedgeGeometry( layer, wedge_nr, self.blockGeometries3D[block_list[0]].outer_surface, None, block_list[0], 0, ) wedge_nr += 1 for bl in range(1, len(block_list)): outer = self.blockGeometries3D[block_list[bl - 1]].inner_surface inner = self.blockGeometries3D[block_list[bl]].outer_surface wedges[wedge_nr] = WedgeGeometry( layer, wedge_nr, inner, outer, block_list[bl], block_list[bl - 1], ) wedge_nr += 1 # inner post wedges[wedge_nr] = WedgeGeometry( layer, wedge_nr, None, self.blockGeometries3D[block_list[-1]].inner_surface, 0, block_list[-1], ) wedge_nr += 1 self.wedgeGeometries3D = wedges def _create_mesh_from_df(self) -> None: if self._meshGeomdf.nodes.empty: return self._meshGeometries = self._meshdf_to_geom(self._meshGeomdf) def _create_mesh3d_from_df(self) -> None: if self._meshGeom3ddf.nodes.empty: return self._meshGeometries3D = self._meshdf_to_geom(self._meshGeom3ddf) def _meshdf_to_geom(self, df: MeshGeomDfs) -> Geometry: nodes = df.nodes.to_numpy()[:, 1:] elements = df.elements.to_numpy()[:, 2:] elements -= 1 # translate to 0 based index elements_list = elements.tolist() for nr_elem, lst in zip(df.elements["n_el"], elements_list): del lst[nr_elem:] # Resize lists to match number of elements boundaries = {} if not df.boundaries.empty: for id, grp in df.boundaries.groupby("boundary_id"): if grp.empty: continue boundaries[id] = grp.to_numpy()[:, 2:] return Geometry(nodes, elements_list, boundaries) def _create_bricks_from_df(self): if self._brickGeom3ddf.empty: return for idx, grp in self._brickGeom3ddf.groupby("brick_nr"): brick_nr = int(idx) if grp.empty: continue nodes = grp.to_numpy()[:, 2:].reshape((-1, 3)) geom = Geometry(nodes, None, None) geom.generate_elements_for_coil_nodes() self._brickGeometries3D[brick_nr] = Brick3DGeometry(brick_nr, geom) def _create_coils3d_from_df(self): if self._coilGeomdf.conductors.empty: return for idx, grp in self._coilGeom3ddf.groupby("conductor"): cond_nr = int(idx) if grp.empty: continue block_info = self._topologydf[ (self._topologydf.first_conductor <= cond_nr) & (self._topologydf.last_conductor >= cond_nr) ].iloc[0] block_nr = int(block_info.block_nr) layer_nr = int(block_info.layer_nr) nodes = grp.to_numpy()[:, 2:].reshape((-1, 3)) geom = Geometry(nodes, None, None) geom.generate_elements_for_coil_nodes() self._coilGeometries3D[cond_nr] = Coil3DGeometry( cond_nr, geom, block_nr, layer_nr ) def _create_coils_from_df(self): if self._coilGeomdf.conductors.empty: return cables = {} for _, cond in self._coilGeomdf.conductors.iterrows(): cable_nr = int(cond["conductor"]) geom = cond.to_numpy()[1:].reshape((4, 2)) block_info = self._topologydf[ (self._topologydf.first_conductor <= cond["conductor"]) & (self._topologydf.last_conductor >= cond["conductor"]) ] block_nr = block_info.block_nr layer_nr = block_info.layer_nr first_cond_strand = int( ( block_info.first_strand + (cond["conductor"] - block_info.first_conductor) * block_info.n_radial * block_info.n_azimuthal ).iloc[0] ) last_cond_strand = int( ( first_cond_strand + (block_info.n_radial * block_info.n_azimuthal) - 1 ).iloc[0] ) df_strand = self._coilGeomdf.strands strands = df_strand[ (df_strand["strand"] >= first_cond_strand) & (df_strand["strand"] <= last_cond_strand) ] strands_dict = { int(st["strand"]): st.to_numpy()[1:].reshape((4, 2)) for _, st in strands.iterrows() } cables[cable_nr] = CoilGeometry( cable_nr, block_nr, layer_nr, geom, strands_dict ) self._coilGeometries = cables
[docs] class RoxieOutputParser: """Roxie output parser class. Takes all different Roxie outputs, parses them, and provides a structured output of the results. """ def __init__(self, xml_file: str) -> None: from roxieapi.output.xml_parse import _XmlParser self.logger = logging.getLogger("RoxieOutputParser") self.optimizationGraphs: Dict[ int, pd.DataFrame ] = {} # Result values on optimization graphs, (id) self.opt: Dict[int, OptData] = {} self.plots2D: List[Plot2D] = [] # 2D Plots information for device self.plots3D: List[Plot3D] = [] # 3D Plots information for device self.graphs_device: List[GraphPlot] = [] # Graph information for device self.graphs_transient: List[ GraphPlot ] = [] # Plot information for transient plots self.graphs_optimization: List[ GraphPlot ] = [] # Plot information for optimization plots # General information self.roxie_version = "" self.roxie_githash = "" self.run_date = "" self.comment = "" # Parse the file, extract data _XmlParser.parse_xml(xml_file, self)
[docs] def find_eddystep( self, opt_step: int, trans_step: int, eddy_step: Optional[int] ) -> Optional[EddyStepData]: """ Find the eddy step data for a given optimization step and transient step :param opt_step: The optimization step number :param trans_step: The transient step number :param eddy_step: The eddy step number :return: The EddyStep object or None if not found """ if ( opt_step in self.opt and trans_step in self.opt[opt_step].step and eddy_step is not None and eddy_step in self.opt[opt_step].step[trans_step].eddyTimeSteps ): return self.opt[opt_step].step[trans_step].eddyTimeSteps[eddy_step] return None
[docs] def find_transstep(self, opt_step: int, trans_step: int) -> Optional[TransStepData]: """ Find the transient step data for a given optimization step and transient step :param opt_step: The optimization step number :param trans_step: The transient step number :return: The TransStepData object or None if not found """ if opt_step in self.opt and trans_step in self.opt[opt_step].step: return self.opt[opt_step].step[trans_step] return None
[docs] def find_optstep(self, opt_step) -> Optional[OptData]: """ Find the optimization step data for a given optimization step :param opt_step: The optimization step number :return: The OptData object or None if not found """ return self.opt.get(opt_step, None)
[docs] def get_harmonic_coil( self, coil_nr: int = 1, opt_step: int = 1, trans_step: int = 1, ) -> Optional[HarmonicCoil]: """Return the harmonic coil for given step and coil id, or None if not present :param coil_nr: Harmonic Coil Nr, defaults to 1 :param opt_step: The Optimization Step Nr, defaults to 1 :param trans_step: The Transient Step Nr, defaults to 1 :return: The Harmonic coil, or None """ if trans := self.find_transstep(opt_step, trans_step): return trans.harmonicCoils.get(coil_nr, None) return None
[docs] def get_conductor_forces( self, opt_step: int = 1, trans_step: int = 1 ) -> Optional[pd.DataFrame]: """Return Conductor forces for given Step, or None if not present :param opt_step: The Optimization step, defaults to 1 :param trans_step: Transient step, defaults to 1 :return: The Conductor forces as Dataframe """ if trans := self.find_transstep(opt_step, trans_step): return trans.conductorForces else: return None
[docs] def get_crosssection_plot(self, plot_nr: int = 1) -> Optional[Plot2D]: """Return the Crossection 2D plot with number i :param plot_nr: The plot_number, defaults to 1 :return: The Plot2D object, or None """ for pl in self.plots2D: if isinstance(pl, Plot2D) and pl.id == plot_nr: return pl return None
[docs] def get_3d_plot(self, plot_nr: int = 1) -> Optional[Plot3D]: """Return the 3D plot with number i :param plon_nr: The plot number, defaults to 1 :return: The Plot3D definition, or None """ for pl in self.plots3D: if isinstance(pl, Plot3D) and pl.id == plot_nr: return pl return None