Source code for pyPLUTO.toolfuncs.findlines

from typing import Any

import contourpy as cp
import matplotlib.colors as mcol
import matplotlib.pyplot as plt
import numpy as np
from numpy.typing import NDArray
from scipy.integrate import solve_ivp

from ..h_pypluto import check_par, makelist


def _check_var(self, var: str | NDArray, transpose: bool = False) -> np.ndarray:
    """Function that checks returns a variable. If the variable is a
    numpy array, it is simply returned (and the transpose is taken into
    account). If the variable is a string, the variable is retrieved
    from the dataset. If the variable is not found, an error is raised.

    Returns
    -------
    - var: np.ndarray
        The variable.

    Parameters
    ----------
    - var (not optional): str | np.ndarray
        The variable to be checked.
    - transpose: bool, default False
        If True, the variable is transposed.

    ----

    Examples
    --------
    - Example #1: var is a numpy array

        >>> var = np.array([1, 2, 3])
        >>> D._check_var(var, False)
        array([1, 2, 3])

    - Example #2: var is a string

        >>> D = pp.Load()
        >>> D._check_var("Bx1", False)
        D.Bx1

    """
    # If var is a string the code tries to recover it from the class attributes,
    # if failed it raises an error
    if isinstance(var, str):
        try:
            var = getattr(self, var)
        except ValueError:
            raise ValueError(f"Variable {var} not found in the dataset.")

    # If the transpose keyword is True, the variable is transposed.
    if transpose is True:
        var = var.T

    # Return the variable
    return var


def _vector_field(
    t: float,
    y: np.ndarray,
    var1: np.ndarray,
    var2: np.ndarray,
    xc: np.ndarray,
    yc: np.ndarray,
) -> list[np.ndarray]:
    """Compute the vector field at the given time and coordinates by
    interpolating the variables var1 and var2 at the given coordinates.
    The interpolation is made through the routine np.interpolate.

    Returns
    -------
    - [qx, qy]: list[np.ndarray]
        The x1 and x2 vector field components, within a list

    Parameters
    ----------
    - t (not optional): float
        The time variable (not used here).
    - var1 (not optional): np.ndarray
        The first variable to be interpolated.
    - var2 (not optional): np.ndarray
        The second variable to be interpolated.
    - xc (not optional): np.ndarray
        The x coordinates of the grid.
    - y (not optional): np.ndarray
        The coordinates. The first and second dimension are x and y.
    - yc (not optional): np.ndarray
        The y coordinates of the grid.

    ----

    Examples
    --------
    - Example #1: Compute the vector field at the given time and coordinates

        >>> vector_field(t, y, var1, var2, xc, yc)

    """
    # Get the coordinates
    x, y = y

    # Compute indices for closest grid points in xc and yc
    i0 = np.abs(x - xc).argmin()
    j0 = np.abs(y - yc).argmin()

    # Interpolate U and V at the given coordinates
    scrhUx = np.interp(x, xc, var1[:, j0])
    scrhUy = np.interp(y, yc, var1[i0])
    scrhVx = np.interp(x, xc, var2[:, j0])
    scrhVy = np.interp(y, yc, var2[i0])

    # Compute the resulting vector field components
    qx = scrhUx + scrhUy - var1[i0, j0]
    qy = scrhVx + scrhVy - var2[i0, j0]

    # Return the vector field
    return [qx, qy]


[docs] def find_fieldlines( self, var1: str | np.ndarray, var2: str | np.ndarray, x0: list | float | None = None, y0: list | float | None = None, text: bool = False, check: bool = True, **kwargs: Any, ) -> list: """Find field lines using the vector field. The field lines are computed by interpolating the variables var1 and var2 at the footpoints x0 and y0. Different integration algorithms are available, based on the method solve_ivp of the scipy package. Returns ------- - linelist: list A list of lists containing the coordinates of the field lines. The strcuture of the list is [[x1, y1], [x2, y2], ...] where x1, y1, x2, y2 are numpy arrays representing the coordinates of the field lines. Parameters ---------- - atol: float, default 1e-6 The absolute tolerance for the integration. - close: bool, default True If True, it checks if the line is closed on itself. - ctol: float, default 1e-6 The absolute tolerance for line closing on itself. - dense: bool, default False If True, the grid is dense (dense=True) or sparse (dense=False). - maxstep: float, default 100*step The maximum step size for the integration. - minstep: float, default 0.05*step The minimum step size for the integration (only used if order is LSODA). - numsteps: int, default 16384 The maximum number of steps for the integration. - order: str, default 'RK45' The integration method. Available options are: 'RK45', 'RK23', 'DOP853', 'Radau', 'BDF', 'LSODA'. - rtol: float, default 1e-6 The relative tolerance for the integration. - step: float, default abs(min((xend-xbeg)/self.nx1, (yend-ybeg)/self.nx2)) The initial step size for the integration. - text: bool, default False If True some additional information is printed. - transpose: bool, default False If True, the variables are transposed. - var1 (not optional): str | NDArray The first variable to be interpolated. - var2 (not optional): str | NDArray The second variable to be interpolated. - x0: list The x coordinates of the footpoints. - x1: NDArray | list | None, default self.x1 The x coordinates of the grid. - x2: NDArray | list | None, default self.x2 The y coordinates of the grid. - y0: list The y coordinates of the footpoints. ---- Examples -------- - Example #1: Find field lines using the vector field >>> find_fieldlines(var1, var2, x0, y0) - Example #2: Find field lines using two strings 'Bx1' and 'Bx2' >>> find_fieldlines("Bx1", "Bx2", x0, y0) - Example #3: Find field lines using two variables and two footpoints >>> find_fieldlines(var1, var2, [x1, x2], [y1, y2]) """ # Check parameters param = { "atol", "close", "ctol", "dense", "maxstep", "minstep", "numsteps", "order", "rtol", "step", "transpose", "x1", "x2", } if check is True: check_par(param, "find_fieldlines", **kwargs) # Get the variable, if it is a string, get the variable from the dataset. # The .T is used to transpose the variable to the correct shape. varx = self._check_var(var1, kwargs.get("transpose", False)) vary = self._check_var(var2, kwargs.get("transpose", False)) # Get the grid information xc = kwargs.get("x1", self.x1) yc = kwargs.get("x2", self.x2) # Check if the grid is uniform # if not np.all(np.diff(xc) == np.diff(xc)[0]): # err = "The grid is not uniform. Only uniform grids are supported." # raise ValueError(err) # Get the footpoints if x0 is None or y0 is None: raise ValueError("Footpoints not provided. Please provide footpoints!") # Make sure x0 and y0 are lists x0 = makelist(x0) y0 = makelist(y0) # Get the domain size (Take the initial and final coordinates # slightly larger to allow a seed to be specified on the boundary). xbeg = xc[0] - 0.51 * (xc[1] - xc[0]) xend = xc[-1] + 0.51 * (xc[-1] - xc[-2]) ybeg = yc[0] - 0.51 * (yc[1] - yc[0]) yend = yc[-1] + 0.51 * (yc[-1] - yc[-2]) # Set the keywords rtol = kwargs.get("rtol", 1.0e-3) atol = kwargs.get("atol", 1.0e-6) ctol = kwargs.get("ctol", 1.0e-6) order = kwargs.get("order", "RK45") dense = kwargs.get("dense", False) # Set the initial step size and maximum number of steps step = np.abs( kwargs.get( "step", min((xend - xbeg) / self.nx1, (yend - ybeg) / self.nx2) ) ) maxstep = kwargs.get("maxstep", 100 * step) numstep = int(kwargs.get("numsteps", 16384)) tfin = maxstep * numstep # Define the system of differential equations def system(t, y): """System of differential equations for the field lines.""" return _vector_field(t, y, varx, vary, xc, yc) # Event to detect if the field line exits the domain def outside_domain(t, y): """Event to detect if the field line exits the domain.""" if y[0] < xbeg or y[0] > xend or y[1] < ybeg or y[1] > yend: return 0 # Trigger event (exiting the domain) return 1 # Do not trigger event (still within the domain) # Event to detect if the field line closes on itself def close_to_start(t, y): """Event to detect if the field line closes on itself.""" dist_0 = np.linalg.norm(y - np.asarray(self.init_pos)) if dist_0 < ctol and t > maxstep: self.loop_dom = True return 0 # Trigger event (closing on itself) self.oldpos = y return 1 # Do not trigger event (open line) # Event to detect if the maximum number of steps is reached def max_num_steps(t, y): """Event to detect if the maximum number of steps is reached.""" self.stepnum += 1 if self.stepnum > numstep: return 0 # Trigger event (maximum number of steps reached) return 1 # Do not trigger event (still below maximum step number) # Set the events to be triggered close_to_start.terminal = ( True if kwargs.get("close", True) is True else False ) close_to_start.direction = 0 outside_domain.terminal = True outside_domain.direction = 0 max_num_steps.terminal = True max_num_steps.direction = 0 # Initilaize the list of lines and set the keywords lines_list = [] linekwargs = {} if order == "LSODA": linekwargs["minstep"] = kwargs.get("minstep", 0.05 * step) # Iterate on the footpoints for ind, xp in enumerate(x0): # Set the initial conditions self.loop_dom = False yp = y0[ind] self.init_pos = [xp, yp] self.oldpos = [xp, yp] self.stepnum = 0 t_span = (0, tfin) # Integrate forward sol_forward = solve_ivp( system, t_span, [xp, yp], method=order, events=[outside_domain, max_num_steps, close_to_start], rtol=rtol, atol=atol, max_step=maxstep, first_step=step, dense_output=dense, **linekwargs, ) # If the line did not close on itself, integrate backward numstep = 0 if self.loop_dom is True else numstep # Set the new conditions forw_steps = self.stepnum self.init_pos = [sol_forward.y.T[:, 0][-1], sol_forward.y.T[:, 1][-1]] self.stepnum = 0 t_span = (0, -tfin) # Integrate backward sol_backward = solve_ivp( system, t_span, [xp, yp], method=order, events=[outside_domain, max_num_steps, close_to_start], rtol=rtol, atol=atol, max_step=maxstep, first_step=step, dense_output=dense, **linekwargs, ) # Concatenate the solutions (backward and forward) x_line = np.vstack((sol_backward.y.T[::-1], sol_forward.y.T))[:, 0] y_line = np.vstack((sol_backward.y.T[::-1], sol_forward.y.T))[:, 1] # If the line is closed on itself, close the line if self.loop_dom is True: x_line = np.append(x_line, x_line[0]) y_line = np.append(y_line, y_line[0]) # Print the time of the integration if text is True: print("Line with footpoint at x = ", xp, " and y = ", yp) print("Final integration time forward: ", sol_forward.t[-1]) print("Final integration time backward: ", sol_backward.t[-1]) print("Final step number forward: ", forw_steps) print("Final step number backward: ", self.stepnum) # Add the line to the list (if it has more than one point) lines_list.append([x_line, y_line]) if len(x_line) > 1 else None # Delete the methods that are not needed for method_name in ["init_pos", "stepnum", "out_dom", "oldpos"]: if method_name in self.__class__.__dict__: delattr(self.__class__, method_name) # Return the list of lines return lines_list
[docs] def find_contour( self, var: str | np.ndarray, check: bool = True, **kwargs: Any ) -> list: """Generate contour lines for a given variable. Returns ------- - lines_list: list List of contour lines. The strcuture of the list is [[x1, y1], [x2, y2], ...] where x1, y1, x2, y2 are numpy arrays representing the coordinates of the field lines. Parameters ---------- - cmap: str, default 'k' The colormap to use to associate each level with a color. The colormap can also be a color, which is used for all the levels. If not provided, all the lines are associated with the color black. - levels: int | np.ndarray, default 10 The levels of number of levels or the list of levels for the contours. If an integer is provided, the levels are generated using a linear or logarithmic scale. If an array is provided, the levels are taken from the array. - levelscale: str, default 'linear' The scale of the levels. Available options are 'linear' and 'logarithmic'. - transpose: bool, default False If True, the variable is transposed. - var (not optional): str | np.ndarray The variable to plot. If a string is provided, the variable is taken from the dataset. - vmax: float, default np.max(var) The maximum value of the variable. - vmin: float, default np.min(var) The minimum value of the variable. - x1: np.ndarray, default self.x1 The x1 coordinates. If the geometry is non-Cartesian, the x1 cartesian coordinates are taken from the dataset. - x2: np.ndarray, default self.x2 The x2 coordinates. If the geometry is non-Cartesian, the x2 cartesian coordinates are taken from the dataset. ---- Examples -------- - Example #1: Generate contour lines for a given variable. >>> lines_list = find_contour(var) - Example #2: Generate contour lines for a given variable and coordinates. >>> lines_list = find_contour(var, x1=x1, x2=x2) - Example #3: Generate contour lines for a given variable and coordinates with a logarithmic scale. >>> lines_list = find_contour(var, x1=x1, x2=x2, >>> ... levelscale='logarithmic') - Example #4: Generate contour lines for a given variable and coordinates with a logarithmic scale and a colormap. >>> lines_list = find_contour(var, x1=x1, x2=x2, >>> ... levelscale='logarithmic', cmap='jet') """ # Check parameters param = { "cmap", "levels", "levelscale", "transpose", "vmax", "vmin", "x1", "x2", } if check is True: check_par(param, "find_contour", **kwargs) # Get the variable, if it is a string, get the variable from the dataset. # The .T is used to transpose the variable to the correct shape. var = self._check_var(var, kwargs.get("transpose", False)).T # Get the grid information and provide a default value for the coordinates # if they are not provided depending on the geometry. if self.geom == "SPHERICAL": x1 = self.x1p x2 = self.x2p elif self.geom == "POLAR" and self.nx2 == 1: x1 = self.x1 x2 = self.x3 elif self.geom == "POLAR": x1 = self.x1c x2 = self.x2c else: x1 = self.x1 x2 = self.x2 # Get the coordinates from the keyword arguments (if provided) x1 = kwargs.get("x1", x1) x2 = kwargs.get("x2", x2) # Get the variable information (minimum and maximum values) vmin = kwargs.get("vmin", np.nanmin(var)) vmax = kwargs.get("vmax", np.nanmax(var)) # Compute the levels of the contours, in linear or logarithmic scale levels = kwargs.get("levels", 10) levelscale = kwargs.get("levelscale", "linear") # If levels is an integer, the levels are computed in lin or log scale if isinstance(levels, int): levels = ( np.linspace(vmin, vmax, levels) if levelscale == "linear" else np.logspace(np.log10(vmin), np.log10(vmax), levels) ) # If levels is a float, convert it to a list if isinstance(levels, float): levels = [levels] # Set colormap (try to get it from the colormap list, if not then use # the color provided), if not provided use black. if "cmap" in kwargs: cmap_val = kwargs.get("cmap") try: cmap = plt.get_cmap(cmap_val) except (ValueError, TypeError): cmap = mcol.ListedColormap(cmap_val) else: cmap = mcol.ListedColormap(["k"]) # Initialize the list of lines lines_list = [] # Get the contour generator and the lines for every level cont_gen = cp.contour_generator(x1, x2, var, name="serial") for indx, level in enumerate(levels): contour = cont_gen.lines(level) for line in contour: x_c = line[:, 0] y_c = line[:, 1] col = cmap(indx / (len(levels) - 1)) if "cmap" in kwargs else "k" lines_list.append([x_c, y_c, col]) if len(line) > 1 else None # Return the list of lines return lines_list