Source code for coconut_tools.create_hdf

"""
Module for extracting and converting plasma simulation data into HDF5 format.

This module provides utility functions to process different types of simulation outputs:
    - CFmesh format (custom structured mesh files)
    - VTU files (e.g., from Paraview-compatible simulations like COCONUT)
    - Formatted .dat files

The main functionalities include:
    - Sampling physical quantities (velocity, magnetic field, density, temperature, etc.)
      at satellite locations or specific grid coordinates
    - Interpolating or extracting the closest grid point
    - Saving extracted time series into standardized HDF5 format

Available functions:
    - `create_hdf_from_cfmesh`: For CFmesh files
    - `create_hdf_from_vtu`: For VTU files
    - `create_hdf_from_dat`: For formatted .dat files with spherical coordinates

"""

import os
import glob
import re
from pathlib import Path
from typing import Dict, List, Tuple, Union
from scipy.interpolate import RBFInterpolator
import h5py
import numpy as np
import natsort
import pyvista as pv
from vtk.util.numpy_support import vtk_to_numpy
from coconut_tools.logger_config import setup_logger
import errno
from datetime import datetime
from coconut_tools.read_dat_files import read_data as read_dat


logger = setup_logger(__name__)

[docs] def find_nearest_index_vtu(points: np.ndarray, target: List[float]) -> int: """Find the index of the point closest to the target coordinates from vtu file. Args: points (np.ndarray): Array of shape (N, 3) containing point coordinates. target (List[float]): Target coordinates [x, y, z]. Returns: int: Index of the closest point. """ distances = np.linalg.norm(points - np.array(target), axis=1) idx = np.argmin(distances) logger.info(f"Closest point found at x={points[idx, 0]}, y={points[idx, 1]}, z={points[idx, 2]}") return idx
[docs] def create_hdf_from_vtu( satellite_positions: Dict[str, List[float]], folder_patterns: Dict[str, str], output_dir: Path, delta_t: float, time_step: float ) -> None: """Extracts plasma quantities from VTU files at satellite positions and stores them in HDF5. Args: satellite_positions (Dict[str, List[float]]): Mapping of satellite names to 3D positions. folder_patterns (Dict[str, str]): Mapping of case names to glob patterns of VTU files. output_dir (Path): Directory to store output HDF5 files. delta_t (float): Time step duration (in code units). time_step (float): Output time step multiplier. Returns: None """ output_dir.mkdir(parents=True, exist_ok=True) for sat_name, sat_pos in satellite_positions.items(): logger.info(f"Processing satellite: {sat_name}") # Identify first VTU file to determine mesh and index first_case = next(iter(folder_patterns)) first_file = natsort.natsorted(glob.glob(folder_patterns[first_case]))[0] mesh = pv.read(first_file) points = vtk_to_numpy(mesh.GetPoints().GetData()) idx = find_nearest_index_vtu(points, sat_pos) for case_name, pattern in folder_patterns.items(): logger.info(f"\tProcessing case: {case_name}") vtu_files = natsort.natsorted(glob.glob(pattern)) B, V, Vx, Vy, Vz, Bx, By, Bz, T, rho, time = ([] for _ in range(11)) for i, file_path in enumerate(vtu_files): mesh = pv.read(file_path) try: bx = mesh.get_array('Bx')[idx] * 2.2e5 by = mesh.get_array('By')[idx] * 2.2e5 bz = mesh.get_array('Bz')[idx] * 2.2e5 vx = mesh.get_array('v')[idx, 0] * 480.24838 vy = mesh.get_array('v')[idx, 1] * 480.24838 vz = mesh.get_array('v')[idx, 2] * 480.24838 density = mesh.get_array('rho')[idx] * 1.67e-16 pressure = mesh.get_array('p')[idx] temp = pressure * 1.27 * 1.67e-24 / (2.0 * density * 1.3807e-16) Bx.append(bx) By.append(by) Bz.append(bz) Vx.append(vx) Vy.append(vy) Vz.append(vz) rho.append(density) T.append(temp) B.append(np.sqrt(bx**2 + by**2 + bz**2)) V.append(np.sqrt(vx**2 + vy**2 + vz**2)) time.append(i * delta_t * time_step * 0.402) except Exception as e: logger.warning(f"\t\tSkipping file {file_path} due to error: {e}") continue # Save HDF5 output_file = output_dir / f"Data_{case_name}.hdf5" with h5py.File(output_file, "w") as f: f.create_dataset('B', data=np.array(B)) f.create_dataset('V', data=np.array(V)) f.create_dataset('Vx', data=np.array(Vx)) f.create_dataset('Vy', data=np.array(Vy)) f.create_dataset('Vz', data=np.array(Vz)) f.create_dataset('Bx', data=np.array(Bx)) f.create_dataset('By', data=np.array(By)) f.create_dataset('Bz', data=np.array(Bz)) f.create_dataset('T', data=np.array(T)) f.create_dataset('rho', data=np.array(rho)) f.create_dataset('time', data=np.array(time)) logger.info(f"\tSaved data to {output_file}")
[docs] def readstruct(lines: List[str]) -> Tuple[int, int, int, int, int, int, List[Tuple[int, str]]]: """ Parses the structure of a CFmesh file by locating key section headers. Args: lines (List[str]): List of lines from the CFmesh file. Returns: Tuple containing: idx0 (int): Index where element connectivity starts. idx1 (int): Index where nodal coordinates start. idx2 (int): Index where state vector starts. idx3 (int): Index marking start of outlet-related section. nbelements (int): Number of elements in the mesh. nend (int): Number of !END markers. comment (List[Tuple[int, str]]): Positions and content of all comment lines. """ exp = re.compile(r'!NB_ELEM (-?\d+)') idx0 = idx1 = idx2 = idx3 = nbelements = nend = 0 comment = [] for i, line in enumerate(lines): if line.startswith("!"): comment.append((i, line)) if line.startswith("!LIST_ELEM"): idx0 = i + 1 elif line.startswith("!LIST_NODE"): idx1 = i + 1 elif line.startswith("!LIST_STATE 1"): idx2 = i + 1 elif line.startswith("!NB_ELEM "): match = exp.search(line) if match: nbelements = int(match.group(1)) elif line.startswith("!TRS_NAME Outlet"): idx3 = i elif line.startswith("!END"): nend += 1 return idx0, idx1, idx2, idx3, nbelements, nend, comment
[docs] def read_cfmesh(input_cfmesh: Union[str, os.PathLike]) -> Tuple[np.ndarray, ...]: """ Reads and parses a CFmesh formatted MHD input file to extract geometry and plasma variables. Args: input_cfmesh (str or Path): Path to the CFmesh file. Returns: Tuple of NumPy arrays containing: cell_center_x, cell_center_y, cell_center_z, Bx, By, Bz, rho, Vx, Vy, Vz, Pressure, Temperature """ if not os.path.isfile(input_cfmesh): raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), input_cfmesh) with open(input_cfmesh, "r") as f: lines = f.readlines() idx0, idx1, idx2, idx3, nbelements, nend, comment = readstruct(lines) connectivity = np.loadtxt(lines[idx0:idx0 + nbelements], dtype=int) coordinates = np.loadtxt(lines[idx1:idx2 - 1], dtype=float) # Compute cell centers from 6-node hexahedra node_indices = [connectivity[:, i] for i in range(6)] cell_center_x = sum(coordinates[n, 0] for n in node_indices) / 6 cell_center_y = sum(coordinates[n, 1] for n in node_indices) / 6 cell_center_z = sum(coordinates[n, 2] for n in node_indices) / 6 # Initial state data bd = comment[-nend - 1][0] + 1 bf = comment[-nend][0] initial_data = np.loadtxt(lines[bd:bf], dtype=float) # Convert and scale physical quantities Bx = initial_data[:, 4] * 2.2e-4 By = initial_data[:, 5] * 2.2e-4 Bz = initial_data[:, 6] * 2.2e-4 rho = initial_data[:, 0] * 1.67e-13 / 1.67e-27 Vx = initial_data[:, 1] * 480248.0 Vy = initial_data[:, 2] * 480248.0 Vz = initial_data[:, 3] * 480248.0 Pressure = initial_data[:, 7] * 0.03851 Temperature = Pressure / (rho * 2 * 1.38e-23) return ( cell_center_x, cell_center_y, cell_center_z, Bx, By, Bz, rho, Vx, Vy, Vz, Pressure, Temperature )
[docs] def find_closest_index_cf(x: np.ndarray, y: np.ndarray, z: np.ndarray, target_coord: Tuple[float, float, float]) -> int: """ Find the index of the point closest to a given target in 3D Cartesian coordinates. Args: x (np.ndarray): X coordinates of the mesh points. y (np.ndarray): Y coordinates of the mesh points. z (np.ndarray): Z coordinates of the mesh points. target_coord (Tuple[float, float, float]): Target point in Cartesian coordinates (x, y, z). Returns: int: Index of the closest point in the mesh. """ target = np.array(target_coord) points = np.stack((x, y, z), axis=1) distances = np.linalg.norm(points - target, axis=1) return np.argmin(distances)
def _spherical_unit_vectors_from_cartesian(pos: Tuple[float, float, float]): """Return spherical basis (r̂, θ̂, φ̂) at a Cartesian position. Notes ----- - θ (colatitude) is measured from +Z (0 at north pole, π/2 at equator). - φ (longitude) is measured in the XY-plane from +X toward +Y via atan2(y, x). - Only *direction* matters for the basis; pos can be in solar radii (R☉). """ x, y, z = pos r = np.sqrt(x*x + y*y + z*z) if r == 0: # Degenerate; return canonical basis to avoid NaNs return np.array([1.0, 0.0, 0.0]), np.array([0.0, 0.0, -1.0]), np.array([0.0, 1.0, 0.0]) # Angles theta = np.arccos(np.clip(z / r, -1.0, 1.0)) # colatitude phi = np.arctan2(y, x) # longitude # Unit vectors in Cartesian components r_hat = np.array([np.sin(theta) * np.cos(phi), np.sin(theta) * np.sin(phi), np.cos(theta)]) theta_hat = np.array([np.cos(theta) * np.cos(phi), np.cos(theta) * np.sin(phi), -np.sin(theta)]) phi_hat = np.array([-np.sin(phi), np.cos(phi), 0.0]) return r_hat, theta_hat, phi_hat
[docs] def create_hdf_from_cfmesh( satellite_positions: Dict[str, Tuple[float, float, float]], folder_patterns: Dict[str, str], output_dir: Path, delta_t: float, time_step: float, method: str = "nearest", ) -> None: """Extract plasma quantities from CFmesh files and save time series (incl. spherical components). Parameters ---------- satellite_positions : Dict[str, Tuple[float, float, float]] Satellite positions in Cartesian coordinates (x, y, z) **in solar radii (R☉)**. folder_patterns : Dict[str, str] Mapping of case names to glob patterns of CFmesh files. output_dir : Path Directory to store output HDF5 files. delta_t : float Base time step duration (code units). time_step : float Output time step multiplier. method : str, optional 'nearest' to extract at closest mesh node, or 'interpolate' to RBF-interpolate the fields at the satellite position. Produces -------- For each case, an HDF5 file with datasets: Bx, By, Bz, Vx, Vy, Vz, rho, T, P, time, vr, vclt, vlon, br, bclt, blon Notes ----- - Spherical components follow (r, θ, φ) with θ the **colatitude**. - The time array uses the user's original scaling ``i * delta_t * time_step * 0.402``. """ output_dir.mkdir(exist_ok=True, parents=True) for sat_name, sat_coord in satellite_positions.items(): logger.info(f"Processing satellite: {sat_name}") # Use first file of the first case to get reference mesh (for 'nearest') first_case = next(iter(folder_patterns)) first_files = natsort.natsorted(glob.glob(folder_patterns[first_case])) if not first_files: raise FileNotFoundError(f"No files match pattern for first case '{first_case}': {folder_patterns[first_case]}") first_file = first_files[0] x, y, z, *_ = read_cfmesh(first_file) closest_index = find_closest_index_cf(x, y, z, sat_coord) if method == "nearest" else None if method == "nearest": logger.info( f"Closest mesh point (R☉): x={x[closest_index]:.6g} y={y[closest_index]:.6g} z={z[closest_index]:.6g}" ) for case_name, pattern in folder_patterns.items(): logger.info(f"\tProcessing case: {case_name}") Bx_list, By_list, Bz_list = [], [], [] Vx_list, Vy_list, Vz_list = [], [], [] rho_list, T_list, P_list, time_list = [], [], [], [] # New: spherical components vr_list, vclt_list, vlon_list = [], [], [] br_list, bclt_list, blon_list = [], [], [] files = natsort.natsorted(glob.glob(pattern)) if not files: logger.warning(f"\tNo files for case '{case_name}' with pattern: {pattern}") continue for i, file in enumerate(files): logger.info(f"\t\tReading {file}") x, y, z, Bx, By, Bz, rho, Vx, Vy, Vz, P, T = read_cfmesh(file) # Selection / interpolation if method == "nearest": idx = closest_index bx, by, bz = float(Bx[idx]), float(By[idx]), float(Bz[idx]) vx, vy, vz = float(Vx[idx]), float(Vy[idx]), float(Vz[idx]) rho_val, t_val, p_val = float(rho[idx]), float(T[idx]), float(P[idx]) pos = (float(x[idx]), float(y[idx]), float(z[idx])) elif method == "interpolate": grid = np.column_stack((x, y, z)) sat = np.asarray(sat_coord)[None, :] bx = float(RBFInterpolator(grid, Bx, kernel='linear', neighbors=20)(sat)[0]) by = float(RBFInterpolator(grid, By, kernel='linear', neighbors=20)(sat)[0]) bz = float(RBFInterpolator(grid, Bz, kernel='linear', neighbors=20)(sat)[0]) vx = float(RBFInterpolator(grid, Vx, kernel='linear', neighbors=20)(sat)[0]) vy = float(RBFInterpolator(grid, Vy, kernel='linear', neighbors=20)(sat)[0]) vz = float(RBFInterpolator(grid, Vz, kernel='linear', neighbors=20)(sat)[0]) rho_val = float(RBFInterpolator(grid, rho, kernel='linear', neighbors=20)(sat)[0]) t_val = float(RBFInterpolator(grid, T, kernel='linear', neighbors=20)(sat)[0]) p_val = float(RBFInterpolator(grid, P, kernel='linear', neighbors=20)(sat)[0]) pos = tuple(map(float, sat_coord)) else: raise ValueError("method must be 'nearest' or 'interpolate'") # Append Cartesian & scalar quantities Bx_list.append(bx); By_list.append(by); Bz_list.append(bz) Vx_list.append(vx); Vy_list.append(vy); Vz_list.append(vz) rho_list.append(rho_val); T_list.append(t_val); P_list.append(p_val) time_list.append(i * delta_t * time_step * 0.402) # Compute spherical components at 'pos' (R☉), project V and B r_hat, theta_hat, phi_hat = _spherical_unit_vectors_from_cartesian(pos) v_vec = np.array([vx, vy, vz], dtype=float) b_vec = np.array([bx, by, bz], dtype=float) vr = float(v_vec @ r_hat) v_theta = float(v_vec @ theta_hat) v_phi = float(v_vec @ phi_hat) br = float(b_vec @ r_hat) b_theta = float(b_vec @ theta_hat) b_phi = float(b_vec @ phi_hat) # Store using requested names: vr, vclt(θ), vlon(φ); br, bclt(θ), blon(φ) vr_list.append(vr) vclt_list.append(v_theta) vlon_list.append(v_phi) br_list.append(br) bclt_list.append(b_theta) blon_list.append(b_phi) # Write HDF5 for this case & satellite if files: # Include satellite name in file to avoid overwriting between satellites output_file = output_dir / f"CFData_{case_name}.hdf5" with h5py.File(output_file, 'w') as hdf: hdf.create_dataset('Bx', data=np.asarray(Bx_list)) hdf.create_dataset('By', data=np.asarray(By_list)) hdf.create_dataset('Bz', data=np.asarray(Bz_list)) hdf.create_dataset('Vx', data=np.asarray(Vx_list)) hdf.create_dataset('Vy', data=np.asarray(Vy_list)) hdf.create_dataset('Vz', data=np.asarray(Vz_list)) hdf.create_dataset('rho', data=np.asarray(rho_list)) hdf.create_dataset('T', data=np.asarray(T_list)) hdf.create_dataset('P', data=np.asarray(P_list)) hdf.create_dataset('time', data=np.asarray(time_list)) # New spherical components hdf.create_dataset('vr', data=np.asarray(vr_list)) hdf.create_dataset('vclt', data=np.asarray(vclt_list)) hdf.create_dataset('vlon', data=np.asarray(vlon_list)) hdf.create_dataset('br', data=np.asarray(br_list)) hdf.create_dataset('bclt', data=np.asarray(bclt_list)) hdf.create_dataset('blon', data=np.asarray(blon_list)) logger.info(f"\tSaved data to {output_file}")
[docs] def find_closest_indices(clt: np.ndarray, lon: np.ndarray, target_clt: float, target_lon: float) -> Tuple[int, int]: """Finds the indices in colatitude and longitude grids closest to the desired values (in degrees). Args: clt (np.ndarray): Array of colatitudes (degrees). lon (np.ndarray): Array of longitudes (degrees). target_clt (float): Target colatitude in degrees. target_lon (float): Target longitude in degrees. Returns: Tuple[int, int]: Indices for colatitude and longitude. """ clt_idx = np.argmin(np.abs(clt - target_clt)) lon_idx = np.argmin(np.abs(lon - target_lon)) return clt_idx, lon_idx
[docs] def create_hdf_from_dat( folder_patterns: Dict[str, str], output_dir: Path, target_clt: float, target_lon: float ) -> None: """Creates HDF5 files from .dat files containing spherical grid data. Args: folder_patterns (Dict[str, str]): Dictionary of simulation name -> glob pattern for dat files. output_dir (Path): Output directory for HDF5 files. target_clt (float): Desired colatitude in degrees. target_lon (float): Desired longitude in degrees. Returns: None """ output_dir.mkdir(parents=True, exist_ok=True) # Determine closest indices from first file first_case = next(iter(folder_patterns)) first_file = natsort.natsorted(glob.glob(folder_patterns[first_case]))[0] _, clt_ref, lon_ref, *_ = read_dat(first_file) clt_idx, lon_idx = find_closest_indices(clt_ref, lon_ref, target_clt, target_lon) logger.info(f"Closest indices - colat: {clt_idx}, lon: {lon_idx}") for case_name, pattern in folder_patterns.items(): logger.info(f"Processing case: {case_name}") files = natsort.natsorted(glob.glob(pattern)) date_l, clt_l, lon_l = [], [], [] vr_l, vlon_l, vclt_l = [], [], [] density_l, br_l, bclt_l, blon_l, temp_l = [], [], [], [], [] for i, file in enumerate(files): logger.info(f"\tReading file: {file}") date, clt, lon, vr, vlon, vclt, density, br, bclt, blon, temp = read_dat(file) date_l.append(datetime.fromisoformat(date).timestamp()) clt_l.append(clt[clt_idx]) lon_l.append(lon[lon_idx]) vr_l.append(vr[lon_idx][clt_idx]) vlon_l.append(vlon[lon_idx][clt_idx]) vclt_l.append(vclt[lon_idx][clt_idx]) density_l.append(density[lon_idx][clt_idx]) br_l.append(br[lon_idx][clt_idx]) bclt_l.append(bclt[lon_idx][clt_idx]) blon_l.append(blon[lon_idx][clt_idx]) temp_l.append(temp[lon_idx][clt_idx]) output_file = output_dir / f"DatData_{case_name}.hdf5" with h5py.File(output_file, 'w') as hdf: hdf.create_dataset('date', data=np.array(date_l)) hdf.create_dataset('clt', data=np.array(clt_l)) hdf.create_dataset('lon', data=np.array(lon_l)) hdf.create_dataset('vr', data=np.array(vr_l)) hdf.create_dataset('vclt', data=np.array(vclt_l)) hdf.create_dataset('vlon', data=np.array(vlon_l)) hdf.create_dataset('br', data=np.array(br_l)) hdf.create_dataset('bclt', data=np.array(bclt_l)) hdf.create_dataset('blon', data=np.array(blon_l)) hdf.create_dataset('density', data=np.array(density_l)) hdf.create_dataset('temperature', data=np.array(temp_l)) logger.info(f"\tSaved data to {output_file}")
if __name__ == "__main__": satellite_cartesian = {"STA": [-10.0, 0.0, 0.0]} # For CFMesh or VTU delta_t = 0.005 time_step = 20 output_dir = Path("E:/euhforia/CFcase_large/result_cme/test_hdf_output") cfmesh_cases = { "cme": "E:/euhforia/CFcase_large/result_cme/*.CFmesh", } create_hdf_from_cfmesh(satellite_cartesian, cfmesh_cases, output_dir , delta_t, time_step) """ # Example test satellite in Cartesian coordinates or spherical for .dat satellite_cartesian = {"STA": [-10.0, 0.0, 0.0]} # For CFMesh or VTU satellite_spherical = {"STA": (90.0, 0.0)} # For .dat (colatitude=90°, longitude=0°) delta_t = 0.005 time_step = 20 output_dir = Path("test_hdf_output") # --- Test 1: VTU --- vtu_folders = { "COCONUT_A": "test_data/vtu/caseA/*.vtu", "COCONUT_B": "test_data/vtu/caseB/*.vtu", } create_hdf_from_vtu(satellite_cartesian, vtu_folders, output_dir / "vtu", delta_t, time_step) # --- Test 2: CFMesh --- cfmesh_cases = { "CFMESH_A": "test_data/cfmesh/caseA/*.CFmesh", "CFMESH_B": "test_data/cfmesh/caseB/*.CFmesh" } create_hdf_from_cfmesh(satellite_cartesian, cfmesh_cases, output_dir / "cfmesh", delta_t, time_step) # --- Test 3: DAT --- dat_cases = { "TOMO_A": "test_data/dat/caseA/*.dat", "TOMO_B": "test_data/dat/caseB/*.dat", } create_hdf_from_dat(satellite_spherical, dat_cases, output_dir / "dat") """