Source code for coconut_tools.create_dat

"""
Module for generating EUHFORIA-compatible solar wind boundary files from COCONUT outputs.

This module processes CFmesh plasma simulation output, interpolates it onto a heliospheric boundary grid,
and formats the result into `.dat` files usable as input for heliospheric simulations like EUHFORIA.

Main tasks:
- Load and interpret CFmesh geometry and field data,
- Interpolate plasma and magnetic field variables onto a spherical grid,
- Optionally rotate the grid to match Carrington/Stonyhurst frames,
- Save results in the required EUHFORIA `.dat` format.

Typically used as a post-processing step after a COCONUT coronal simulation.
"""

import re
import glob
import os
import errno
import math
from datetime import datetime, timedelta
from typing import List, Tuple

import numpy as np
from scipy.interpolate import RBFInterpolator
from coconut_tools.rotation_angle import compute_rotation_angle
from coconut_tools.logger_config import setup_logger

logger = setup_logger(__name__)

[docs] def rsun_to_m(x: float) -> float: """Convert a distance expressed in solar radii to meters. The conversion assumes 1 astronomical unit equals 215 solar radii, which implies 1 Rsun = 149597870700 / 215 meters. Args: x (float): Distance in solar radii. Returns: float: Distance in meters. """ AU = 149597870700.0 RSUN_PER_AU = 215.0 return x * AU / RSUN_PER_AU
[docs] def readstruct(lines: List[str]) -> Tuple[int, int, int, int, int, int, List[Tuple[int, str]]]: """Reads the structure of a CFmesh file. Args: lines (List[str]): Lines of the CFmesh file. Returns: Tuple[int, int, int, int, int, int, List[Tuple[int, str]]]: Indices of node, state, element, outlet, number of elements, number of !END markers, and list of comments. """ exp = re.compile(r'!NB_ELEM (-?\d+)') nbelements, idx0, idx1, idx2, idx3, nend = 0, 0, 0, 0, 0, 0 comment = [] for i, line in enumerate(lines): if line.startswith("!"): comment.append((i, line)) if line.startswith("!LIST_NODE"): idx1 = i + 1 elif line.startswith("!LIST_STATE 1"): idx2 = i + 1 elif line.startswith("!LIST_ELEM"): idx0 = i + 1 elif line.startswith("!NB_ELEM "): nbelements = int(exp.search(line).group(1)) elif line.startswith("!TRS_NAME Outlet"): idx3 = i elif line.startswith("!END"): nend += 1 return idx0, idx1, idx2, idx3, nbelements, nend, comment
[docs] def extract_number(file_name: str) -> int: """Extracts the numerical index from a CFmesh filename. Args: file_name (str): The filename to extract the number from. Returns: int: Extracted number or 0 if not found. """ match = re.search(r'corona-iter_(\d+)\.CFmesh$', file_name) return int(match.group(1)) if match else 0
[docs] def create_boundary_fromcfmesh(inputfile: str, time: str, rad_out: float, nb_th: int, nb_phi: int, eps: float, output_dat: str, full_output: bool = True) -> None: """Creates a boundary file by interpolating CFmesh volume data onto a spherical grid. Args: inputfile (str): Path to the input CFmesh file. time (str): Timestamp string in ISO format. rad_out (float): Target radius for the boundary. ; (m, 0.1 AU by default) nb_th (int): Number of colatitude grid points. nb_phi (int): Number of longitude grid points. eps (float): Small angle offset to avoid poles. output_dat (str): Output file path. full_output (bool): Whether to include all variables or a reduced subset. """ if not os.path.isfile(inputfile): raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), inputfile) with open(inputfile, "r") as f: lines = f.readlines() idx0, idx1, idx2, idx3, nbelements, nend, comment = readstruct(lines) connectivity = np.loadtxt(lines[idx0:idx0 + nbelements], dtype=int) coordinates = np.loadtxt(lines[idx1:idx2 - 1], dtype=float) nodes = connectivity[:, :6] centers = coordinates[nodes].mean(axis=1) r = np.linalg.norm(centers, axis=1) r_mask = (r > rad_out * 0.95) & (r < rad_out * 1.05) mask = np.where(r_mask)[0] LAT = np.linspace(eps, np.pi - eps, nb_th) LON = np.linspace(eps, 2.0 * np.pi - eps, nb_phi) lat, lon = np.meshgrid(LAT, LON, indexing='ij') r_def = rad_out x_sph = r_def * np.sin(lat) * np.cos(lon) y_sph = r_def * np.sin(lat) * np.sin(lon) z_sph = r_def * np.cos(lat) grid = np.column_stack((x_sph.flatten(), y_sph.flatten(), z_sph.flatten())) bd = comment[-nend - 1][0] + 1 bf = comment[-nend][0] Initialdata = np.loadtxt(lines[bd:bf], dtype=np.float64) rho0 = Initialdata[mask, 0] * 1.67e-13 / 1.67e-27 # n'=rho/mp=mu*n Vx0 = Initialdata[mask, 1] * 480248.0 #m/s Vy0 = Initialdata[mask, 2] * 480248.0 #m/s Vz0 = Initialdata[mask, 3] * 480248.0 #m/s Bx = Initialdata[mask, 4] * 2.2e-4 #T By = Initialdata[mask, 5] * 2.2e-4 #T Bz = Initialdata[mask, 6] * 2.2e-4 #T Pressure = Initialdata[mask, 7] * 0.03851 # Pa temp = Pressure * 1.27 / rho0 / 2 / 1.38e-23 # P*mu/n'/kb = T [K] with mu=0.5 already hardcoded here !!! x, y, z = centers[mask].T rho = np.hypot(x, y) rho_safe = np.maximum(rho, 1e-8) r_safe = np.maximum(r[mask], 1e-8) vclt = (z*(x*Vx0 + y*Vy0) - (rho**2)*Vz0) / (r_safe * rho_safe) bclt = (z*(x*Bx + y*By ) - (rho**2)*Bz ) / (r_safe * rho_safe) vr = (x * Vx0 + y * Vy0 + z * Vz0) / r_safe vlon = (-y * Vx0 + x * Vy0) / rho_safe br = (x * Bx + y * By + z * Bz) /r_safe blon = (-y * Bx + x * By) / rho_safe interp_fields = { 'vr': vr, 'vp': vlon, 'vt': vclt, 'number_density': 2 * rho0 / 1.27, 'temperature': temp, 'Br': br, 'Bp': blon, 'Bt': bclt } selected_keys = ['vr', 'vp', 'vt', 'number_density', 'temperature', 'Br', 'Bp', 'Bt'] if full_output else ['vr', 'number_density', 'temperature', 'Br'] coords = np.column_stack((x, y, z)) coords_unique, indices_unique = np.unique(coords, axis=0, return_index=True) interpolated = {} for key in selected_keys: interpolator = RBFInterpolator(coords_unique, interp_fields[key][indices_unique], kernel='linear', neighbors=50) interp_vals = interpolator(grid).reshape(nb_th, nb_phi) interpolated[key] = interp_vals with open(output_dat, 'w') as f: f.write(f'Time:\n{time}\nRadius of sphere:\n{str(rsun_to_m(rad_out))}\n') f.write(f'Number of colatitude grid points:\n{nb_th}\nColatitude grid points:\n') f.write('\n'.join(f'{v:.19e}' for v in LAT) + '\n') f.write(f'Number of longitude grid points:\n{nb_phi}\nLongitude grid points:\n') f.write('\n'.join(f'{v:.19e}' for v in LON) + '\n') for key in selected_keys: f.write(f'{key}\n') f.write('\n'.join( '\n'.join(f'{interpolated[key][j, k]:.19e}' for j in range(nb_th)) for k in range(nb_phi)) + '\n')
[docs] def rotation(input_dat: str, output_dat: str, angle_degrees: float, full_output: bool = True, rad_out: float=21.5) -> None: """Rotates the longitude data in a boundary file and adjusts selected fields accordingly. Args: input_dat (str): Path to the input data file. output_dat (str): Path to the output data file. angle_degrees (float): Rotation angle in degrees. full_output (bool): Whether to include all variables (True) or only vr, number_density, temperature, Br (False). """ with open(input_dat) as f: lines = f.read().splitlines() date = lines[1] clt_start = lines.index('Colatitude grid points:') lon_start = lines.index('Longitude grid points:') full_keys = ['vr', 'vp', 'vt', 'number_density', 'temperature', 'Br', 'Bp', 'Bt'] reduced_keys = ['vr', 'number_density', 'temperature', 'Br'] keys = full_keys if full_output else reduced_keys indices = {key: lines.index(key) for key in keys} clt = [float(x) for x in lines[clt_start + 1:lon_start - 2]] lon_before = [float(x) - np.radians(angle_degrees) for x in lines[lon_start + 1:indices['vr']]] lon = list(np.mod(lon_before, 2.0 * math.pi)) min_index = np.argmin(lon) lon = np.roll(lon, -min_index) def shift_data(key): next_key_index = indices[keys[keys.index(key) + 1]] if key != keys[-1] else None data = [float(x) for x in lines[indices[key] + 1:next_key_index]] if next_key_index else [float(x) for x in lines[indices[key] + 1:]] return np.roll(data, -min_index * len(clt)) data_arrays = {key: shift_data(key) for key in keys} with open(output_dat, 'w') as fp: fp.write(f"Time:\n{date}\nRadius of sphere:\n{str(rsun_to_m(rad_out))}\n") fp.write(f"Number of colatitude grid points:\n{len(clt)}\nColatitude grid points:\n") np.savetxt(fp, clt) fp.write(f"Number of longitude grid points:\n{len(lon)}\nLongitude grid points:\n") np.savetxt(fp, lon) for name in keys: fp.write(f"{name}\n") np.savetxt(fp, data_arrays[name])
if __name__ == "__main__": """ Example usage of the module to process a series of CFmesh files and generate boundary .dat files. """ magnetogram_path = r"E:/COCONUT/2017/magnetogram/hmi.Synoptic_Mr_polfil.2194.fits" date_hmi = '2017-09-04T18:00:00' auto_compute_rotation = True manual_rotation_angle = 0.0 # fallback if auto_compute_rotation is False angle, date_dt = compute_rotation_angle(magnetogram_path, date_hmi) first_time = date_dt.strftime("%Y-%m-%dT%H:%M:%S") rad_out = 21.5 nb_th = 180 nb_phi = 360 eps = 0.01 full_output = True # <--- change this to True for full variable set #angle = manual_rotation_angle logger.info(f"Computed rotation angle: {angle} degrees") logger.info(f"Using date for rotation computation: {date_dt.isoformat()}\n") files = glob.glob('E:/COCONUT/2017/cme/CFmesh/corona-iter_*.CFmesh') files = sorted(files, key=extract_number) for i, file in enumerate(files): logger.info(f"Processing file {i}: {file}") date_dt = datetime.strptime(first_time, '%Y-%m-%dT%H:%M:%S') date_i = date_dt + timedelta(hours=20 * i * 0.005 * 0.402) #date_i = date_dt + timedelta(seconds=i*145) timestamp_i = str(int(date_i.timestamp())) output_dat_temp = f'E:/COCONUT/2017/cme/dat/solar_wind_boundary_{timestamp_i}_temps.dat' output_dat = f'E:/COCONUT/2017/cme/dat/solar_wind_boundary_{timestamp_i}.dat' time = date_i.strftime("%Y-%m-%dT%H:%M:%S") if os.path.exists(output_dat): logger.info(f"Output already exists, skipping: {output_dat}") continue create_boundary_fromcfmesh(file, time, rad_out, nb_th, nb_phi, eps, output_dat_temp, full_output=full_output) rotation(output_dat_temp, output_dat, angle, full_output=full_output) os.remove(output_dat_temp)