Source code for coconut_tools.pyTDM.core_td.COCONUT.coconut

"""
Module for handling CFmesh files and injecting TDm/RBSL structures into the COCONUT model.

This module provides functions to:
- read and parse `corona.CFmesh` files used in MHD simulations with COCONUT,
- interpolate Cartesian magnetic fields (from VTK/VTU files) to the cell centers of a CFmesh grid,
- generate a new CFmesh file with the injected magnetic structure,
- convert Cartesian magnetic fields to spherical components (Br, Bθ, Bφ),
- aggregate multiple `.vtu` files from parallel COCONUT simulations.

Main functions:
- `read_CFmesh`: reads a CFmesh file and returns the magnetic field components in spherical coordinates.
- `read_vtu`: merges `.vtu` files from a parallel run of COCONUT.
- `readstruct`: extracts the internal structure and block indices from a CFmesh file.

This module is used to enable the injection of TDm or RBSL flux ropes as initial conditions
in COCONUT-based simulations.
"""

import numpy as np
import pandas as pd
import pyvista as pyv
from tqdm import tqdm
from vtk.util.numpy_support import vtk_to_numpy
import errno
import os.path
from typing import List, Tuple
import re
import logging
from datetime import datetime
from scipy.interpolate import griddata, RBFInterpolator, RegularGridInterpolator, LinearNDInterpolator
import os


logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


[docs] def readstruct(lines: List[str]) -> Tuple[int, int, int, int, int, int, list]: """ Find the structure of the CFmesh Args: lines (list(str)): contains the CFmesh lines Returns: idx0 (int): where the node start idx1 (int): where the state start idx2 (int): where the elem start idx3 (int): where I don't know what start nend (int): nbr of !end nbelements (int): nb of elements comment list(list(int,str)): all the non number lines and where there are """ exp = re.compile(r'!NB_ELEM (-?\d+)') nbelements, idx0, idx1, idx2, idx3, nend = 0, 0, 0, 0, 0, 0 comment = [] for i, line in enumerate(lines): if line.startswith("!"): comment.append([i, line]) if line.startswith("!LIST_NODE"): idx1 = i + 1 elif line.startswith("!LIST_STATE 1"): idx2 = i + 1 elif line.startswith("!LIST_ELEM"): idx0 = i + 1 elif line.startswith("!NB_ELEM "): nbelements = int(exp.search(line).group(1)) elif line.startswith("!TRS_NAME Outlet"): idx3 = i elif line.startswith("!END"): nend += 1 else: continue else: continue return idx0, idx1, idx2, idx3, nbelements, nend, comment
[docs] def read_CFmesh(path_file: str) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """ Read and extract magnetic field data from a COCONUT CFmesh file. This function computes the magnetic field components in spherical coordinates (radial, co-latitudinal, and longitudinal) at the center of each mesh cell. Args: path_file (str): Path to the directory containing the 'corona.CFmesh' file. Returns: Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: - Unique cell centers (N x 3) - Radial magnetic field component `br` - Co-latitudinal magnetic field component `bclt` - Longitudinal magnetic field component `blon` Raises: FileNotFoundError: If 'corona.CFmesh' does not exist in the given path. """ inputfile = os.path.join(path_file, 'corona.CFmesh') if not os.path.isfile(inputfile): raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), inputfile) with open(inputfile, "r") as MHDinfile: lines = MHDinfile.readlines() # Get structure and metadata idx0, idx1, idx2, idx3, nbelements, nend, comment = readstruct(lines) connectivity = np.loadtxt(lines[idx0:idx0 + nbelements], dtype=int) other = np.loadtxt(lines[idx0 + nbelements + 6:idx3], dtype=int) other2 = np.loadtxt(lines[idx3 + 5:idx1 - 2], dtype=int) coordinates = np.loadtxt(lines[idx1:idx2 - 1], dtype=float) # Compute cell centers nodes = [connectivity[:, i] for i in range(6)] cell_center_x = sum(coordinates[n, 0] for n in nodes) / 6.0 cell_center_y = sum(coordinates[n, 1] for n in nodes) / 6.0 cell_center_z = sum(coordinates[n, 2] for n in nodes) / 6.0 cell_centers = np.stack((cell_center_x, cell_center_y, cell_center_z), axis=1) # Load initial data bd = comment[-nend - 1][0] + 1 bf = comment[-nend][0] Initialdata = np.loadtxt(lines[bd:bf], dtype=np.float64) x, y, z = cell_centers[:, 0], cell_centers[:, 1], cell_centers[:, 2] r = np.sqrt(x ** 2 + y ** 2 + z ** 2) r_bis = np.sqrt(x ** 2 + y ** 2) Bx, By, Bz = Initialdata[:, 4], Initialdata[:, 5], Initialdata[:, 6] EPSILON = 0 # Can be changed to small positive value if needed to avoid division by zero br = (x * Bx + y * By + z * Bz) / r blon = (-y * Bx + x * By) / (r_bis + EPSILON) bclt = (x * z * Bx + y * z * By - (r_bis + EPSILON) ** 2 * Bz) / ((r_bis + EPSILON) * r) uni_grid_after, index_after = np.unique(cell_centers, axis=0, return_index=True) return uni_grid_after, br[index_after], bclt[index_after], blon[index_after]
[docs] def read_vtu(path_file: str, nb_proc: int) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """ Merge and clean multiple VTU files generated by COCONUT and compute magnetic field components in spherical coordinates. Args: path_file (str): Directory where the VTU files are located. nb_proc (int): Number of processors (and thus number of VTU files to read). Returns: Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: - Unique 3D coordinates of the points (shape: N x 3) - Radial component of B field (br) - Co-latitudinal component (bt) - Azimuthal component (bp) """ logger.info('Loading VTU files...') for i in tqdm(range(nb_proc), desc='Reading VTU files'): fileloc = os.path.join(path_file, f'corona-flow0-P{i}.vtu') mesh = pyv.read(fileloc) # Grid points_loc = vtk_to_numpy(mesh.GetPoints().GetData()) nb_pts_loc = points_loc.shape[0] x_loc, y_loc, z_loc = points_loc[:, 0], points_loc[:, 1], points_loc[:, 2] # Magnetic field bx_loc = mesh.get_array('Bx') by_loc = mesh.get_array('By') bz_loc = mesh.get_array('Bz') if i == 0: points, x, y, z = points_loc, x_loc, y_loc, z_loc bx, by, bz = bx_loc, by_loc, bz_loc else: points = np.concatenate((points, points_loc)) x = np.concatenate((x, x_loc)) y = np.concatenate((y, y_loc)) z = np.concatenate((z, z_loc)) bx = np.concatenate((bx, bx_loc)) by = np.concatenate((by, by_loc)) bz = np.concatenate((bz, bz_loc)) # Remove duplicates df = pd.DataFrame(points) df_unique = df.drop_duplicates() idx = df_unique.index.to_numpy() points = df_unique.to_numpy() nb_dup = len(x) - len(points) x, y, z = x[idx], y[idx], z[idx] bx, by, bz = bx[idx], by[idx], bz[idx] logger.info(f'Removed {nb_dup} duplicates.') r = np.sqrt(x ** 2 + y ** 2 + z ** 2) rxy = np.sqrt(x ** 2 + y ** 2) br = (x * bx + y * by + z * bz) / r bt = (x * z * bx + y * z * by - (x ** 2 + y ** 2) * bz) / (r * rxy) bp = (-y * bx + x * by) / rxy return points, br, bt, bp
[docs] def restart_coolfluid(path: str, vtsfile: str, path_save: str, alpha_0 :float, theta_0 :float, phi_0 :float, d :float, a :float, R :float,name, method: str = 'rbf', save: bool = False) -> None: """ Create the CFmesh with the Tdm within Args: path (str): Where is the CFmesh file vtsfile (str): Where is the vtk file path_save (str): Where save the newCFmesh method (str): Which method we want to use for the interpolation save (bool): If true we save a vtk file with the TDm in the solar wind Returns: None Raise: FileNotFoundError: If the CFmesh doesn't exist NameError: If the interpolation method doesn't exist """ inputfile = os.path.join(path, "corona.CFmesh") if not os.path.isfile(inputfile): raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), inputfile) else: MHDinfile = open(inputfile, "r") lines = MHDinfile.readlines() now = datetime.now() current_time = now.strftime("%H:%M:%S") logger.info(f'Scripts starts at {current_time}') # define the structure of the CFmesh idx0, idx1, idx2, idx3, nbelements, nend, comment = readstruct(lines) # read the connectivity connectivity = np.loadtxt(lines[idx0:idx0 + nbelements], dtype=int) # read I don't what other = np.loadtxt(lines[idx0 + nbelements + 6:idx3], dtype=int) other2 = np.loadtxt(lines[idx3 + 5:idx1 - 2], dtype=int) # read the coordinates coordinates = np.loadtxt(lines[idx1:idx2 - 1], dtype=float) # cell centers n1 = connectivity[:, 0] n2 = connectivity[:, 1] n3 = connectivity[:, 2] n4 = connectivity[:, 3] n5 = connectivity[:, 4] n6 = connectivity[:, 5] cell_center_x = 1. / 6. * ( coordinates[n1, 0] + coordinates[n2, 0] + coordinates[n3, 0] + coordinates[n4, 0] + coordinates[n5, 0] + coordinates[n6, 0]) cell_center_y = 1. / 6. * ( coordinates[n1, 1] + coordinates[n2, 1] + coordinates[n3, 1] + coordinates[n4, 1] + coordinates[n5, 1] + coordinates[n6, 1]) cell_center_z = 1. / 6. * ( coordinates[n1, 2] + coordinates[n2, 2] + coordinates[n3, 2] + coordinates[n4, 2] + coordinates[n5, 2] + coordinates[n6, 2]) cell_centers = np.zeros((len(cell_center_x), 3)) cell_centers[:, 0] = cell_center_x cell_centers[:, 1] = cell_center_y cell_centers[:, 2] = cell_center_z # Initial data bd = comment[-nend - 1][0] + 1 bf = comment[-nend][0] Initialdata = np.loadtxt(lines[bd:bf], dtype=float) now = datetime.now() current_time = now.strftime("%H:%M:%S") logger.info(f'Ok Cfmesh is read at {current_time}') # Time to interpolate DATA = createdata(vtsfile) now = datetime.now() current_time = now.strftime("%H:%M:%S") logger.info(f'Ok data created and save at {current_time}') X_tointerp = DATA[:, 0] Y_tointerp = DATA[:, 1] Z_tointerp = DATA[:, 2] bx_tointerp = DATA[:, 3] by_tointerp = DATA[:, 4] bz_tointerp = DATA[:, 5] now = datetime.now() current_time = now.strftime("%H:%M:%S") logger.info(f'Time to interpolate at {current_time}') if re.match('nearest', method): Initialdata[:, 4] = griddata((X_tointerp, Y_tointerp, Z_tointerp), bx_tointerp, cell_centers, method=method) now = datetime.now() current_time = now.strftime("%H:%M:%S") logger.info(f'1/3 {current_time}') Initialdata[:, 5] = griddata((X_tointerp, Y_tointerp, Z_tointerp), by_tointerp, cell_centers, method=method) now = datetime.now() current_time = now.strftime("%H:%M:%S") logger.info(f'2/3 {current_time}') Initialdata[:, 6] = griddata((X_tointerp, Y_tointerp, Z_tointerp), bz_tointerp, cell_centers, method=method) elif re.match('rbf', method): grid_interp = np.stack((X_tointerp, Y_tointerp, Z_tointerp), axis=1) ri = RBFInterpolator(grid_interp, bx_tointerp, kernel='linear', neighbors=50) Initialdata[:, 4] = ri(cell_centers) now = datetime.now() current_time = now.strftime("%H:%M:%S") logger.info(f'1/3 {current_time}') ri = RBFInterpolator(grid_interp, by_tointerp, kernel='linear', neighbors=50) Initialdata[:, 5] = ri(cell_centers) now = datetime.now() current_time = now.strftime("%H:%M:%S") logger.info(f'2/3 {current_time}') ri = RBFInterpolator(grid_interp, bz_tointerp, kernel='linear', neighbors=50) Initialdata[:, 6] = ri(cell_centers) elif re.match('linear', method): grid_interp = np.stack((X_tointerp, Y_tointerp, Z_tointerp), axis=1) ri = LinearNDInterpolator(grid_interp, bx_tointerp) Initialdata[:, 4] = ri(cell_centers) now = datetime.now() current_time = now.strftime("%H:%M:%S") logger.info(f'1/3 {current_time}') ri = LinearNDInterpolator(grid_interp, by_tointerp) Initialdata[:, 5] = ri(cell_centers) now = datetime.now() current_time = now.strftime("%H:%M:%S") logger.info(f'2/3 {current_time}') ri = LinearNDInterpolator(grid_interp, bz_tointerp) Initialdata[:, 6] = ri(cell_centers) else: raise NameError('Need a correct interpolation method') """ logger.info(f'we add vr') xc = (1.0-d) * np.sin(theta_0) * np.cos(phi_0) yc = (1.0-d) * np.sin(theta_0) * np.sin(phi_0) zc = (1.0-d) * np.cos(theta_0) xnew=cell_centers[:, 0]-xc ynew=cell_centers[:, 1]-yc znew=cell_centers[:, 2]-zc xpr, ypr, zpr = rotation(xnew, ynew, znew, theta_0, phi_0, alpha_0, reverse=False) cltmeshnew = np.arccos(zpr / np.sqrt(xpr ** 2 + ypr ** 2 + zpr ** 2)) rhonew = np.sqrt(xpr ** 2 + ypr ** 2 + zpr ** 2) rho = np.sqrt(rhonew ** 2 + R ** 2 - 2 * rhonew * R * np.sin(cltmeshnew)) magnitude = np.sqrt(cell_centers[rho<=a, 0] ** 2 + cell_centers[rho<=a, 1] ** 2 + cell_centers[rho<=a, 2] ** 2) Initialdata[rho<=a, 1] = (500.0/480.24838) * cell_centers[rho<=a, 0] / magnitude Initialdata[rho<=a, 2] = (500.0/480.24838) * cell_centers[rho<=a, 1] / magnitude Initialdata[rho<=a, 3] = (500.0/480.24838) * cell_centers[rho<=a, 2] / magnitude """ path_file = path_save + '/' + name + '/' os.makedirs(path_file, exist_ok=True) # save the vtk file if save: now = datetime.now() current_time = now.strftime("%H:%M:%S") logger.info(f'Save the new vtk file at {current_time}') n_cells, n_elbycell = np.shape(connectivity) n_elbycell = n_elbycell - 1 cells = np.zeros((n_cells, n_elbycell + 1)) cells[:, 1:] = connectivity[:, :-1].astype('int32') cells[:, 0] = n_elbycell celltypes = np.empty(n_cells, dtype=np.uint32) celltypes[:] = 13 cells = cells.ravel().astype(int) grid = pyv.UnstructuredGrid(cells, celltypes, coordinates) grid.cell_data['Bx'] = Initialdata[:, 4] grid.cell_data['By'] = Initialdata[:, 5] grid.cell_data['Bz'] = Initialdata[:, 6] grid.cell_data['Vx'] = Initialdata[:, 1] grid.cell_data['Vy'] = Initialdata[:, 2] grid.cell_data['Vz'] = Initialdata[:, 3] grid.save(f'{path_file}TDM_after.vtk') # Now save the new file now = datetime.now() current_time = now.strftime("%H:%M:%S") logger.info(f'Save the file at {current_time}') MHDinfile.close() outputfile = path_file + 'corona_restart.CFmesh' with open(outputfile, "w", newline="\n") as MHDoutfile: # 1. Header comments for i in range(0, 15): MHDoutfile.write(comment[i][1]) # 2. Connectivity for row in connectivity.astype(str): MHDoutfile.write(" ".join(row) + "\n") for i in range(15, 21): MHDoutfile.write(comment[i][1]) # 3. Other for row in other.astype(str): MHDoutfile.write(" ".join(row) + "\n") for i in range(21, 26): MHDoutfile.write(comment[i][1]) # 4. Other2 for row in other2.astype(str): MHDoutfile.write(" ".join(row) + "\n") for i in range(26, 28): MHDoutfile.write(comment[i][1]) # 5. Coordinates for row in coordinates.astype(str): MHDoutfile.write(" ".join(row) + "\n") # 6. Comment before Initialdata MHDoutfile.write(comment[28][1]) # 7. Initial data for row in Initialdata.astype(str): MHDoutfile.write(" ".join(row) + "\n") # 8. Footer comment MHDoutfile.write(comment[-1][1]) now = datetime.now() current_time = now.strftime("%H:%M:%S") logger.info(f'End at {current_time}') return
[docs] def createdata(vtkfile: str) -> List[List]: """ Create the data file containing the magnetic field Args: vtkfile (str): Magnetic field file Returns: List (list(float,float,float,float,float,float) : coordinates and magnetic field components Raises: FileNotFoundError: if the Tdm file doesn't exist """ vtkfile=vtkfile+'.vts' if not os.path.isfile(vtkfile): raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), vtkfile) else: # noinspection PyUnusedLocal mesh = pyv.read(vtkfile) B_cart = mesh.get_array('B_cart') points_loc = vtk_to_numpy(mesh.GetPoints().GetData()) # Flat the value X_tointerp = np.ndarray.flatten(points_loc[:, 0]) Y_tointerp = np.ndarray.flatten(points_loc[:, 1]) Z_tointerp = np.ndarray.flatten(points_loc[:, 2]) bx_tointerp = np.ndarray.flatten(B_cart[:, 0]) by_tointerp = np.ndarray.flatten(B_cart[:, 1]) bz_tointerp = np.ndarray.flatten(B_cart[:, 2]) # remove the nan values mask = np.isfinite(bx_tointerp) X_tointerp = X_tointerp[mask] Y_tointerp = Y_tointerp[mask] Z_tointerp = Z_tointerp[mask] bx_tointerp = bx_tointerp[mask] by_tointerp = by_tointerp[mask] bz_tointerp = bz_tointerp[mask] DATA = np.array( [[X_tointerp[i], Y_tointerp[i], Z_tointerp[i], bx_tointerp[i], by_tointerp[i], bz_tointerp[i]] for i in range(len(X_tointerp))]) return DATA