Source code for roxieapi.output.parser

import logging
from dataclasses import dataclass
from typing import Dict, List, Optional

import pandas as pd

from roxieapi.commons.types import (
    BlockGeometry,
    BlockTopology,
    Brick3DGeometry,
    Coil3DGeometry,
    CoilGeometry,
    DesignVariableResult,
    Geometry,
    GraphPlot,
    HarmonicCoil,
    ObjectiveResult,
    Plot2D,
    Plot3D,
    WedgeGeometry,
)


[docs] class TransStepData: """Data of a transient step""" def __init__(self, id: int, name: str) -> None: self.id: int = id self.name: str = name self.coilData = pd.DataFrame() self.meshData = pd.DataFrame() self.matrixData = pd.DataFrame() # self.irisData = pd.DataFrame() self.coilData3D = pd.DataFrame() self.brickData3D = pd.DataFrame() self.meshData3D = pd.DataFrame() self.deviceGraphs: Dict[int, pd.DataFrame] = {} self.harmonicCoils: Dict[int, HarmonicCoil] = {} self.conductorForces: Optional[pd.DataFrame] = None
[docs] @dataclass class CoilGeomDfs: conductors: pd.DataFrame strands: pd.DataFrame
[docs] @dataclass class MeshGeomDfs: nodes: pd.DataFrame elements: pd.DataFrame boundaries: pd.DataFrame
[docs] class OptData: """Data Of an optimization Step""" def __init__(self, id: int, name: str) -> None: self.id = id self.name: str = name self.transientGraphs: Dict[int, pd.DataFrame] = {} self.step: Dict[int, TransStepData] = {} self.designVariables: Dict[int, DesignVariableResult] = {} self.objectiveResults: Dict[int, ObjectiveResult] = {} self._coilGeometries: Dict[int, CoilGeometry] = {} self._coilGeometries3D: Dict[int, Coil3DGeometry] = {} self._brickGeometries3D: Dict[int, Brick3DGeometry] = {} self._wedgeGeometries3D: Dict[int, WedgeGeometry] = {} self._blockGeometries3D: dict[int, BlockGeometry] = {} self._meshGeometries: Optional[Geometry] = None self._meshGeometries3D: Optional[Geometry] = None self._coilGeomdf: CoilGeomDfs = CoilGeomDfs(pd.DataFrame(), pd.DataFrame()) self._coilGeom3ddf: pd.DataFrame = pd.DataFrame() self._brickGeom3ddf: pd.DataFrame = pd.DataFrame() self._topologydf: pd.DataFrame = pd.DataFrame() self._meshGeomdf: MeshGeomDfs = MeshGeomDfs( pd.DataFrame(), pd.DataFrame(), pd.DataFrame() ) self._meshGeom3ddf: MeshGeomDfs = MeshGeomDfs( pd.DataFrame(), pd.DataFrame(), pd.DataFrame() ) @property def blockTopologies(self) -> Dict[int, BlockTopology]: """ Get block topologies """ return { int(block.block_nr): BlockTopology( block_nr=block.block_nr, block_orig=block.block_origin, layer_nr=block.layer_nr, first_conductor=block.first_conductor, last_conductor=block.last_conductor, n_radial=block.n_radial, n_azimuthal=block.n_azimuthal, first_strand=block.first_strand, last_strand=block.last_strand, ins_radial=block.ins_radial, ins_azimuthal=block.ins_azimuthal, ) for block in self._topologydf.itertuples() } @property def coilGeometries(self) -> Dict[int, CoilGeometry]: if not self._coilGeometries: self._create_coils_from_df() return self._coilGeometries @coilGeometries.setter def coilGeometries(self, value: Dict[int, CoilGeometry]): self._coilGeometries = value @property def coilGeometries3D(self) -> Dict[int, Coil3DGeometry]: if not self._coilGeometries3D: self._create_coils3d_from_df() return self._coilGeometries3D @coilGeometries3D.setter def coilGeometries3D(self, value: Dict[int, Coil3DGeometry]): self._coilGeometries3D = value @property def brickGeometries3D(self) -> Dict[int, Brick3DGeometry]: if not self._brickGeometries3D: self._create_bricks_from_df() return self._brickGeometries3D @brickGeometries3D.setter def brickGeometries3D(self, value: Dict[int, Brick3DGeometry]): self._brickGeometries3D = value @property def wedgeGeometries3D(self) -> Dict[int, WedgeGeometry]: if not self._wedgeGeometries3D: self._blocks_to_wedges() return self._wedgeGeometries3D @wedgeGeometries3D.setter def wedgeGeometries3D(self, value: Dict[int, WedgeGeometry]): self._wedgeGeometries3D = value @property def blockGeometries3D(self) -> Dict[int, BlockGeometry]: return self._blockGeometries3D @blockGeometries3D.setter def blockGeometries3D(self, value: Dict[int, BlockGeometry]): self._blockGeometries3D = value @property def meshGeometries(self) -> Optional[Geometry]: if not self._meshGeometries: self._create_mesh_from_df() pass return self._meshGeometries @meshGeometries.setter def meshGeometries(self, value: Optional[Geometry]): self._meshGeometries = value @property def meshGeometries3D(self) -> Optional[Geometry]: if self._meshGeometries3D is None: self._create_mesh3d_from_df() pass return self._meshGeometries3D @meshGeometries3D.setter def meshGeometries3D(self, value: Optional[Geometry]): self._meshGeometries3D = value def _blocks_to_wedges(self) -> None: # From given block geometry, generate wedges if self._topologydf.empty: return # Iterate through blocks to establish the blockorder and nr of grouped blocks (by layerid and original blockid) block_order: dict[int, dict[int, list[int]]] = {} for _, row in self._topologydf.iterrows(): layer = int(row["layer_nr"]) block_orig = int(row["block_origin"]) block_nr = int(row["block_nr"]) if layer not in block_order: block_order[layer] = {} if block_orig not in block_order[layer]: block_order[layer][block_orig] = [] block_order[layer][block_orig].append(block_nr) # From the generated order extract the list of unique blocklists (each for generating one set of wedges) block_ids: dict[int, list[list[int]]] = {} for layer in block_order: max_len = max(len(blocks) for blocks in block_order[layer].values()) block_ids[layer] = [[] for _ in range(max_len)] for block_orig in block_order[layer]: for idx, block_nr in enumerate(block_order[layer][block_orig]): block_ids[layer][idx].append(block_nr) wedge_nr = 1 wedges: dict[int, WedgeGeometry] = {} for layer, block_list_list in block_ids.items(): for block_list in block_list_list: # endspacer wedges[wedge_nr] = WedgeGeometry( layer, wedge_nr, self.blockGeometries3D[block_list[0]].outer_surface, None, block_list[0], 0, ) wedge_nr += 1 for bl in range(1, len(block_list)): outer = self.blockGeometries3D[block_list[bl - 1]].inner_surface inner = self.blockGeometries3D[block_list[bl]].outer_surface wedges[wedge_nr] = WedgeGeometry( layer, wedge_nr, inner, outer, block_list[bl], block_list[bl - 1], ) wedge_nr += 1 # inner post wedges[wedge_nr] = WedgeGeometry( layer, wedge_nr, None, self.blockGeometries3D[block_list[-1]].inner_surface, 0, block_list[-1], ) wedge_nr += 1 self.wedgeGeometries3D = wedges def _create_mesh_from_df(self) -> None: if self._meshGeomdf.nodes.empty: return self._meshGeometries = self._meshdf_to_geom(self._meshGeomdf) def _create_mesh3d_from_df(self) -> None: if self._meshGeom3ddf.nodes.empty: return self._meshGeometries3D = self._meshdf_to_geom(self._meshGeom3ddf) def _meshdf_to_geom(self, df: MeshGeomDfs) -> Geometry: nodes = df.nodes.to_numpy()[:, 1:] elements = df.elements.to_numpy()[:, 2:] elements -= 1 # translate to 0 based index elements_list = elements.tolist() for nr_elem, lst in zip(df.elements["n_el"], elements_list): del lst[nr_elem:] # Resize lists to match number of elements boundaries = {} if not df.boundaries.empty: for id, grp in df.boundaries.groupby("boundary_id"): if grp.empty: continue boundaries[id] = grp.to_numpy()[:, 2:] return Geometry(nodes, elements_list, boundaries) def _create_bricks_from_df(self): if self._brickGeom3ddf.empty: return for idx, grp in self._brickGeom3ddf.groupby("brick_nr"): brick_nr = int(idx) if grp.empty: continue nodes = grp.to_numpy()[:, 2:].reshape((-1, 3)) geom = Geometry(nodes, None, None) geom.generate_elements_for_coil_nodes() self._brickGeometries3D[brick_nr] = Brick3DGeometry(brick_nr, geom) def _create_coils3d_from_df(self): if self._coilGeomdf.conductors.empty: return for idx, grp in self._coilGeom3ddf.groupby("conductor"): cond_nr = int(idx) if grp.empty: continue block_info = self._topologydf[ (self._topologydf.first_conductor <= cond_nr) & (self._topologydf.last_conductor >= cond_nr) ].iloc[0] block_nr = int(block_info.block_nr) layer_nr = int(block_info.layer_nr) nodes = grp.to_numpy()[:, 2:].reshape((-1, 3)) geom = Geometry(nodes, None, None) geom.generate_elements_for_coil_nodes() self._coilGeometries3D[cond_nr] = Coil3DGeometry( cond_nr, geom, block_nr, layer_nr ) def _create_coils_from_df(self): if self._coilGeomdf.conductors.empty: return cables = {} for _, cond in self._coilGeomdf.conductors.iterrows(): cable_nr = int(cond["conductor"]) geom = cond.to_numpy()[1:].reshape((4, 2)) block_info = self._topologydf[ (self._topologydf.first_conductor <= cond["conductor"]) & (self._topologydf.last_conductor >= cond["conductor"]) ] block_nr = block_info.block_nr layer_nr = block_info.layer_nr first_cond_strand = int( ( block_info.first_strand + (cond["conductor"] - block_info.first_conductor) * block_info.n_radial * block_info.n_azimuthal ).iloc[0] ) last_cond_strand = int( ( first_cond_strand + (block_info.n_radial * block_info.n_azimuthal) - 1 ).iloc[0] ) df_strand = self._coilGeomdf.strands strands = df_strand[ (df_strand["strand"] >= first_cond_strand) & (df_strand["strand"] <= last_cond_strand) ] strands_dict = { int(st["strand"]): st.to_numpy()[1:].reshape((4, 2)) for _, st in strands.iterrows() } cables[cable_nr] = CoilGeometry( cable_nr, block_nr, layer_nr, geom, strands_dict ) self._coilGeometries = cables
[docs] class RoxieOutputParser: """Roxie output parser class. Takes all different Roxie outputs, parses them, and provides a structured output of the results. """ def __init__(self, xml_file: str) -> None: from roxieapi.output.xml_parse import _XmlParser self.logger = logging.getLogger("RoxieOutputParser") self.optimizationGraphs: Dict[ int, pd.DataFrame ] = {} # Result values on optimization graphs, (id) self.opt: Dict[int, OptData] = {} self.plots2D: List[Plot2D] = [] # 2D Plots information for device self.plots3D: List[Plot3D] = [] # 3D Plots information for device self.graphs_device: List[GraphPlot] = [] # Graph information for device self.graphs_transient: List[ GraphPlot ] = [] # Plot information for transient plots self.graphs_optimization: List[ GraphPlot ] = [] # Plot information for optimization plots # General information self.roxie_version = "" self.roxie_githash = "" self.run_date = "" self.comment = "" _XmlParser.parse_xml(xml_file, self)
[docs] def find_transstep(self, opt_step: int, trans_step: int) -> Optional[TransStepData]: if opt_step in self.opt and trans_step in self.opt[opt_step].step: return self.opt[opt_step].step[trans_step] return None
[docs] def find_optstep(self, opt_step) -> Optional[OptData]: return self.opt.get(opt_step, None)
[docs] def get_harmonic_coil( self, coil_nr: int = 1, opt_step: int = 1, trans_step: int = 1, ) -> Optional[HarmonicCoil]: """Return the harmonic coil for given step and coil id, or None if not present :param coil_nr: Harmonic Coil Nr, defaults to 1 :param opt_step: The Optimization Step Nr, defaults to 1 :param trans_step: The Transient Step Nr, defaults to 1 :return: The Harmonic coil, or None """ if trans := self.find_transstep(opt_step, trans_step): return trans.harmonicCoils.get(coil_nr, None) return None
[docs] def get_conductor_forces( self, opt_step: int = 1, trans_step: int = 1 ) -> Optional[pd.DataFrame]: """Return Conductor forces for given Step, or None if not present :param opt_step: The Optimization step, defaults to 1 :param trans_step: Transient step, defaults to 1 :return: The Conductor forces as Dataframe """ if trans := self.find_transstep(opt_step, trans_step): return trans.conductorForces else: return None
[docs] def get_crosssection_plot(self, plot_nr: int = 1) -> Optional[Plot2D]: """Return the Crossection 2D plot with number i :param plot_nr: The plot_number, defaults to 1 :return: The Plot2D object, or None """ for pl in self.plots2D: if isinstance(pl, Plot2D) and pl.id == plot_nr: return pl return None
[docs] def get_3d_plot(self, plot_nr: int = 1) -> Optional[Plot3D]: """Return the 3D plot with number i :param plon_nr: The plot number, defaults to 1 :return: The Plot3D definition, or None """ for pl in self.plots3D: if isinstance(pl, Plot3D) and pl.id == plot_nr: return pl return None