Source code for pyPLUTO.toolfuncs.findlines

"""Find-lines utilities manager."""

from __future__ import annotations

import logging
from typing import Unpack

import contourpy as cp
import matplotlib.colors as mcol
import matplotlib.pyplot as plt
import numpy as np
from scipy.integrate import solve_ivp

from pyPLUTO.loadkwargs import FindContourKwargs, FindFieldlinesKwargs
from pyPLUTO.loadmixin import LoadMixin
from pyPLUTO.loadstate import LoadState
from pyPLUTO.toolfuncs.loadtools import LoadToolsManager
from pyPLUTO.utils.inspector import track_kwargs

logger = logging.getLogger(__name__)


class FindLinesManager(LoadMixin):
    """Manager for field-line and contour helpers."""

    def __init__(self, state: LoadState) -> None:
        """Initialize the field-line manager and the underlying tools manager.

        Parameters
        ----------
        - state: LoadState
            The load state object providing grid arrays and dataset variables.

        Returns
        -------
        - None

        """
        self.state = state
        self.LoadToolsManager = LoadToolsManager(state)

    @staticmethod
    def _vector_field(
        _t: float,
        y: np.ndarray,
        var1: np.ndarray,
        var2: np.ndarray,
        xc: np.ndarray,
        yc: np.ndarray,
    ) -> list[np.ndarray]:
        """Compute vector field interpolation at position y.

        Parameters
        ----------
        - _t: float
            The time.
        - y: np.ndarray
            The position.
        - var1: np.ndarray
            The first vector component.
        - var2: np.ndarray
            The second vector component.
        - xc: np.ndarray
            The x-coordinates of the grid.
        - yc: np.ndarray
            The y-coordinates of the grid.

        Returns
        -------
        - list[np.ndarray]
            The interpolated vector field.

        """
        x, y = y

        i0 = np.abs(x - xc).argmin()
        j0 = np.abs(y - yc).argmin()

        scrh_ux = np.interp(x, xc, var1[:, j0])
        scrh_uy = np.interp(y, yc, var1[i0])
        scrh_vx = np.interp(x, xc, var2[:, j0])
        scrh_vy = np.interp(y, yc, var2[i0])

        qx = scrh_ux + scrh_uy - var1[i0, j0]
        qy = scrh_vx + scrh_vy - var2[i0, j0]

        return [qx, qy]

[docs] @track_kwargs def find_fieldlines( self, var1: str | np.ndarray, var2: str | np.ndarray, x0: list | float | None = None, y0: list | float | None = None, x1: np.ndarray | None = None, x2: np.ndarray | None = None, text: bool = False, _check: bool = True, **kwargs: Unpack[FindFieldlinesKwargs], ) -> list: """Find field lines using the vector field. The field lines are computed by interpolating the variables var1 and var2 at the footpoints x0 and y0. Different integration algorithms are available, based on the method solve_ivp of the scipy package. Parameters ---------- - atol: float, default 1e-6 The absolute tolerance for the integration. - closed: bool, default True If True, it checks if the line is closed on itself. - ctol: float, default 1e-6 The absolute tolerance for line closing on itself. - dense: bool, default False If True, the grid is dense (dense=True) or sparse (dense=False). - maxstep: float, default 100*step The maximum step size for the integration. - minstep: float, default 0.05*step The minimum step size for the integration (only used if order is LSODA). - numsteps: int, default 16384 The maximum number of steps for the integration. - order: str, default 'RK45' The integration method. Available options are: 'RK45', 'RK23', 'DOP853', 'Radau', 'BDF', 'LSODA'. - rtol: float, default 1e-6 The relative tolerance for the integration. - step: float, default abs(min((xend-xbeg)/self.nx1, (yend-ybeg)/self.nx2)) The initial step size for the integration. - text: bool, default False If True some additional information is printed. - transpose: True/False, default False Transposes the variable matrix. Use is not recommended if not really necessary (e.g. in case of highly customized variables and plots). - var1 (not optional): str | np.ndarray The first variable to be interpolated. - var2 (not optional): str | np.ndarray The second variable to be interpolated. - x0: list The x coordinates of the footpoints. - x1: np.ndarray | list | None, default self.x1 The x coordinates of the grid. - x2: np.ndarray | list | None, default self.x2 The y coordinates of the grid. - y0: list The y coordinates of the footpoints. Returns ------- - linelist: list A list of lists containing the coordinates of the field lines. The strcuture of the list is [[x1, y1], [x2, y2], ...] where x1, y1, x2, y2 are numpy arrays representing the coordinates of the field lines. Examples -------- - Example #1: Find field lines using the vector field >>> find_fieldlines(var1, var2, x0, y0) - Example #2: Find field lines using two strings 'Bx1' and 'Bx2' >>> find_fieldlines("Bx1", "Bx2", x0, y0) - Example #3: Find field lines using two variables and two footpoints >>> find_fieldlines(var1, var2, [x1, x2], [y1, y2]) """ varx = self.LoadToolsManager.check_var( var1, kwargs.get("transpose", False), ) vary = self.LoadToolsManager.check_var( var2, kwargs.get("transpose", False), ) xc = x1 if x1 is not None else self.x1 yc = x2 if x2 is not None else self.x2 if x0 is None or y0 is None: raise ValueError( "Footpoints not provided. Please provide footpoints!", ) x0 = list(np.atleast_1d(x0)) y0 = list(np.atleast_1d(y0)) xbeg = xc[0] - 0.51 * (xc[1] - xc[0]) xend = xc[-1] + 0.51 * (xc[-1] - xc[-2]) ybeg = yc[0] - 0.51 * (yc[1] - yc[0]) yend = yc[-1] + 0.51 * (yc[-1] - yc[-2]) rtol = kwargs.get("rtol", 1.0e-3) atol = kwargs.get("atol", 1.0e-6) ctol = kwargs.get("ctol", 1.0e-6) order = kwargs.get("order", "RK45") dense = kwargs.get("dense", False) step = np.abs( kwargs.get( "step", min((xend - xbeg) / self.nx1, (yend - ybeg) / self.nx2), ), ) maxstep = kwargs.get("maxstep", 100 * step) numstep = int(kwargs.get("numsteps", 16384)) tfin = maxstep * numstep def system(t: float, y: np.ndarray) -> list[np.ndarray]: """Evaluate the ODE rhs by interpolating the vector field.""" return self._vector_field(t, y, varx, vary, xc, yc) def outside_domain(t: float, y: np.ndarray) -> float: """Return 0 (terminal) when the integrator leaves the domain.""" if y[0] < xbeg or y[0] > xend or y[1] < ybeg or y[1] > yend: return 0 return 1 def close_to_start(t: float, y: np.ndarray) -> float: """Return 0 when the integrator returns near the seed point.""" dist_0 = np.linalg.norm(y - np.asarray(self.init_pos)) if dist_0 < ctol and t > maxstep: self.loop_dom = True return 0 self.oldpos = list(y) return 1 def max_num_steps(t: float, y: np.ndarray) -> float: """Return 0 when the allowed step count is exceeded.""" self.stepnum += 1 if self.stepnum > numstep: return 0 return 1 close_to_start.terminal = kwargs.get("closed", True) is True # type: ignore close_to_start.direction = 0 # type: ignore outside_domain.terminal = True # type: ignore outside_domain.direction = 0 # type: ignore max_num_steps.terminal = True # type: ignore max_num_steps.direction = 0 # type: ignore lines_list = [] linekwargs = {} if order == "LSODA": linekwargs["minstep"] = kwargs.get("minstep", 0.05 * step) for ind, xp in enumerate(x0): self.loop_dom = False yp = y0[ind] self.init_pos = [xp, yp] self.oldpos = [xp, yp] self.stepnum = 0 t_span = (0, tfin) sol_forward = solve_ivp( system, t_span, [xp, yp], method=order, events=[outside_domain, max_num_steps, close_to_start], rtol=rtol, atol=atol, max_step=maxstep, first_step=step, dense_output=dense, **linekwargs, ) numstep = 0 if self.loop_dom is True else numstep forw_steps = self.stepnum self.init_pos = [ sol_forward.y.T[:, 0][-1], sol_forward.y.T[:, 1][-1], ] self.stepnum = 0 t_span = (0, -tfin) sol_backward = solve_ivp( system, t_span, [xp, yp], method=order, events=[outside_domain, max_num_steps, close_to_start], rtol=rtol, atol=atol, max_step=maxstep, first_step=step, dense_output=dense, **linekwargs, ) x_line = np.vstack((sol_backward.y.T[::-1], sol_forward.y.T))[:, 0] y_line = np.vstack((sol_backward.y.T[::-1], sol_forward.y.T))[:, 1] if self.loop_dom is True: x_line = np.append(x_line, x_line[0]) y_line = np.append(y_line, y_line[0]) if text is True: logger.debug("Line with footpoint at x = %s and y = %s", xp, yp) logger.debug( "Final integration time forward: %s", sol_forward.t[-1], ) logger.debug( "Final integration time backward: %s", sol_backward.t[-1], ) logger.debug("Final step number forward: %s", forw_steps) logger.debug( "Final step number backward: %s", self.stepnum, ) if len(x_line) > 1: lines_list.append([x_line, y_line]) for method_name in ["init_pos", "stepnum", "out_dom", "oldpos"]: if method_name in self.__class__.__dict__: delattr(self.__class__, method_name) return lines_list
[docs] @track_kwargs def find_contour( self, var: str | np.ndarray, _check: bool = True, **kwargs: Unpack[FindContourKwargs], ) -> list: """Generate contour lines for a given variable. Parameters ---------- - levels: int | np.ndarray, default 10 The levels of number of levels or the list of levels for the contours. If an integer is provided, the levels are generated using a linear or logarithmic scale. If an array is provided, the levels are taken from the array. - levelscale: str, default 'linear' The scale of the levels. Available options are 'linear' and 'logarithmic'. - line_cmap: str, default 'k' The colormap to use to associate each level with a color. The colormap can also be a color, which is used for all the levels. If not provided, all the lines are associated with the color black. - transpose: True/False, default False Transposes the variable matrix. Use is not recommended if not really necessary (e.g. in case of highly customized variables and plots). - var (not optional): str | np.ndarray The variable to plot. If a string is provided, the variable is taken from the dataset. - vmax: float The maximum value of the variable to be computed / plotted. - vmin: float The minimum value of the variable to be computed / plotted. - x1: np.ndarray, default self.x1 The x1 coordinates. If the geometry is non-Cartesian, the x1 cartesian coordinates are taken from the dataset. - x2: np.ndarray, default self.x2 The x2 coordinates. If the geometry is non-Cartesian, the x2 cartesian coordinates are taken from the dataset. Returns ------- - lines_list: list List of contour lines. The strcuture of the list is [[x1, y1], [x2, y2], ...] where x1, y1, x2, y2 are numpy arrays representing the coordinates of the field lines. Examples -------- - Example #1: Generate contour lines for a given variable. >>> lines_list = find_contour(var) - Example #2: Generate contour lines for a given variable and coordinates. >>> lines_list = find_contour(var, x1=x1, x2=x2) - Example #3: Generate contour lines for a given variable and coordinates with a logarithmic scale. >>> lines_list = find_contour(var, x1=x1, x2=x2, >>> ... levelscale='logarithmic') - Example #4: Generate contour lines for a given variable and coordinates with a logarithmic scale and a colormap. >>> lines_list = find_contour(var, x1=x1, x2=x2, >>> ... levelscale='logarithmic', line_cmap='jet') """ var = self.LoadToolsManager.check_var( var, kwargs.get("transpose", False), ).T if self.geom == "SPHERICAL": x1 = self.x1p x2 = self.x2p elif self.geom == "POLAR" and self.nx2 == 1: x1 = self.x1 x2 = self.x3 elif self.geom == "POLAR": x1 = self.x1c x2 = self.x2c else: x1 = self.x1 x2 = self.x2 x1 = kwargs.get("x1", x1) x2 = kwargs.get("x2", x2) vmin = kwargs.get("vmin", np.nanmin(var)) vmax = kwargs.get("vmax", np.nanmax(var)) levels = kwargs.get("levels", 10) levelscale = kwargs.get("levelscale", "linear") if isinstance(levels, int): levels = ( np.linspace(vmin, vmax, levels) if levelscale == "linear" else np.logspace(np.log10(vmin), np.log10(vmax), levels) ) if isinstance(levels, float): levels = [levels] if "line_cmap" in kwargs: cmap_val = kwargs.get("line_cmap") if cmap_val is None: cmap = mcol.ListedColormap(["k"]) else: try: cmap = plt.get_cmap(cmap_val) except (ValueError, TypeError): cmap = mcol.ListedColormap(cmap_val) else: cmap = mcol.ListedColormap(["k"]) lines_list = [] cont_gen = cp.contour_generator(x1, x2, var, name="serial") for indx, level in enumerate(levels): contour = cont_gen.lines(level) for line in contour: line_arr = np.asarray(line) x_c = line_arr[:, 0] y_c = line_arr[:, 1] col = ( cmap(indx / (len(levels) - 1)) if "line_cmap" in kwargs else "k" ) if len(line_arr) > 1: lines_list.append([x_c, y_c, col]) return lines_list