Source code for coconut_tools.pyvista_slice

"""
Coconut Visualization with PyVista

This module provides tools to read, convert, and visualize MHD simulation data from
COCONUT using PyVista. The main objective is to demonstrate how to load a VTU file,
convert the data to physical units and spherical coordinates, and create multiple
visualizations (slice, clipping, streamlines, isosurfaces).

Typical visualizations include:
    - Slice of radial velocity (vr)
    - Clipped spherical region colored by radial magnetic field (br)
    - Streamlines of the magnetic field
    - Isosurface of density (optional)

Author: Luis, Quentin
"""

import numpy as np
import pyvista as pv
import logging
import os

import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap

logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')

def _to_int(x, name: str):
    try:
        return int(x)
    except Exception as e:
        raise TypeError(f"{name} must be an integer-like value, got {x!r}") from e

[docs] def make_streamlines(mesh, vectors="B", max_steps=1000, n_points=1000, source_radius=2.0, source_center=(0, 0, 0), initial_step_length=None, step_unit_val=None, max_error=None, max_step_length=None): """ Wrapper that normalizes types for VTK/PyVista streamline settings. - Casts step-like params to int where VTK expects integers. - Uses `max_steps` (replacement for deprecated `max_time`). """ # Ensure ints where VTK requires ints max_steps = _to_int(max_steps, "max_steps") n_points = _to_int(n_points, "n_points") # Build streamlines via PyVista stream = mesh.streamlines( vectors=vectors, max_steps=max_steps, # must be int n_points=n_points, # must be int source_radius=source_radius, source_center=source_center, # Note: the following kwargs are only passed if not None **({} if initial_step_length is None else {"initial_step_length": initial_step_length}), **({} if step_unit_val is None else {"integration_step_unit": step_unit_val}), **({} if max_error is None else {"maximum_error": max_error}), **({} if max_step_length is None else {"maximum_step_length": max_step_length}), ) return stream
[docs] def read_mesh(filename,verbose=True): """Load a VTU mesh file. Args: filename (str): Path to the VTU file. Returns: pv.DataSet: PyVista mesh object. """ if verbose: logging.info('Reading file...') mesh = pv.read(filename) if verbose: logging.info("Arrays in point_data: %s", list(mesh.point_data.keys())) if verbose: logging.info("Arrays in cell_data: %s", list(mesh.cell_data.keys())) if verbose: logging.info("Arrays in field_data: %s", list(mesh.field_data.keys())) if verbose: logging.info('Done!') return mesh
[docs] def convert_units(mesh, verbose=True ): """Convert mesh quantities into physical units and add them to the mesh. Args: mesh (pv.DataSet): PyVista mesh. Returns: pv.DataSet: Updated mesh with new fields. """ if verbose: logging.info('Converting to physical units...') pts = mesh.points * 6.955e8 # Convert to meters mesh['x'], mesh['y'], mesh['z'] = pts[:,0], pts[:,1], pts[:,2] # m mesh['rho_dim'] = mesh['rho'] * 1.67e-16 # g/cm3 mesh['prs_dim'] = mesh['p'] * 0.3851 # Ba = dyn/cm2 mesh['temp'] = 7.7e-9 * mesh['prs_dim'] / mesh['rho_dim'] # K # T = mu*mp/kb * P / rho -> mu=0.64 hardcoded here ! v = mesh['v'] * 480.24838 # km/s mesh['vx_dim'], mesh['vy_dim'], mesh['vz_dim'] = v[:,0], v[:,1], v[:,2] mesh['bx_dim'] = mesh['Bx'] * 2.2 # G mesh['by_dim'] = mesh['By'] * 2.2 # G mesh['bz_dim'] = mesh['Bz'] * 2.2 # G return mesh
[docs] def convert_to_spherical(mesh, verbose=True): """Convert Cartesian coordinates to spherical and compute radial components. Args: mesh (pv.DataSet): Mesh with Cartesian and dimensional quantities. Returns: pv.DataSet: Updated mesh with spherical quantities. """ if verbose: logging.info('Converting to spherical coordinates...') x, y, z = mesh['x'], mesh['y'], mesh['z'] vx, vy, vz = mesh['vx_dim'], mesh['vy_dim'], mesh['vz_dim'] bx, by, bz = mesh['bx_dim'], mesh['by_dim'], mesh['bz_dim'] r = np.sqrt(x**2 + y**2 + z**2) rxy = np.sqrt(x**2 + y**2) + 1e-20 mesh['r'] = r mesh['rxy'] = rxy mesh['vr'] = (x*vx + y*vy + z*vz) / r mesh['vtheta'] = (x*z*vx + y*z*vy - rxy**2*vz) / (rxy*r) mesh['vphi'] = (-y*vx + x*vy) / rxy mesh['br'] = (x*bx + y*by + z*bz) / r mesh['btheta'] = (x*z*bx + y*z*by - rxy**2*bz) / (rxy*r) mesh['bphi'] = (-y*bx + x*by) / rxy return mesh
[docs] def visualize( mesh, slice_normal: str = "y", slice_plane_scalar: str = "vr", save_path: str | None = None, show: bool = True, vr_clim: tuple[float, float] | None = None, br_clim: tuple[float, float] | None = None, stream_clim: tuple[float, float] | None = None, volumic_vr: float | None = None, rho_iso: float = 1e-13, AlfvSurf: bool = False, camera_radius: float = 18.0, #cylindrical coordinates camera_phi_deg: float = 60.0, camera_z: float = 4.0, verbose: bool=True, ): """Create a PyVista visualization of MHD quantities. The visualization includes slice planes, streamlines, optional iso surfaces, and a configurable camera position. Args: mesh: PyVista dataset containing the physical fields. slice_normal (str, optional): Normal of the slice plane. Must be one of {"x", "y", "z"}. Default is "y". slice_plane_scalar (str, optional): Scalar field used for coloring the slice plane. Default is "vr". save_path (str | None, optional): Path where the rendered image is saved. If None, the image is not saved. Default is None. show (bool, optional): If True, show the interactive viewer. If False, run in off screen mode only. Default is True. vr_clim (tuple[float, float] | None, optional): Color limits for the radial velocity field. Default is None. br_clim (tuple[float, float] | None, optional): Color limits for the radial magnetic field. Default is None. stream_clim (tuple[float, float] | None, optional): Color limits for the streamline magnitude. Default is None. volumic_vr (float | None, optional): If not None, draw an iso surface of the radial velocity at this value. Default is None. rho_iso (float, optional): Density iso surface value in g cm^(-3). Default is 1e-13. AlfvSurf (bool, optional): If True, plot the Alfvén surface. Default is False. camera_radius (float, optional): Radial distance of the camera from the origin. Default is 18.0. camera_phi_deg (float, optional): Azimuthal angle of the camera in degrees. Default is 60.0. camera_z (float, optional): Height of the camera above the equatorial plane. Default is 4.0. Returns: None """ if verbose: logging.info("Creating plotter...") off = not show if off: os.environ.setdefault("PYVISTA_OFF_SCREEN", "1") pv.OFF_SCREEN = True p = pv.Plotter(off_screen=off, window_size=[1920, 1080]) # -------------------------------------------------------------------------- # Camera configuration phi = np.radians(camera_phi_deg) camera_position = [ (camera_radius * np.cos(phi), camera_radius * np.sin(phi), camera_z), (0.0, 0.0, 0.0), # look at origin (0.0, 0.0, 1.0), # "up" direction ] p.camera_position = camera_position # -------------------------------------------------------------------------- # Slice plot: radial velocity vr if verbose: logging.info("Adding slice plot ("+slice_plane_scalar+")…") slice_plane = mesh.slice(normal=slice_normal, origin=(0, 0, 0)) if slice_plane_scalar=="vr": slice_plane_tit="Radial Velocity [km/s]" plane_log_scale=False slice_plane_cmap='viridis' opac=1. elif slice_plane_scalar=="rho_dim": slice_plane_tit=r"Density [g/cm$^3$]" plane_log_scale=True slice_plane_cmap='plasma' # discretizes levels = 16 base = plt.get_cmap(slice_plane_cmap) slice_plane_cmap = ListedColormap(base(np.linspace(0, 1, levels))) # opac=0.7 else: slice_plane_tit="" plane_log_scale=False slice_plane_cmap='viridis' opac=1. p.add_mesh( slice_plane, scalars=slice_plane_scalar, cmap=slice_plane_cmap, clim=vr_clim, log_scale=plane_log_scale, opacity=opac, scalar_bar_args={ "height": 0.25, "vertical": True, "title_font_size": 20, "width": 0.05, "title": slice_plane_tit, "position_x": 0.10, "position_y": 0.65, }, ) p.enable_depth_peeling(number_of_peels=200, occlusion_ratio=0.0) # > Log scaling compresses values > large flat regions of nearly identical color. VTK’s default alpha blending then depends strongly on triangle draw order > patchiness. # -------------------------------------------------------------------------- # Alfven surface if AlfvSurf: if verbose: logging.info("Adding Alfven surface…") va = np.sqrt(mesh['br']**2+mesh['btheta']**2+mesh['bphi']**2)/np.sqrt(4.*np.pi*mesh['rho_dim']) Ma = np.sqrt(mesh['vr']**2+mesh['vtheta']**2+mesh['vphi']**2)*1.e5/va #print(mesh.n_points, mesh.n_cells, Ma.shape, Ma.size) mesh["Ma"] = Ma # .point_array ? Ma.swapaxes(-2,-1).ravel("C") ? AlfSurf=mesh.contour(scalars="Ma",isosurfaces=[1.]) AlfvenSurf=p.add_mesh(AlfSurf,opacity=0.4,color='lightblue') if volumic_vr != None: scalar='vr' VrSurf=mesh.contour(scalars=scalar,isosurfaces=[volumic_vr]) VradialSurf=p.add_mesh(VrSurf,opacity=0.4,color='red') #opacity='geom' #logging.info("Adding volumic rendering ("+scalar+")…") #p.add_volume(mesh,scalars=scalar,cmap=['r','r','r'],opacity=opacity,clim=vlim) # -------------------------------------------------------------------------- # Clipped inner sphere: br if verbose: logging.info("Adding clipped sphere (br)…") sphere = pv.Sphere(center=(0, 0, 0), radius=1.01) clipped = mesh.clip_surface(sphere) #if br_clim is None: # br_clim = (-1.0, 1.0) p.add_mesh( clipped, scalars="br", cmap="seismic", clim=br_clim, scalar_bar_args={ "height": 0.25, "vertical": True, "title_font_size": 20, "width": 0.05, "title": "Radial Magnetic Field [G]", "position_x": 0.85, "position_y": 0.65, }, ) # -------------------------------------------------------------------------- # Magnetic-field streamlines if verbose: logging.info("Adding magnetic field streamlines…") mesh["B"] = np.column_stack([mesh["bx_dim"], mesh["by_dim"], mesh["bz_dim"]]) stream = make_streamlines( mesh, vectors="B", max_steps=1000, n_points=500, source_radius=2.0, source_center=(0, 0, 0), ) stream_tube = stream.tube(radius=0.01) p.add_mesh( stream_tube, cmap="binary", clim=stream_clim, scalar_bar_args={ "height": 0.25, "vertical": True, "title_font_size": 20, "width": 0.05, "title": "B-field Streamlines", "position_x": 0.85, "position_y": 0.05, }, ) # -------------------------------------------------------------------------- # Density isosurface if rho_iso != 0.0: if verbose: logging.info("Adding density isosurface...") rmin, rmax = mesh.get_data_range("rho_dim") if not (rmin <= rho_iso <= rmax): logging.warning( f"rho_iso={rho_iso:.2e} outside rho_dim range " f"[{rmin:.2e}, {rmax:.2e}]. Using mid range instead." ) rho_iso = 0.5 * (rmin + rmax) iso = mesh.contour([rho_iso], scalars="rho_dim") if iso.n_points == 0: logging.warning("Density isosurface is empty even after adjustment. Skipping.") else: p.add_mesh( iso, scalars="rho_dim", cmap="plasma", opacity=0.4, show_scalar_bar=False, ) if verbose: logging.info(f"Density isosurface at rho = {rho_iso:.2e} g/cm^3") # -------------------------------------------------------------------------- # Render + save p.show(interactive=False, auto_close=False, window_size=[1800, 900]) if save_path: if verbose: logging.info(f"Saving figure to {save_path}…") p.screenshot(save_path) if show: p.show() p.close()
if __name__ == '__main__': input_path = "C:/Users/luisl/Documents/Travail/Article_COCORIA/corona-mhd_0.vtu" mesh = read_mesh(input_path) mesh = convert_units(mesh) mesh = convert_to_spherical(mesh) visualize(mesh, slice_normal='y', save_path='E:/euhforia/image/pyvista_slice.png', show=True, AlfvSurf=True)