import logging
import string
from dataclasses import dataclass
from typing import Dict, List, Optional
import numpy as np
import pandas as pd
from roxieapi.commons.types import (
BlockGeometry,
BlockTopology,
Brick3DGeometry,
Coil3DGeometry,
CoilGeometry,
DesignVariableResult,
Geometry,
GraphPlot,
HarmonicCoil,
ObjectiveResult,
Plot2D,
Plot3D,
WedgeGeometry,
)
[docs]
class EddyCurrentsData:
    """Data for eddy currents calculations for every time_step.

    Reads whitespace separated text tables out of an already parsed XML tree
    and returns them as dictionaries, pandas DataFrames or NumPy arrays.
    The tree object only needs to provide ``find`` and ``findall``; the
    elements it returns need ``text`` and ``attrib``.
    """

    # Column layouts of the per-time-step tables.
    _EDDY_CURRENTS_COLUMNS = ("node", "elem", "jx", "jy", "jz", "j2/sigma")
    _POTENTIAL_COLUMNS = ("node", "az", "norm_deriv_az")
    _MAGNETIC_INDUCTION_COLUMNS = ("node", "elem", "Bx", "By", "Bz")
    _MAGNETIC_FIELD_COLUMNS = ("node", "elem", "Hx", "Hy", "Hz")

    def __init__(self, xroot) -> None:
        self._xroot = xroot

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    def _select_time_step(self, time_step_index: int, fail_if_no_steps: bool = False):
        """Return the <time_step> element at ``time_step_index``.

        Raises ``ValueError`` when ``fail_if_no_steps`` is set and the tree has
        no <time_step> at all, and ``IndexError`` when the index is out of range.
        """
        time_steps = self._xroot.findall(".//time_step")
        if fail_if_no_steps and not time_steps:
            raise ValueError("No <time_step> elements found in the XML file.")
        if not 0 <= time_step_index < len(time_steps):
            raise IndexError("time_step_index is out of range.")
        return time_steps[time_step_index]

    @staticmethod
    def _parse_step_table(step_elem, tag: str, columns) -> pd.DataFrame:
        """Parse the ``tag`` table below ``step_elem`` into a numeric DataFrame.

        Returns a DataFrame without columns when the tag is missing or has no text.
        """
        table_elem = step_elem.find(f".//{tag}")
        if table_elem is None or not table_elem.text:
            return pd.DataFrame()
        lines = table_elem.text.strip().split("\n")
        # The first line is a header and carries no table data.
        rows = [line.split() for line in lines[1:]]
        frame = pd.DataFrame(rows, columns=list(columns))
        # Unparsable cells become NaN instead of raising.
        return frame.apply(pd.to_numeric, errors="coerce")

    def _collect_step_tables(self, tag: str, columns) -> pd.DataFrame:
        """Stack the ``tag`` tables of all time steps, tagging rows with ``time_step``."""
        frames = []
        # The time steps are looked up once instead of once per step.
        for index, step_elem in enumerate(self._xroot.findall(".//time_step")):
            frame = self._parse_step_table(step_elem, tag, columns)
            if not frame.empty:
                frame["time_step"] = index
                frames.append(frame)
        if frames:
            return pd.concat(frames, ignore_index=True)
        return pd.DataFrame(columns=["time_step", *columns])

    def _trailing_header_ints(self, tag: str, n_tokens: int, n_values: int) -> List[int]:
        """Return the last ``n_values`` of exactly ``n_tokens`` fields of ``tag`` as ints."""
        elem = self._xroot.find(f".//{tag}")
        if elem is None or not elem.text:
            raise ValueError(f"<{tag}> element is missing or empty.")
        tokens = elem.text.strip().split()
        try:
            if len(tokens) != n_tokens:
                raise ValueError(f"expected {n_tokens} fields, found {len(tokens)}")
            return [int(token) for token in tokens[-n_values:]]
        except ValueError as err:
            raise ValueError(f"<{tag}> is missing or has the wrong format.") from err

    # ------------------------------------------------------------------
    # Time step information
    # ------------------------------------------------------------------
    def get_time_step_count(self) -> int:
        """
        Get the number of <time_step> elements in the XML file.

        Returns
        -------
        int
            The number of <time_step> elements found.
        """
        return len(self._xroot.findall(".//time_step"))

    def get_info_on_timestep(self, time_step_index: int) -> dict:
        """
        Parse the information for the specified <time_step> to extract step_number and absolute_time.

        Parameters
        ----------
        time_step_index : int
            The index of the <time_step> element to parse.

        Returns
        -------
        dict
            A dictionary containing:
            - "step_number" (int): The time step number.
            - "absolute_time" (float): The absolute time for the present time step.

        Raises
        ------
        ValueError
            If there is no <time_step> element, or its leading text does not
            consist of exactly three fields with a valid int and float.
        IndexError
            If time_step_index is out of range.
        """
        step_elem = self._select_time_step(time_step_index, fail_if_no_steps=True)
        header = step_elem.text.strip() if step_elem.text else ""
        try:
            # The first field is ignored; only step number and time are used.
            _, step_number, absolute_time = header.split()
            return {
                "step_number": int(step_number),
                "absolute_time": float(absolute_time),
            }
        except ValueError as err:
            raise ValueError("The <time_step> line format is incorrect.") from err

    # ------------------------------------------------------------------
    # Elements
    # ------------------------------------------------------------------
    def get_elements_info(self) -> dict:
        """
        Parse the <element_data> line to extract tot_ele and max_nodes.

        Returns
        -------
        dict
            A dictionary containing:
            - "tot_ele" (int): The total number of elements.
            - "max_nodes" (int): The maximum number of nodes.

        Raises
        ------
        ValueError
            If <element_data> is missing, empty, or does not consist of four
            fields whose last two are integers.
        """
        tot_ele, max_nodes = self._trailing_header_ints("element_data", 4, 2)
        return {"tot_ele": tot_ele, "max_nodes": max_nodes}

    def get_elements(self) -> pd.DataFrame:
        """
        Parse the <elements> data to extract details for each element.

        Returns
        -------
        pd.DataFrame
            A DataFrame with columns:
            - ele_num: Element number
            - hmo_ele: HMO element
            - ele_type: Element type
            - ele_collector: Element collector
            - nodes: List of node numbers (all remaining fields of the line)

        Raises
        ------
        ValueError
            If <elements> is missing or empty, or a data line is malformed.
        """
        elements_elem = self._xroot.find(".//elements")
        if elements_elem is None:
            raise ValueError("No <elements> found in the XML file.")
        if not elements_elem.text:
            raise ValueError("<elements> element is missing or empty.")

        records = []
        # The first line is a header and is skipped.
        for line in elements_elem.text.strip().split("\n")[1:]:
            columns = line.split()
            try:
                ele_num, hmo_ele, ele_type, ele_collector = (
                    int(token) for token in columns[:4]
                )
                nodes = [int(token) for token in columns[4:]]
            except ValueError as err:
                # Covers non-integer fields as well as lines with too few fields.
                raise ValueError(
                    f"Wrong formatting of the <elements> line: {line}"
                ) from err
            records.append(
                {
                    "ele_num": ele_num,
                    "hmo_ele": hmo_ele,
                    "ele_type": ele_type,
                    "ele_collector": ele_collector,
                    "nodes": nodes,
                }
            )
        return pd.DataFrame(records)

    # ------------------------------------------------------------------
    # Eddy currents
    # ------------------------------------------------------------------
    def get_eddy_currents_on_timestep(self, time_step_index=0) -> pd.DataFrame:
        """
        Parse the eddy currents data from a specific time_step in the post.xml file.

        Parameters
        ----------
        time_step_index : int, optional
            The index of the <time_step> element from which to retrieve the <eddy_currents_data>.
            Defaults to 0, which is the first time_step.

        Returns
        -------
        pd.DataFrame
            A DataFrame with the eddy currents data. The columns are
            - node: node number
            - elem: element number
            - jx: x component of the current density
            - jy: y component of the current density
            - jz: z component of the current density
            - j2/sigma: squared current density divided by the conductivity
            A DataFrame without columns is returned if the data is missing or empty.

        Raises
        ------
        IndexError
            If time_step_index is out of range.
        """
        step_elem = self._select_time_step(time_step_index)
        return self._parse_step_table(
            step_elem, "eddy_currents_data", self._EDDY_CURRENTS_COLUMNS
        )

    def get_eddy_currents_all_timesteps(self) -> pd.DataFrame:
        """
        Retrieve eddy currents data for all time steps and store it in a pandas DataFrame.

        Returns
        -------
        pd.DataFrame
            The per-step tables stacked on top of each other with columns
            node, elem, jx, jy, jz, j2/sigma followed by time_step (index of
            the time step). If no data exists, an empty DataFrame with the
            columns time_step, node, elem, jx, jy, jz, j2/sigma is returned.
        """
        return self._collect_step_tables(
            "eddy_currents_data", self._EDDY_CURRENTS_COLUMNS
        )

    # ------------------------------------------------------------------
    # Potential
    # ------------------------------------------------------------------
    def get_potential_on_timestep(self, time_step_index: int) -> pd.DataFrame:
        """
        Parse the potential data from a specific time_step in the post.xml file.

        Parameters
        ----------
        time_step_index : int
            The index of the <time_step> element from which to retrieve the <potential_data>.

        Returns
        -------
        pd.DataFrame
            A DataFrame with the potential data. The columns are
            - node: node number
            - az: potential value
            - norm_deriv_az: normalized derivative of az
            A DataFrame without columns is returned if the data is missing or empty.

        Raises
        ------
        IndexError
            If time_step_index is out of range.
        """
        step_elem = self._select_time_step(time_step_index)
        return self._parse_step_table(
            step_elem, "potential_data", self._POTENTIAL_COLUMNS
        )

    def get_potential_all_timesteps(self) -> pd.DataFrame:
        """
        Retrieve potential data for all time steps and store it in a pandas DataFrame.

        Returns
        -------
        pd.DataFrame
            The per-step tables stacked on top of each other with columns
            node, az, norm_deriv_az followed by time_step (index of the time
            step). If no data exists, an empty DataFrame with the columns
            time_step, node, az, norm_deriv_az is returned.
        """
        return self._collect_step_tables("potential_data", self._POTENTIAL_COLUMNS)

    # ------------------------------------------------------------------
    # Mesh
    # ------------------------------------------------------------------
    def get_mesh_elements(self) -> Optional[pd.DataFrame]:
        """
        Extract mesh elements from the XML root element and return them as a pandas DataFrame.

        Returns
        -------
        Optional[pd.DataFrame]
            One row per <fe> element with its 'id' and 'cnt' attributes and the
            first 'cnt' attributes named a, b, c, ... Returns None if the
            "meshGeom/elements" tag is not found, and an empty DataFrame if the
            tag exists but holds no <fe> element.
        """
        elements_elem = self._xroot.find(".//meshGeom/elements")
        # Compare against None explicitly: the truth value of an element does
        # not say whether it was found.
        if elements_elem is None:
            return None

        results = []
        for fe_elem in elements_elem.findall("fe"):
            record = {
                "id": int(fe_elem.attrib["id"]),
                "cnt": int(fe_elem.attrib["cnt"]),
            }
            # 'cnt' tells how many of the letter-named attributes are used.
            for letter in string.ascii_lowercase[: record["cnt"]]:
                record[letter] = int(fe_elem.attrib[letter])
            results.append(record)
        return pd.DataFrame(results)

    def get_nodal_coords(self) -> Optional[np.ndarray]:
        """
        Extract node data from the XML root element and return it as a NumPy array.

        Returns
        -------
        Optional[np.ndarray]
            One row per <p> element below "meshGeom/nodes", one column per
            attribute. Attribute values that are not numeric become NaN.

        Raises
        ------
        Exception
            If the "meshGeom/nodes" tag is not found.
        """
        nodes_elem = self._xroot.find(".//meshGeom/nodes")
        if nodes_elem is None:
            raise Exception("Error in meshGeometry: Nodes missing")

        frame = pd.DataFrame([p_elem.attrib for p_elem in nodes_elem.findall("p")])
        for column in frame.columns:
            frame[column] = pd.to_numeric(frame[column], errors="coerce")
        return frame.to_numpy()

    # ------------------------------------------------------------------
    # Magnetic induction and magnetic field
    # ------------------------------------------------------------------
    def get_magnetic_induction_on_timestep(self, time_step_index: int) -> pd.DataFrame:
        """
        Parse the magnetic induction data from a specific time_step in the post.xml file.

        Parameters
        ----------
        time_step_index : int
            The index of the <time_step> element from which to retrieve the <magnetic_induction_data>.

        Returns
        -------
        pd.DataFrame
            A DataFrame with the magnetic induction data. The columns are
            - node: node number
            - elem: element number
            - Bx: x component of the magnetic induction
            - By: y component of the magnetic induction
            - Bz: z component of the magnetic induction
            A DataFrame without columns is returned if the data is missing or empty.

        Raises
        ------
        IndexError
            If time_step_index is out of range.
        """
        step_elem = self._select_time_step(time_step_index)
        return self._parse_step_table(
            step_elem, "magnetic_induction_data", self._MAGNETIC_INDUCTION_COLUMNS
        )

    def get_magnetic_field_on_timestep(self, time_step_index: int) -> pd.DataFrame:
        """
        Parse the magnetic field data from a specific time_step in the post.xml file.

        Parameters
        ----------
        time_step_index : int
            The index of the <time_step> element from which to retrieve the <magnetic_field_data>.

        Returns
        -------
        pd.DataFrame
            A DataFrame with the magnetic field data. The columns are
            - node: node number
            - elem: element number
            - Hx: x component of the magnetic field
            - Hy: y component of the magnetic field
            - Hz: z component of the magnetic field
            A DataFrame without columns is returned if the data is missing or empty.

        Raises
        ------
        ValueError
            If the XML file has no <time_step> element at all.
        IndexError
            If time_step_index is out of range.
        """
        step_elem = self._select_time_step(time_step_index, fail_if_no_steps=True)
        return self._parse_step_table(
            step_elem, "magnetic_field_data", self._MAGNETIC_FIELD_COLUMNS
        )

    def get_magnetic_induction_all_timesteps(self) -> pd.DataFrame:
        """
        Retrieve magnetic induction data for all time steps and store it in a pandas DataFrame.

        Returns
        -------
        pd.DataFrame
            The per-step tables stacked on top of each other with columns
            node, elem, Bx, By, Bz followed by time_step (index of the time
            step). If no data exists, an empty DataFrame with the columns
            time_step, node, elem, Bx, By, Bz is returned.
        """
        return self._collect_step_tables(
            "magnetic_induction_data", self._MAGNETIC_INDUCTION_COLUMNS
        )

    def get_magnetic_field_all_timesteps(self) -> pd.DataFrame:
        """
        Retrieve magnetic field data for all time steps and store it in a pandas DataFrame.

        Returns
        -------
        pd.DataFrame
            The per-step tables stacked on top of each other with columns
            node, elem, Hx, Hy, Hz followed by time_step (index of the time
            step). If no data exists, an empty DataFrame with the columns
            time_step, node, elem, Hx, Hy, Hz is returned.
        """
        return self._collect_step_tables(
            "magnetic_field_data", self._MAGNETIC_FIELD_COLUMNS
        )

    # ------------------------------------------------------------------
    # Iron nodal data
    # ------------------------------------------------------------------
    def get_iron_nodal_data_info(self) -> dict:
        """
        Parse the <iron_nodal_data> line to extract tot_nodes, nodf, and dim.

        Returns
        -------
        dict
            A dictionary containing:
            - "tot_nodes" (int): Total number of nodes.
            - "nodf" (int): Number of degrees of freedom.
            - "dim" (int): Dimensionality of the nodes.

        Raises
        ------
        ValueError
            If <iron_nodal_data> is missing, empty, or does not consist of six
            fields whose last three are integers.
        """
        tot_nodes, nodf, dim = self._trailing_header_ints("iron_nodal_data", 6, 3)
        return {"tot_nodes": tot_nodes, "nodf": nodf, "dim": dim}

    def parse_nodal_coord_info(self) -> pd.DataFrame:
        """
        Parse the <nodal_coord> data to extract details for each node.

        Returns
        -------
        pd.DataFrame
            A DataFrame with columns:
            - node_num: Node number
            - hmo_node: HMO node
            - x: X coordinate
            - y: Y coordinate
            - z: Z coordinate
            - frame: Frame value
            - bound: Boundary condition
            - nodf: Degrees of freedom

        Raises
        ------
        ValueError
            If <nodal_coord> is missing or empty, or a data line is malformed.
        """
        nodal_coord_elem = self._xroot.find(".//nodal_coord")
        if nodal_coord_elem is None or not nodal_coord_elem.text:
            raise ValueError("<nodal_coord> element is missing or empty.")

        node_data = []
        # The first line is a header and is skipped.
        for line in nodal_coord_elem.text.strip().split("\n")[1:]:
            columns = line.split()
            try:
                record = {
                    "node_num": int(columns[0]),
                    "hmo_node": int(columns[1]),
                    "x": float(columns[2]),
                    "y": float(columns[3]),
                    "z": float(columns[4]),
                    "frame": int(columns[5]),
                    "bound": int(columns[6]),
                    "nodf": int(columns[7]),
                }
            except (ValueError, IndexError) as err:
                raise ValueError(
                    f"Wrong formatting of the <nodal_coord> line: {line}"
                ) from err
            node_data.append(record)
        return pd.DataFrame(node_data)
[docs]
class EddyStepData:
    """
    Data of an eddy time step

    Holds the potential, magnetic induction, eddy currents and magnetic field
    tables of one step and derives a combined per-node table from them.
    """

    def __init__(self, id: int, time: float):
        """
        Initialize an EddyStepData object

        Parameters
        ----------
        id : int
            index of the eddy time step
        time : float
            time at which the eddy time step is defined
        """
        self._id = id
        self._time = time
        self._potentialData = pd.DataFrame()
        self._magneticInductionData = pd.DataFrame()
        self._eddyCurrentsData = pd.DataFrame()
        self._magneticFieldData = pd.DataFrame()
        # Cache for the combined per-node table; empty means "not built yet".
        self._meshData = pd.DataFrame()

    @property
    def id(self) -> int:
        """
        Read-only - retrieve the index of the eddy time step

        Returns
        -------
        int
            index of the eddy time step
        """
        return self._id

    @property
    def time(self) -> float:
        """
        Read-only - retrieve the time at which the eddy time step is defined

        Returns
        -------
        float
            time at which the eddy time step is defined
        """
        return self._time

    @property
    def meshData(self) -> pd.DataFrame:
        """
        Read-only - Retrieve the iron mesh data for the eddy time step,
        which includes the potential data, magnetic induction data,
        eddy currents data, and magnetic field data for the corresponding eddy step.

        Every table is averaged per node and the tables are joined on the
        node number. Columns whose name contains "elem" are dropped. Tables
        that have not been set (empty) are left out. The result is cached
        until one of the tables is assigned again.

        TODO: This should already be cleaned in the xml file

        Returns
        -------
        pd.DataFrame
            DataFrame with one row per node, sorted by node; empty if none of
            the tables has been set.
        """
        if self._meshData.empty:
            self._meshData = self._build_mesh_data()
        return self._meshData

    def _build_mesh_data(self) -> pd.DataFrame:
        """Average each table per node and join the tables on the node number."""
        sources = (
            self.eddyCurrentsData,
            self.potentialData,
            self.magneticFieldData,
            self.magneticInductionData,
        )
        per_node = []
        for frame in sources:
            if frame.empty:
                continue
            elem_columns = [col for col in frame.columns if "elem" in col]
            # Index by node so that concat aligns on the node number and not
            # on whatever row position the node had in its source table.
            per_node.append(frame.drop(columns=elem_columns).groupby("node").mean())
        if not per_node:
            return pd.DataFrame()
        merged = pd.concat(per_node, axis=1).sort_index().reset_index()
        # Keep the first occurrence should two tables share a column name.
        return merged.loc[:, ~merged.columns.duplicated()].copy()

    @property
    def potentialData(self) -> pd.DataFrame:
        """
        Retrieve the potential data for the eddy time step

        Returns
        -------
        pd.DataFrame
            DataFrame with the potential data
        """
        return self._potentialData

    @potentialData.setter
    def potentialData(self, value: pd.DataFrame):
        """
        Set the potential data for the eddy time step

        Parameters
        ----------
        value : pd.DataFrame
            DataFrame with the potential data
        """
        self._potentialData = value
        self._meshData = pd.DataFrame()  # cached combination is now stale

    @property
    def magneticInductionData(self) -> pd.DataFrame:
        """
        Retrieve the magnetic induction data for the eddy time step

        Returns
        -------
        pd.DataFrame
            DataFrame with the magnetic induction data
        """
        return self._magneticInductionData

    @magneticInductionData.setter
    def magneticInductionData(self, value: pd.DataFrame):
        """
        Set the magnetic induction data for the eddy time step

        Parameters
        ----------
        value : pd.DataFrame
            DataFrame with the magnetic induction data
        """
        self._magneticInductionData = value
        self._meshData = pd.DataFrame()  # cached combination is now stale

    @property
    def eddyCurrentsData(self) -> pd.DataFrame:
        """
        Retrieve the eddy currents data for the eddy time step

        Returns
        -------
        pd.DataFrame
            DataFrame with the eddy currents data
        """
        return self._eddyCurrentsData

    @eddyCurrentsData.setter
    def eddyCurrentsData(self, value: pd.DataFrame):
        """
        Set the eddy currents data for the eddy time step

        Parameters
        ----------
        value : pd.DataFrame
            DataFrame with the eddy currents data
        """
        self._eddyCurrentsData = value
        self._meshData = pd.DataFrame()  # cached combination is now stale

    @property
    def magneticFieldData(self) -> pd.DataFrame:
        """
        Retrieve the magnetic field data for the eddy time step

        Returns
        -------
        pd.DataFrame
            DataFrame with the magnetic field data
        """
        return self._magneticFieldData

    @magneticFieldData.setter
    def magneticFieldData(self, value: pd.DataFrame):
        """
        Set the magnetic field data for the eddy time step

        Parameters
        ----------
        value : pd.DataFrame
            DataFrame with the magnetic field data
        """
        self._magneticFieldData = value
        self._meshData = pd.DataFrame()  # cached combination is now stale
[docs]
class TransStepData:
    """Data of a transient step

    Plain container: apart from the identifier, the name and the eddy time
    step dictionary, all members are public attributes that start out empty.
    """

    def __init__(self, id: int, name: str) -> None:
        """
        Create an empty transient step.

        Parameters
        ----------
        id : int
            identifier of the transient step
        name : str
            name of the transient step
        """
        self._id: int = id
        self._name: str = name

        # Result tables, empty until they are assigned from outside.
        self.coilData = pd.DataFrame()
        self.meshData = pd.DataFrame()
        self.matrixData = pd.DataFrame()
        # self.irisData = pd.DataFrame()
        self.coilData3D = pd.DataFrame()
        self.brickData3D = pd.DataFrame()
        self.meshData3D = pd.DataFrame()

        # Integer-keyed collections.
        self.deviceGraphs: Dict[int, pd.DataFrame] = {}
        self.harmonicCoils: Dict[int, HarmonicCoil] = {}
        self.conductorForces: Optional[pd.DataFrame] = None
        self._eddyTimeSteps: Dict[int, EddyStepData] = {}

    @property
    def eddyTimeSteps(self) -> Dict[int, EddyStepData]:
        """
        Read-only - retrieve the dictionary of eddy time steps associated with this transient step.

        Returns
        -------
        Dict[int, EddyStepData]
            A dictionary where keys are the step indices and values are EddyStepData
            objects representing the data for each eddy_time_step. The dictionary
            itself is returned, not a copy.
        """
        return self._eddyTimeSteps

    @property
    def id(self) -> int:
        """Read-only - retrieve the unique identifier of this transient step."""
        return self._id

    @property
    def name(self) -> str:
        """Read-only - retrieve the name of this transient step."""
        return self._name

    @property
    def eddy_steps_number(self) -> int:
        """Read-only - retrieve the number of eddy time steps associated with this transient step."""
        step_count = len(self._eddyTimeSteps)
        return step_count
[docs]
@dataclass
class CoilGeomDfs:
    """Pair of DataFrames describing a coil geometry."""

    # Table of conductors.
    conductors: pd.DataFrame
    # Table of strands.
    strands: pd.DataFrame
[docs]
@dataclass
class MeshGeomDfs:
    """Triple of DataFrames describing a mesh geometry."""

    # Table of mesh nodes.
    nodes: pd.DataFrame
    # Table of mesh elements.
    elements: pd.DataFrame
    # Table of mesh boundaries.
    boundaries: pd.DataFrame
[docs]
class OptData:
"""Data Of an optimization Step"""
def __init__(self, id: int, name: str) -> None:
self.id = id
self.name: str = name
self.transientGraphs: Dict[int, pd.DataFrame] = {}
self.step: Dict[int, TransStepData] = {}
self.designVariables: Dict[int, DesignVariableResult] = {}
self.objectiveResults: Dict[int, ObjectiveResult] = {}
self._coilGeometries: Dict[int, CoilGeometry] = {}
self._coilGeometries3D: Dict[int, Coil3DGeometry] = {}
self._brickGeometries3D: Dict[int, Brick3DGeometry] = {}
self._wedgeGeometries3D: Dict[int, WedgeGeometry] = {}
self._blockGeometries3D: dict[int, BlockGeometry] = {}
self._meshGeometries: Optional[Geometry] = None
self._meshGeometries3D: Optional[Geometry] = None
self._coilGeomdf: CoilGeomDfs = CoilGeomDfs(pd.DataFrame(), pd.DataFrame())
self._coilGeom3ddf: pd.DataFrame = pd.DataFrame()
self._brickGeom3ddf: pd.DataFrame = pd.DataFrame()
self._topologydf: pd.DataFrame = pd.DataFrame()
self._meshGeomdf: MeshGeomDfs = MeshGeomDfs(
pd.DataFrame(), pd.DataFrame(), pd.DataFrame()
)
self._meshGeom3ddf: MeshGeomDfs = MeshGeomDfs(
pd.DataFrame(), pd.DataFrame(), pd.DataFrame()
)
@property
def blockTopologies(self) -> Dict[int, BlockTopology]:
"""
Get block topologies
"""
return {
int(block.block_nr): BlockTopology.from_namedtuple(block)
for block in self._topologydf.itertuples()
}
@property
def coilGeometries(self) -> Dict[int, CoilGeometry]:
if not self._coilGeometries:
self._create_coils_from_df()
return self._coilGeometries
@coilGeometries.setter
def coilGeometries(self, value: Dict[int, CoilGeometry]):
self._coilGeometries = value
@property
def coilGeometries3D(self) -> Dict[int, Coil3DGeometry]:
if not self._coilGeometries3D:
self._create_coils3d_from_df()
return self._coilGeometries3D
@coilGeometries3D.setter
def coilGeometries3D(self, value: Dict[int, Coil3DGeometry]):
self._coilGeometries3D = value
@property
def brickGeometries3D(self) -> Dict[int, Brick3DGeometry]:
if not self._brickGeometries3D:
self._create_bricks_from_df()
return self._brickGeometries3D
@brickGeometries3D.setter
def brickGeometries3D(self, value: Dict[int, Brick3DGeometry]):
self._brickGeometries3D = value
@property
def wedgeGeometries3D(self) -> Dict[int, WedgeGeometry]:
if not self._wedgeGeometries3D:
self._blocks_to_wedges()
return self._wedgeGeometries3D
@wedgeGeometries3D.setter
def wedgeGeometries3D(self, value: Dict[int, WedgeGeometry]):
self._wedgeGeometries3D = value
@property
def blockGeometries3D(self) -> Dict[int, BlockGeometry]:
return self._blockGeometries3D
@blockGeometries3D.setter
def blockGeometries3D(self, value: Dict[int, BlockGeometry]):
self._blockGeometries3D = value
@property
def meshGeometries(self) -> Optional[Geometry]:
if not self._meshGeometries:
self._create_mesh_from_df()
pass
return self._meshGeometries
@meshGeometries.setter
def meshGeometries(self, value: Optional[Geometry]):
self._meshGeometries = value
@property
def meshGeometries3D(self) -> Optional[Geometry]:
if self._meshGeometries3D is None:
self._create_mesh3d_from_df()
pass
return self._meshGeometries3D
@meshGeometries3D.setter
def meshGeometries3D(self, value: Optional[Geometry]):
self._meshGeometries3D = value
@property
def transient_steps_number(self) -> int:
"""Read-only - retrieve the number of transient steps associated with this optimization step."""
return len(self.step)
def _blocks_to_wedges(self) -> None:
# From given block geometry, generate wedges
if self._topologydf.empty:
return
# Iterate through blocks to establish the blockorder and nr of grouped blocks (by layerid and original blockid)
block_order: dict[int, dict[int, list[int]]] = {}
for _, row in self._topologydf.iterrows():
layer = int(row["layer_nr"])
block_orig = int(row["block_origin"])
block_nr = int(row["block_nr"])
if layer not in block_order:
block_order[layer] = {}
if block_orig not in block_order[layer]:
block_order[layer][block_orig] = []
block_order[layer][block_orig].append(block_nr)
# From the generated order extract the list of unique blocklists (each for generating one set of wedges)
block_ids: dict[int, list[list[int]]] = {}
for layer in block_order:
max_len = max(len(blocks) for blocks in block_order[layer].values())
block_ids[layer] = [[] for _ in range(max_len)]
for block_orig in block_order[layer]:
for idx, block_nr in enumerate(block_order[layer][block_orig]):
block_ids[layer][idx].append(block_nr)
wedge_nr = 1
wedges: dict[int, WedgeGeometry] = {}
for layer, block_list_list in block_ids.items():
for block_list in block_list_list:
# endspacer
wedges[wedge_nr] = WedgeGeometry(
layer,
wedge_nr,
self.blockGeometries3D[block_list[0]].outer_surface,
None,
block_list[0],
0,
)
wedge_nr += 1
for bl in range(1, len(block_list)):
outer = self.blockGeometries3D[block_list[bl - 1]].inner_surface
inner = self.blockGeometries3D[block_list[bl]].outer_surface
wedges[wedge_nr] = WedgeGeometry(
layer,
wedge_nr,
inner,
outer,
block_list[bl],
block_list[bl - 1],
)
wedge_nr += 1
# inner post
wedges[wedge_nr] = WedgeGeometry(
layer,
wedge_nr,
None,
self.blockGeometries3D[block_list[-1]].inner_surface,
0,
block_list[-1],
)
wedge_nr += 1
self.wedgeGeometries3D = wedges
def _create_mesh_from_df(self) -> None:
if self._meshGeomdf.nodes.empty:
return
self._meshGeometries = self._meshdf_to_geom(self._meshGeomdf)
def _create_mesh3d_from_df(self) -> None:
if self._meshGeom3ddf.nodes.empty:
return
self._meshGeometries3D = self._meshdf_to_geom(self._meshGeom3ddf)
def _meshdf_to_geom(self, df: MeshGeomDfs) -> Geometry:
nodes = df.nodes.to_numpy()[:, 1:]
elements = df.elements.to_numpy()[:, 2:]
elements -= 1 # translate to 0 based index
elements_list = elements.tolist()
for nr_elem, lst in zip(df.elements["n_el"], elements_list):
del lst[nr_elem:] # Resize lists to match number of elements
boundaries = {}
if not df.boundaries.empty:
for id, grp in df.boundaries.groupby("boundary_id"):
if grp.empty:
continue
boundaries[id] = grp.to_numpy()[:, 2:]
return Geometry(nodes, elements_list, boundaries)
def _create_bricks_from_df(self):
if self._brickGeom3ddf.empty:
return
for idx, grp in self._brickGeom3ddf.groupby("brick_nr"):
brick_nr = int(idx)
if grp.empty:
continue
nodes = grp.to_numpy()[:, 2:].reshape((-1, 3))
geom = Geometry(nodes, None, None)
geom.generate_elements_for_coil_nodes()
self._brickGeometries3D[brick_nr] = Brick3DGeometry(brick_nr, geom)
def _create_coils3d_from_df(self):
    """Fill ``self._coilGeometries3D`` from the 3D coil dataframe.

    One ``Coil3DGeometry`` is stored per ``conductor`` group, tagged with the
    block and layer number of the first topology row whose conductor range
    contains that conductor.

    :raises IndexError: If no topology row covers a conductor number
    """
    # Besides the 2D conductor table, also guard on the 3D table that is
    # actually iterated: an empty frame without a "conductor" column would
    # make groupby raise KeyError.
    if self._coilGeomdf.conductors.empty or self._coilGeom3ddf.empty:
        return
    topology = self._topologydf
    for idx, grp in self._coilGeom3ddf.groupby("conductor"):
        cond_nr = int(idx)
        if grp.empty:
            continue
        block_info = topology[
            (topology.first_conductor <= cond_nr)
            & (topology.last_conductor >= cond_nr)
        ].iloc[0]
        block_nr = int(block_info.block_nr)
        layer_nr = int(block_info.layer_nr)
        # Everything from the third column on is regrouped into rows of
        # three values (presumably x, y, z coordinates - confirm).
        nodes = grp.to_numpy()[:, 2:].reshape((-1, 3))
        geom = Geometry(nodes, None, None)
        geom.generate_elements_for_coil_nodes()
        self._coilGeometries3D[cond_nr] = Coil3DGeometry(
            cond_nr, geom, block_nr, layer_nr
        )
def _create_coils_from_df(self):
    """Build ``self._coilGeometries`` from the conductor and strand tables.

    For every conductor row a ``CoilGeometry`` is created holding the
    conductor outline, the block and layer number taken from the first
    topology row whose conductor range contains it, and the outlines of the
    strands belonging to that conductor. Nothing is assigned when the
    conductor table is empty.

    :raises IndexError: If no topology row covers a conductor number
    """
    if self._coilGeomdf.conductors.empty:
        return
    topology = self._topologydf
    df_strand = self._coilGeomdf.strands
    cables = {}
    for _, cond in self._coilGeomdf.conductors.iterrows():
        cable_nr = int(cond["conductor"])
        # Values after the conductor number are read as four 2D points.
        geom = cond.to_numpy()[1:].reshape((4, 2))
        # Resolve the single topology row once, as the 3D variant does, so
        # block_nr / layer_nr are plain ints instead of pandas Series.
        block_info = topology[
            (topology.first_conductor <= cable_nr)
            & (topology.last_conductor >= cable_nr)
        ].iloc[0]
        block_nr = int(block_info.block_nr)
        layer_nr = int(block_info.layer_nr)
        strands_per_cond = block_info.n_radial * block_info.n_azimuthal
        # Strands are numbered consecutively within a block, conductor by
        # conductor, starting at the block's first_strand.
        first_cond_strand = int(
            block_info.first_strand
            + (cable_nr - block_info.first_conductor) * strands_per_cond
        )
        last_cond_strand = int(first_cond_strand + strands_per_cond - 1)
        strands = df_strand[
            (df_strand["strand"] >= first_cond_strand)
            & (df_strand["strand"] <= last_cond_strand)
        ]
        strands_dict = {
            int(st["strand"]): st.to_numpy()[1:].reshape((4, 2))
            for _, st in strands.iterrows()
        }
        cables[cable_nr] = CoilGeometry(
            cable_nr, block_nr, layer_nr, geom, strands_dict
        )
    self._coilGeometries = cables
[docs]
class RoxieOutputParser:
"""Roxie output parser class.
Takes all different Roxie outputs, parses them, and provides a structured output of the results.
"""
def __init__(self, xml_file: str) -> None:
    """Create empty result containers, then parse ``xml_file``.

    :param xml_file: The Roxie XML output, handed on unchanged to
        ``_XmlParser.parse_xml`` together with this instance
    """
    # NOTE(review): function-level import kept as in the original;
    # presumably it avoids a circular import - confirm.
    from roxieapi.output.xml_parse import _XmlParser

    self.logger = logging.getLogger("RoxieOutputParser")

    # Result values on optimization graphs, (id)
    self.optimizationGraphs: dict[int, pd.DataFrame] = {}
    self.opt: dict[int, OptData] = {}

    # 2D / 3D Plots information for device
    self.plots2D: list[Plot2D] = []
    self.plots3D: list[Plot3D] = []

    # Graph information for device
    self.graphs_device: list[GraphPlot] = []
    # Plot information for transient plots
    self.graphs_transient: list[GraphPlot] = []
    # Plot information for optimization plots
    self.graphs_optimization: list[GraphPlot] = []

    # General information
    self.roxie_version = self.roxie_githash = ""
    self.run_date = self.comment = ""

    # Parse the file, extract data
    _XmlParser.parse_xml(xml_file, self)
[docs]
def find_eddystep(
    self, opt_step: int, trans_step: int, eddy_step: Optional[int]
) -> Optional[EddyStepData]:
    """
    Look up the eddy step data of one transient step in one optimization step
    :param opt_step: The optimization step number
    :param trans_step: The transient step number
    :param eddy_step: The eddy step number; None always yields None
    :return: The EddyStep object or None if any of the three keys is missing
    """
    if opt_step not in self.opt:
        return None
    transient_steps = self.opt[opt_step].step
    if trans_step not in transient_steps:
        return None
    if eddy_step is None:
        return None
    eddy_steps = transient_steps[trans_step].eddyTimeSteps
    if eddy_step not in eddy_steps:
        return None
    return eddy_steps[eddy_step]
[docs]
def find_transstep(self, opt_step: int, trans_step: int) -> Optional[TransStepData]:
    """
    Look up the transient step data inside a given optimization step
    :param opt_step: The optimization step number
    :param trans_step: The transient step number
    :return: The TransStepData object or None if either key is missing
    """
    if opt_step not in self.opt:
        return None
    transient_steps = self.opt[opt_step].step
    if trans_step not in transient_steps:
        return None
    return transient_steps[trans_step]
[docs]
def find_optstep(self, opt_step) -> Optional[OptData]:
    """
    Look up the data stored for one optimization step
    :param opt_step: The optimization step number
    :return: The OptData object or None if the step is not present
    """
    # dict.get already falls back to None for a missing key
    return self.opt.get(opt_step)
[docs]
def get_harmonic_coil(
    self,
    coil_nr: int = 1,
    opt_step: int = 1,
    trans_step: int = 1,
) -> Optional[HarmonicCoil]:
    """Look up a harmonic coil by step and coil number
    :param coil_nr: Harmonic Coil Nr, defaults to 1
    :param opt_step: The Optimization Step Nr, defaults to 1
    :param trans_step: The Transient Step Nr, defaults to 1
    :return: The Harmonic coil, or None if the step or the coil is missing
    """
    trans = self.find_transstep(opt_step, trans_step)
    if not trans:
        return None
    return trans.harmonicCoils.get(coil_nr)
[docs]
def get_conductor_forces(
    self, opt_step: int = 1, trans_step: int = 1
) -> Optional[pd.DataFrame]:
    """Look up the conductor forces stored for a given step
    :param opt_step: The Optimization step, defaults to 1
    :param trans_step: Transient step, defaults to 1
    :return: The Conductor forces as Dataframe, or None if the step is missing
    """
    trans = self.find_transstep(opt_step, trans_step)
    return trans.conductorForces if trans else None
[docs]
def get_crosssection_plot(self, plot_nr: int = 1) -> Optional[Plot2D]:
    """Return the cross-section 2D plot with the given number
    :param plot_nr: The plot number, defaults to 1
    :return: The first Plot2D in ``plots2D`` whose id matches, or None
    """
    matches = (
        candidate
        for candidate in self.plots2D
        if isinstance(candidate, Plot2D) and candidate.id == plot_nr
    )
    return next(matches, None)
[docs]
def get_3d_plot(self, plot_nr: int = 1) -> Optional[Plot3D]:
    """Return the 3D plot with the given number
    :param plot_nr: The plot number, defaults to 1
    :return: The first Plot3D in ``plots3D`` whose id matches, or None
    """
    matches = (
        candidate
        for candidate in self.plots3D
        if isinstance(candidate, Plot3D) and candidate.id == plot_nr
    )
    return next(matches, None)