Source code for pybamm.solvers.processed_variable

#
# Processed Variable class
#
import casadi
import numpy as np
import pybamm
from scipy.integrate import cumulative_trapezoid
import xarray as xr


[docs] class ProcessedVariable: """ An object that can be evaluated at arbitrary (scalars or vectors) t and x, and returns the (interpolated) value of the base variable at that t and x. Parameters ---------- base_variables : list of :class:`pybamm.Symbol` A list of base variables with a method `evaluate(t,y)`, each entry of which returns the value of that variable for that particular sub-solution. A Solution can be comprised of sub-solutions which are the solutions of different models. Note that this can be any kind of node in the expression tree, not just a :class:`pybamm.Variable`. When evaluated, returns an array of size (m,n) base_variable_casadis : list of :class:`casadi.Function` A list of casadi functions. When evaluated, returns the same thing as `base_Variable.evaluate` (but more efficiently). solution : :class:`pybamm.Solution` The solution object to be used to create the processed variables warn : bool, optional Whether to raise warnings when trying to evaluate time and length scales. Default is True. """ def __init__( self, base_variables, base_variables_casadi, solution, warn=True, cumtrapz_ic=None, ): self.base_variables = base_variables self.base_variables_casadi = base_variables_casadi self.all_ts = solution.all_ts self.all_ys = solution.all_ys self.all_inputs = solution.all_inputs self.all_inputs_casadi = solution.all_inputs_casadi self.mesh = base_variables[0].mesh self.domain = base_variables[0].domain self.domains = base_variables[0].domains self.warn = warn self.cumtrapz_ic = cumtrapz_ic # Process spatial variables geometry = solution.all_models[0].geometry self.spatial_variables = {} for domain_level, domain_names in self.domains.items(): variables = [] for domain in domain_names: variables += list(geometry[domain].keys()) self.spatial_variables[domain_level] = variables # Sensitivity starts off uninitialized, only set when called self._sensitivities = None self.solution_sensitivities = solution.sensitivities # Store time self.t_pts = solution.t # Evaluate base variable at initial time self.base_eval_shape = self.base_variables[0].shape self.base_eval_size = self.base_variables[0].size # handle 2D (in space) finite element variables differently if ( self.mesh and "current collector" in self.domain and isinstance(self.mesh, pybamm.ScikitSubMesh2D) ): self.initialise_2D_scikit_fem() # check variable shape else: if len(self.base_eval_shape) == 0 or self.base_eval_shape[0] == 1: self.initialise_0D() else: n = self.mesh.npts base_shape = self.base_eval_shape[0] # Try some shapes that could make the variable a 1D variable if base_shape in [n, n + 1]: self.initialise_1D() else: # Try some shapes that could make the variable a 2D variable first_dim_nodes = self.mesh.nodes first_dim_edges = self.mesh.edges second_dim_pts = self.base_variables[0].secondary_mesh.nodes if self.base_eval_size // len(second_dim_pts) in [ len(first_dim_nodes), len(first_dim_edges), ]: self.initialise_2D() else: # Raise error for 3D variable raise NotImplementedError( f"Shape not recognized for {base_variables[0]}" + "(note processing of 3D variables is not yet implemented)" ) # xr_data_array is initialized when needed self._xr_data_array = None def initialise_0D(self): # initialise empty array of the correct size entries = np.empty(len(self.t_pts)) idx = 0 # Evaluate the base_variable index-by-index for ts, ys, inputs, base_var_casadi in zip( self.all_ts, self.all_ys, self.all_inputs_casadi, self.base_variables_casadi ): for inner_idx, t in enumerate(ts): t = ts[inner_idx] y = ys[:, inner_idx] entries[idx] = float(base_var_casadi(t, y, inputs)) idx += 1 if self.cumtrapz_ic is not None: entries = cumulative_trapezoid( entries, self.t_pts, initial=float(self.cumtrapz_ic) ) # save attributes for interpolation self.entries_for_interp = entries self.coords_for_interp = {"t": self.t_pts} self.entries = entries self.dimensions = 0 def initialise_1D(self, fixed_t=False): len_space = self.base_eval_shape[0] entries = np.empty((len_space, len(self.t_pts))) # Evaluate the base_variable index-by-index idx = 0 for ts, ys, inputs, base_var_casadi in zip( self.all_ts, self.all_ys, self.all_inputs_casadi, self.base_variables_casadi ): for inner_idx, t in enumerate(ts): t = ts[inner_idx] y = ys[:, inner_idx] entries[:, idx] = base_var_casadi(t, y, inputs).full()[:, 0] idx += 1 # Get node and edge values nodes = self.mesh.nodes edges = self.mesh.edges if entries.shape[0] == len(nodes): space = nodes elif entries.shape[0] == len(edges): space = edges # add points outside domain for extrapolation to boundaries extrap_space_left = np.array([2 * space[0] - space[1]]) extrap_space_right = np.array([2 * space[-1] - space[-2]]) space = np.concatenate([extrap_space_left, space, extrap_space_right]) extrap_entries_left = 2 * entries[0] - entries[1] extrap_entries_right = 2 * entries[-1] - entries[-2] entries_for_interp = np.vstack( [extrap_entries_left, entries, extrap_entries_right] ) # assign attributes for reference (either x_sol or r_sol) self.entries = entries self.dimensions = 1 self.spatial_variable_names = { k: self._process_spatial_variable_names(v) for k, v in self.spatial_variables.items() } self.first_dimension = self.spatial_variable_names["primary"] # assign attributes for reference pts_for_interp = space self.internal_boundaries = self.mesh.internal_boundaries # Set first_dim_pts to edges for nicer plotting self.first_dim_pts = edges # save attributes for interpolation self.entries_for_interp = entries_for_interp self.coords_for_interp = {self.first_dimension: pts_for_interp, "t": self.t_pts}
[docs] def initialise_2D(self): """ Initialise a 2D object that depends on x and r, x and z, x and R, or R and r. """ first_dim_nodes = self.mesh.nodes first_dim_edges = self.mesh.edges second_dim_nodes = self.base_variables[0].secondary_mesh.nodes second_dim_edges = self.base_variables[0].secondary_mesh.edges if self.base_eval_size // len(second_dim_nodes) == len(first_dim_nodes): first_dim_pts = first_dim_nodes elif self.base_eval_size // len(second_dim_nodes) == len(first_dim_edges): first_dim_pts = first_dim_edges second_dim_pts = second_dim_nodes first_dim_size = len(first_dim_pts) second_dim_size = len(second_dim_pts) entries = np.empty((first_dim_size, second_dim_size, len(self.t_pts))) # Evaluate the base_variable index-by-index idx = 0 for ts, ys, inputs, base_var_casadi in zip( self.all_ts, self.all_ys, self.all_inputs_casadi, self.base_variables_casadi ): for inner_idx, t in enumerate(ts): t = ts[inner_idx] y = ys[:, inner_idx] entries[:, :, idx] = np.reshape( base_var_casadi(t, y, inputs).full(), [first_dim_size, second_dim_size], order="F", ) idx += 1 # add points outside first dimension domain for extrapolation to # boundaries extrap_space_first_dim_left = np.array( [2 * first_dim_pts[0] - first_dim_pts[1]] ) extrap_space_first_dim_right = np.array( [2 * first_dim_pts[-1] - first_dim_pts[-2]] ) first_dim_pts = np.concatenate( [extrap_space_first_dim_left, first_dim_pts, extrap_space_first_dim_right] ) extrap_entries_left = np.expand_dims(2 * entries[0] - entries[1], axis=0) extrap_entries_right = np.expand_dims(2 * entries[-1] - entries[-2], axis=0) entries_for_interp = np.concatenate( [extrap_entries_left, entries, extrap_entries_right], axis=0 ) # add points outside second dimension domain for extrapolation to # boundaries extrap_space_second_dim_left = np.array( [2 * second_dim_pts[0] - second_dim_pts[1]] ) extrap_space_second_dim_right = np.array( [2 * second_dim_pts[-1] - second_dim_pts[-2]] ) second_dim_pts = np.concatenate( [ extrap_space_second_dim_left, second_dim_pts, extrap_space_second_dim_right, ] ) extrap_entries_second_dim_left = np.expand_dims( 2 * entries_for_interp[:, 0, :] - entries_for_interp[:, 1, :], axis=1 ) extrap_entries_second_dim_right = np.expand_dims( 2 * entries_for_interp[:, -1, :] - entries_for_interp[:, -2, :], axis=1 ) entries_for_interp = np.concatenate( [ extrap_entries_second_dim_left, entries_for_interp, extrap_entries_second_dim_right, ], axis=1, ) self.spatial_variable_names = { k: self._process_spatial_variable_names(v) for k, v in self.spatial_variables.items() } self.first_dimension = self.spatial_variable_names["primary"] self.second_dimension = self.spatial_variable_names["secondary"] # assign attributes for reference self.entries = entries self.dimensions = 2 first_dim_pts_for_interp = first_dim_pts second_dim_pts_for_interp = second_dim_pts # Set pts to edges for nicer plotting self.first_dim_pts = first_dim_edges self.second_dim_pts = second_dim_edges # save attributes for interpolation self.entries_for_interp = entries_for_interp self.coords_for_interp = { self.first_dimension: first_dim_pts_for_interp, self.second_dimension: second_dim_pts_for_interp, "t": self.t_pts, }
def initialise_2D_scikit_fem(self): y_sol = self.mesh.edges["y"] len_y = len(y_sol) z_sol = self.mesh.edges["z"] len_z = len(z_sol) entries = np.empty((len_y, len_z, len(self.t_pts))) # Evaluate the base_variable index-by-index idx = 0 for ts, ys, inputs, base_var_casadi in zip( self.all_ts, self.all_ys, self.all_inputs_casadi, self.base_variables_casadi ): for inner_idx, t in enumerate(ts): t = ts[inner_idx] y = ys[:, inner_idx] entries[:, :, idx] = np.reshape( base_var_casadi(t, y, inputs).full(), [len_y, len_z], order="C", ) idx += 1 # assign attributes for reference self.entries = entries self.dimensions = 2 self.y_sol = y_sol self.z_sol = z_sol self.first_dimension = "y" self.second_dimension = "z" self.first_dim_pts = y_sol self.second_dim_pts = z_sol # save attributes for interpolation self.entries_for_interp = entries self.coords_for_interp = {"y": y_sol, "z": z_sol, "t": self.t_pts} def _process_spatial_variable_names(self, spatial_variable): if len(spatial_variable) == 0: return None # Extract names raw_names = [] for var in spatial_variable: # Ignore tabs in domain names if var == "tabs": continue if isinstance(var, str): raw_names.append(var) else: raw_names.append(var.name) # Rename battery variables to match PyBaMM convention if all([var.startswith("r") for var in raw_names]): return "r" elif all([var.startswith("x") for var in raw_names]): return "x" elif all([var.startswith("R") for var in raw_names]): return "R" elif len(raw_names) == 1: return raw_names[0] else: raise NotImplementedError( f"Spatial variable name not recognized for {spatial_variable}" ) def _initialize_xr_data_array(self): """ Initialize the xarray DataArray for interpolation. We don't do this by default as it has some overhead (~75 us) and sometimes we only need the entries of the processed variable, not the xarray object for interpolation. """ entries = self.entries_for_interp coords = self.coords_for_interp self._xr_data_array = xr.DataArray(entries, coords=coords) def __call__(self, t=None, x=None, r=None, y=None, z=None, R=None, warn=True): """ Evaluate the variable at arbitrary *dimensional* t (and x, r, y, z and/or R), using interpolation """ if self._xr_data_array is None: self._initialize_xr_data_array() kwargs = {"t": t, "x": x, "r": r, "y": y, "z": z, "R": R} # Remove any None arguments kwargs = {key: value for key, value in kwargs.items() if value is not None} # Use xarray interpolation, return numpy array return self._xr_data_array.interp(**kwargs).values @property def data(self): """Same as entries, but different name""" return self.entries @property def sensitivities(self): """ Returns a dictionary of sensitivities for each input parameter. The keys are the input parameters, and the value is a matrix of size (n_x * n_t, n_p), where n_x is the number of states, n_t is the number of time points, and n_p is the size of the input parameter """ # No sensitivities if there are no inputs if len(self.all_inputs[0]) == 0: return {} # Otherwise initialise and return sensitivities if self._sensitivities is None: if self.solution_sensitivities != {}: self.initialise_sensitivity_explicit_forward() else: raise ValueError( "Cannot compute sensitivities. The 'calculate_sensitivities' " "argument of the solver.solve should be changed from 'None' to " "allow sensitivities calculations. Check solver documentation for " "details." ) return self._sensitivities
[docs] def initialise_sensitivity_explicit_forward(self): "Set up the sensitivity dictionary" inputs_stacked = self.all_inputs_casadi[0] # Set up symbolic variables t_casadi = casadi.MX.sym("t") y_casadi = casadi.MX.sym("y", self.all_ys[0].shape[0]) p_casadi = { name: casadi.MX.sym(name, value.shape[0]) for name, value in self.all_inputs[0].items() } p_casadi_stacked = casadi.vertcat(*[p for p in p_casadi.values()]) # Convert variable to casadi format for differentiating var_casadi = self.base_variables[0].to_casadi( t_casadi, y_casadi, inputs=p_casadi ) dvar_dy = casadi.jacobian(var_casadi, y_casadi) dvar_dp = casadi.jacobian(var_casadi, p_casadi_stacked) # Convert to functions and evaluate index-by-index dvar_dy_func = casadi.Function( "dvar_dy", [t_casadi, y_casadi, p_casadi_stacked], [dvar_dy] ) dvar_dp_func = casadi.Function( "dvar_dp", [t_casadi, y_casadi, p_casadi_stacked], [dvar_dp] ) for index, (ts, ys) in enumerate(zip(self.all_ts, self.all_ys)): for idx, t in enumerate(ts): u = ys[:, idx] next_dvar_dy_eval = dvar_dy_func(t, u, inputs_stacked) next_dvar_dp_eval = dvar_dp_func(t, u, inputs_stacked) if index == 0 and idx == 0: dvar_dy_eval = next_dvar_dy_eval dvar_dp_eval = next_dvar_dp_eval else: dvar_dy_eval = casadi.diagcat(dvar_dy_eval, next_dvar_dy_eval) dvar_dp_eval = casadi.vertcat(dvar_dp_eval, next_dvar_dp_eval) # Compute sensitivity dy_dp = self.solution_sensitivities["all"] S_var = dvar_dy_eval @ dy_dp + dvar_dp_eval sensitivities = {"all": S_var} # Add the individual sensitivity start = 0 for name, inp in self.all_inputs[0].items(): end = start + inp.shape[0] sensitivities[name] = S_var[:, start:end] start = end # Save attribute self._sensitivities = sensitivities