#
# Copyright (c) 2023 CIDETEC Energy Storage.
#
# This file is part of cideMOD.
#
# cideMOD is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
"""
This module is the core of cideMOD. Contain the class for handling
battery cell simulations
"""
import os
import ufl
import dolfinx as dfx
import warnings
from dolfinx.common import Timer, timed
from petsc4py.PETSc import ScalarType
from typing import Union, List, overload
from cideMOD.helpers.logging import VerbosityLevel, _print
from cideMOD.helpers.miscellaneous import add_to_results_folder, format_time
from cideMOD.numerics.fem_handler import (BlockFunctionSpace, assign, isinstance_dolfinx,
assemble_scalar as assemble, block_derivative)
from cideMOD.numerics.time_scheme import TimeScheme
from cideMOD.numerics.time_stepper import ConstantTimeStepper, AdaptiveTimeStepper
from cideMOD.numerics.solver import NonlinearBlockProblem, NewtonBlockSolver
from cideMOD.numerics.triggers import Trigger, TriggerDetected, TriggerSurpassed, SolverCrashed
from cideMOD.cell.components import BatteryCell
from cideMOD.cell.parser import CellParser
from cideMOD.cell.dimensional_analysis import DimensionalAnalysis
from cideMOD.cell.warehouse import Warehouse
from cideMOD.cell.variables import ProblemVariables
from cideMOD.mesh.base_mesher import DolfinMesher, SubdomainMapper
from cideMOD.mesh.gmsh_adapter import GmshMesher
from cideMOD.models import BaseModelOptions
[docs]
class Problem:
"""
Class for handling battery cell simulations.
Parameters
----------
cell : CellParser
Parser of the cell dictionary.
model_options : ModelOptions
Model options already configured by the user.
"""
def __init__(self, cell: CellParser, model_options: BaseModelOptions):
self.model_options = model_options
self.cell_parser = cell
self.verbose = model_options.verbose
self.save_path = model_options.save_path
self._comm = model_options.comm
self._models = model_options._get_model_handler()
self.user_bcs = {}
self._ready = False
# Build the mesh
self._build_mesh()
# Initialize the cell state
self.set_cell_state()
# Initialize the warehouse
self._WH = Warehouse(self.save_path, self)
# Setup the CellParser object
self.cell_parser._set_problem(self)
# Add files to the results folder
self.save_path = self.model_options.save_path
if self.save_path is not None:
add_to_results_folder(
self.save_path, files=[cell.get_dict(), self.model_options.dict(exclude={'comm'})],
filenames=['params', 'simulation_options'])
[docs]
def set_cell_state(self, **kwargs):
"""
This method set the current state of the cell.
Parameters
----------
kwargs: dict
Dictionary containing the parameters that describe the cell
state. To know more type :meth:`cideMOD.info(
'set_cell_state', model_options=model_options)`
"""
self._models.set_cell_state(self, **kwargs)
[docs]
def set_boundary_condition(self, bc):
"""
This method set the given boundary condition.
Parameters
----------
bc: BaseBoundaryCondition
Object that contain the required information to set the new
boundary condition.
"""
# TODO: Implement BaseBoundaryCondition and Heater. Think about including the default
# boundary conditions to a new dictionary self._bcs.
raise NotImplementedError
self._models.parse_boundary_condition(bc)
self.user_bcs[bc._tag_] = bc
[docs]
def add_global_variables(self, name: Union[str, List[str]]):
"""
This method add the requested variable to the list of requested
global variables to be compute throughout the simulation.
name: Union[str, List[str]]
Name/s of the requested global variable/s. It should be
provided by the active models.
"""
# TODO: Provide the user a method like print_available_global_variables
if not isinstance(name, list):
name = [name]
for var in name:
self._WH.add_global_variable(var)
[docs]
def add_internal_variables(self, name: Union[str, List[str]]):
"""
This method add the requested variable to the list of requested
internal variables to be compute throughout the simulation.
name: Union[str, List[str]]
Name/s of the requested global variable/s. It should be
provided by the active models.
"""
# TODO: Provide the user a method like print_available_internal_variables
if not isinstance(name, list):
name = [name]
for var in name:
self._WH.add_internal_variable(var)
def _build_mesh(self):
"""Builds the mesh of the cell"""
# TODO: Mesh engine should be given by the model type, add it as a private attribute to
# ModelHandler and ask the models for the right mesher class. Ideally, there should
# be only one mesher class.
mesh_engine = DolfinMesher if self.model_options.model == "P2D" else GmshMesher
self.mesher = mesh_engine(options=self.model_options, cell=self.cell_parser)
self.mesher.build_mesh(dimless_model=self.model_options.dimensionless)
# ******************************************************************************************* #
# *** Setup Stage *** #
# ******************************************************************************************* #
[docs]
def setup(self):
"""
Set-up the Problem object:
- Build the FEM function spaces
- Build the cell properties
- Set-up internal state variables
- Set-up Warehouse object
- Build the variational formulation
"""
basic_info = VerbosityLevel.BASIC_PROBLEM_INFO
detailed_info = VerbosityLevel.DETAILED_PROGRESS_INFO
self._print('Building problem setup', verbosity=basic_info)
# Initialize the time scheme and the time
self.time = 0.
self._time = dfx.fem.Constant(self.mesher.mesh, ScalarType(0))
self._DT = TimeScheme(self.model_options.time_scheme, self.mesher.mesh)
# TODO: Initialize here also the time stepper if implemented
# Setup independent variables and control variables
self._vars = ProblemVariables(self)
# Build the dimensional analysis
self._print('- Building cell parameters ...', end='\r', verbosity=basic_info, only=True)
if self.model_options.dimensionless:
self._print(
'- Performing the dimensional analysis ...', end='\r', verbosity=detailed_info)
self._DA = DimensionalAnalysis(self.cell_parser, self.model_options) # , self._vars)
if self.model_options.dimensionless:
self._print('- Performing the dimensional analysis - Done', verbosity=detailed_info)
# Build FEM function spaces
self._print('- Building FEM function spaces ...', end='\r', verbosity=detailed_info)
self._build_function_spaces()
self._print('- Building FEM function spaces - Done', verbosity=detailed_info)
# Build control and state variables
self._vars.setup(self)
# Build cell parameters
if self.verbose >= detailed_info:
self._print('- Building cell parameters:', verbosity=basic_info)
else:
self._print('- Building cell parameters ...', end='\r', verbosity=basic_info)
self.cell = BatteryCell(self)
# Build dependent variables
self._models.set_dependent_variables(self._vars, self.cell, self._DT, self)
self._print('- Building cell parameters - Done', verbosity=basic_info)
# Initial guess
self._print('- Initializing state ... ', end='\r', verbosity=basic_info)
self._models.initial_guess(self._f_0, self._vars, self.cell, self)
assign(self.u_2, self.u_1)
assign(self.u_0, self.u_1)
self._print('- Initializing state - Done ', verbosity=basic_info)
# Build weak formulation
self._print('- Build variational formulation ... ', end='\r', verbosity=basic_info)
self._build_weak_formulation()
self._print('- Build variational formulation - Done ', verbosity=basic_info)
# Build solver
self._print('- Building solvers ... ', end='\r', verbosity=detailed_info)
self._build_solvers()
self._print('- Building solvers - Done', verbosity=detailed_info)
# Warehouse setup
self._WH.setup()
# Setup active models if required
self._models.setup(self)
self._ready = True
def _build_function_spaces(self):
"""Builds the function space"""
# Get model function spaces
P1 = dfx.fem.FunctionSpace(self.mesher.mesh, ('Lagrange', 1))
P1_vec = dfx.fem.VectorFunctionSpace(self.mesher.mesh, ('Lagrange', 1))
elements = self._models.set_state_variables(self.mesher, P1, P1_vec, self)
# Define mixed function space
self.W = BlockFunctionSpace(*zip(*elements))
self.du = self.W.create_trial_function()
self.u_2 = self.W.create_block_function()
self.u_1 = self.W.create_block_function(suffix='_prev')
self.u_0 = self.W.create_block_function(suffix='_2prev')
self.test = self.W.create_test_function()
self._f_1 = self.u_2
self._f_0 = self.u_1
# Additional fem functions
self.V = dfx.fem.FunctionSpace(self.mesher.mesh, ('Lagrange', 1))
self.V_vec = dfx.fem.VectorFunctionSpace(self.mesher.mesh, ('Lagrange', 1))
self.V_0 = dfx.fem.FunctionSpace(self.mesher.mesh, ('DG', 0))
self.V_0_vec = dfx.fem.VectorFunctionSpace(self.mesher.mesh, ('DG', 0))
self.P1_map = SubdomainMapper(self.mesher.field_restrictions, self.V)
self.P1_vec_map = SubdomainMapper(self.mesher.field_restrictions, self.V_vec)
self.P0_map = SubdomainMapper(self.mesher.field_restrictions, self.V_0)
self.P0_vec_map = SubdomainMapper(self.mesher.field_restrictions, self.V_0_vec)
def _build_weak_formulation(self):
self._solvers_info = self._models.get_solvers_info(self)
eq = self._models.build_weak_formulation(
self._solvers_info, self._vars, self.cell, self.mesher, self._DT, self.W, self)
eq_transitory = self._models.build_weak_formulation_transitory(
self._solvers_info, self._vars, self.cell, self.mesher, self.W, self)
# TODO: Implement the stationary solver
# eq_stationary = self._models.build_weak_formulation_stationary(
# self._solvers_info, self._vars, self.cell, self.mesher, self.W, self)
self._solvers_info['solver']['equations'] = eq
self._solvers_info['solver_transitory']['equations'] = eq_transitory
# self._solvers_info['solver_stationary']['equations'] = eq_stationary
def _build_solvers(self):
monitor = self.verbose >= VerbosityLevel.DETAILED_SOLVER_INFO
for solver_name, solver_info in self._solvers_info.items():
state_vars = solver_info['state_variables']
u = [self.u_2(state_var) for state_var in state_vars]
du = [self.du(state_var) for state_var in state_vars]
F_var = [solver_info['equations'][state_var] for state_var in state_vars]
J_var = block_derivative(F_var, u, du)
restrictions = [self.W.get_restriction(state_var) for state_var in state_vars]
bcs = solver_info['equations'].get_boundary_conditions()
if self.save_path is not None:
save_path = os.path.join(self.save_path, solver_name)
else:
save_path = None
block_problem = NonlinearBlockProblem(F_var, u, bcs, J_var, restrictions)
block_solver = NewtonBlockSolver(
self._comm, block_problem, monitor=monitor, save_path=save_path)
if solver_info['options']:
block_solver._set_options(solver_info['options'])
solver_info['block_problem'] = block_problem
solver_info['block_solver'] = block_solver
self._solver = self._solvers_info['solver']['block_solver']
self._solver_transitory = self._solvers_info['solver_transitory']['block_solver']
# self._solver_stationary = self._solvers_info['solver_stationary']['block_solver']
# ******************************************************************************************* #
# *** Simulation Stage *** #
# ******************************************************************************************* #
@overload
def solve(self, t_f=3600, store_delay=1, min_step=0.01, triggers: List[Trigger] = [],
adaptive: bool = False, **kwargs):
...
@overload
def solve(self, t_f=3600, store_delay=1, initial_step=None, max_step=3600, min_step=0.01,
triggers: List[Trigger] = [], adaptive: bool = True, time_adaptive_tol=1e-2,
**kwargs):
...
[docs]
def solve(self, t_f=3600, store_delay=1, initial_step=None, max_step=3600, min_step=0.01,
triggers: List[Trigger] = [], adaptive: bool = True, time_adaptive_tol=1e-2,
**kwargs):
"""
Perform a simulation step. For more complex inputs it is
recommended to use several calls to this method.
Parameters
----------
t_f : float, optional
The maximum duration of the simulation. Defaults to 3600.
store_delay : int, optional
The delay to apply between consecutive saves of the internal
variables, in number of timesteps. Defaults to 1.
initial_step : float, optional
Initial timestep length. If not given, the timestep chose is
the minimum. Default to None.
max_step : float, optional
Maximum timestep length for adaptive solver in seconds.
Default to 3600.
min_step : float, optional
Minimum timestep length for adaptive solver in seconds.
Default to 0.01.
triggers : list, optional
List of Triggers to check during runtime. Default to [].
adaptive : bool, optional
Whether to use adaptive timestepping or not. Default to
True.
time_adaptive_tol : Union[float,int]
Tolerance of the time-adaptive scheme. Defaults to 1e-2.
kwargs : dict
Control variables of the problem. Could be constant or a
time-dependent expression. To know the required control
variables type `problem.print_control_variables_info()`.
Returns
-------
Union[int, Exception]
The status of the simulation. If there is an error, the
Exception object is returned. Otherwise return 0.
"""
if not self._ready:
self.setup()
if initial_step is None:
initial_step = min_step
initial_step = max(initial_step, min_step)
self._WH.set_delay(store_delay)
if adaptive:
time_stepper = AdaptiveTimeStepper(self, dt=initial_step, min_step=min_step,
max_step=max_step, t_max=t_f, tol=time_adaptive_tol,
triggers=triggers, **kwargs)
else:
time_stepper = ConstantTimeStepper(self, dt=initial_step, triggers=triggers, **kwargs)
# Initialize the cell state attribute
self.state = {'time': self.time}
self._print('Solving ...', only=True)
timer = Timer('Simulation time')
timer.start()
self._solver._set_options([('snes_lag_jacobian', 1), ('snes_max_it', 50)])
_pad = 0
while self.time < t_f:
# if it==1:
# self._solver._set_options([('snes_lag_jacobian', 5), ('snes_max_it', 30)])
errorcode = time_stepper.timestep()
errorcode_ex = self._explicit_processing()
if self.verbose >= VerbosityLevel.BASIC_PROGRESS_INFO:
log_msg = f"Time: {format_time(self.state['time'])} "
if _pad < len(log_msg):
_pad = len(log_msg)
log_msg = (log_msg.ljust(_pad)
+ ' '.join([f"{k.capitalize()}: {v:.4g}"
for k, v in self.state.items() if k != 'time']))
_print(log_msg, end='\r', comm=self._comm)
if errorcode != 0 or errorcode_ex != 0:
if (errorcode_ex == 0
and isinstance(errorcode, (TriggerDetected, TriggerSurpassed))):
self._advance_problem()
break
else:
self._advance_problem()
if self.time >= t_f:
self._print(f"Reached max time {self.time:.2f}", end="\n\n")
timer.stop()
return self.exit(errorcode if errorcode != 0 else errorcode_ex)
@timed('Explicit Processing')
def _explicit_processing(self):
# TODO: Allow explicit models to return an status variable to stop the solve.
self._models.explicit_update(self)
return 0
def _advance_problem(self):
self.time += self.get_timestep()
self._time.value = self.time
self._WH.store(self.time)
assign(self.u_0, self.u_1)
assign(self.u_1, self.u_2)
[docs]
def set_timestep(self, timestep):
timestep_ = self._DA.scale_variable('time', timestep)
self._DT.set_timestep(timestep_)
[docs]
def get_timestep(self):
timestep_ = self._DT.get_timestep()
return self._DA.unscale_variable('time', timestep_)
[docs]
def exit(self, errorcode):
if self.model_options.save_on_exit:
self._WH.write_globals(self.model_options.clean_on_exit,
individual_txt=self.model_options.globals_txts)
if self.model_options.raise_errors_on_exit and isinstance(errorcode, SolverCrashed):
raise errorcode
return errorcode
# ******************************************************************************************* #
# *** Problem Utils *** #
# ******************************************************************************************* #
[docs]
def update_dynamic_parameters(self, dynamic_parameters):
"""
This method updates the values of the dynamic parameters of the
cell and the components it has.
Parameters
----------
dynamic_parameters: dict
Dictionary containing the dynamic parameter names and values
to be updated.
Notes
-----
To update a dynamic parameter of the cell:
>>> problem.update_dynamic_parameters({'area': 0.1})
or
>>> problem.update_dynamic_parameters({'cell.area': 0.1})
To update a dynamic parameter of a cell component:
>>> problem.update_dynamic_parameters({'anode.thickness': 1e-4})
"""
if not dynamic_parameters:
return
_dynamic_parameters = dict()
for name, value in dynamic_parameters.items():
parameter = self.cell_parser.get_parameter(name)
if self._ready and isinstance_dolfinx(value):
raise RuntimeError(parameter._get_error_msg(
reason=("Unable to update the dynamic parameter as a "
+ "new dolfinx object after setting up the problem"),
action='handling'
))
_dynamic_parameters[str(parameter)] = value
self.cell_parser.update_parameters(_dynamic_parameters)
[docs]
def reset(self, new_parameters: dict = None, triggers: List[Trigger] = None,
save_path: str = None, save_config=True, prefix='results_'):
"""
This method resets the problem in order to be ready for running
another simulation with the same initial conditions, and maybe
using different parameters.
Parameters
----------
new_parameters: Dict[str, float], optional
Dictionary containing the cell parameters to be updated.
triggers: List[Triggers]
List containing the triggers that should be reset.
save_path: str, optional
Path to the new results folder.
save_config
Whether to save the parameter and simulation options to the
results folder. Defaults to True.
Notes
-----
This method is used to avoid the generation of multiple Problem
objects when running multiple simulations, for example when
performing optimizations.
"""
timer = Timer('Problem Reset')
basic_info = VerbosityLevel.BASIC_PROBLEM_INFO
self._print('Reseting the problem ...', end='\r', verbosity=basic_info)
# Reset triggers
if triggers is not None:
for t in triggers:
t.reset()
# Update the dynamic parameters
last_dims = self.mesher.get_dims()
self.update_dynamic_parameters(new_parameters)
# Check if a deep reset is needed (repeat the setup stage)
deep_reset = False
if new_parameters and self._ready:
# TODO: Just check if the mesh is dimensional
if isinstance(self.mesher, GmshMesher) and self.model_options.dimensionless:
deep_reset = not all([old == new
for old, new in zip(last_dims, self.mesher.get_dims())])
else:
self.mesher.scale = self.mesher.get_dims()[0]
# New results folder
if save_path:
self.save_path = self.update_save_path(
save_path, save_config=save_config, prefix=prefix)
# Build the new mesh if required
# TODO: Check if the mesh is dimensional
if deep_reset and self.model_options.dimensionless:
# timer.stop()
self._build_mesh()
# timer.resume()
# Reset internal classes and attributes
if self._ready:
self._WH.reset(self.save_path, deep_reset=deep_reset)
self._models.reset(self, deep_reset=deep_reset)
# Reset internal classes and attributes
if not self._ready:
timer.stop()
elif deep_reset:
timer.stop()
self.setup()
else:
self.time = 0.
self._time.value = 0.
# Reset block functions
self.u_1.clear()
# Reset mesh dependent parameters
# TODO: It refers to parameters defined with dolfinx.fem.Function, see CellParameter.
# Initial state
self._models.initial_guess(self._f_0, self._vars, self.cell, self)
assign(self.u_2, self.u_1)
assign(self.u_0, self.u_1)
timer.stop()
# Reset solvers
for solver_info in self._solvers_info.values():
block_solver = solver_info['block_solver']
block_solver.reset()
self._print('Reseting the problem - Done', verbosity=basic_info)
[docs]
def update_save_path(self, save_path, save_config=True, files=[], filenames=[],
prefix='results_'):
save_path = self.model_options._update_save_path(save_path, prefix=prefix)
if save_path == self.save_path and not self.model_options.overwrite:
warnings.warn(
f"The given save path has already been set: '{save_path}'", RuntimeWarning)
return
else:
self.save_path = save_path
if save_config:
files.extend([self.cell_parser.get_dict(), self.model_options.dict(exclude={'comm'})])
filenames.extend(['params', 'simulation_options'])
if files:
add_to_results_folder(self.save_path, files, filenames, self._comm, overwrite=False)
self._WH._update_save_path(self.save_path)
if self._ready:
for solver_name, solver_info in self._solvers_info.items():
block_solver = solver_info['block_solver']
if self.save_path is not None:
solver_save_path = os.path.join(self.save_path, solver_name)
block_solver._set_save_path(solver_save_path)
if block_solver.monitor:
# TODO: Deactivate previous monitors
block_solver._set_monitor()
else:
# TODO: Deactivate previous monitors
block_solver.save_path = None
return self.save_path
[docs]
def get_global_variable(self, name: str):
"""
Get the values of a global variable over the timesteps
Parameters
----------
name : str
Name of the global variable
Returns
-------
list
List of values
"""
return self._WH.get_global_variable(name)
[docs]
def get_global_variable_fnc(self, name: str):
return self._WH.get_global_variable_fnc(name)
[docs]
def get_global_variable_value(self, name: str):
return self._WH.get_global_variable_value(name)
[docs]
def get_avg(self, variable, domain: Union[ufl.Measure, str], integral_type='x'):
"""
Get the average of the variable over the given subdomain or
surface
Parameters
----------
variable : Union[ufl.Integral, ufl.Operator, dolfinx.Function]
Expression of the variable to be averaged
domain : Union[Measure, str]
Measure or tag of the domain to integrate over
integral_type : str
Integral type. Only used if `domain` is a string.
Available options: 'x', 's' and 'S'
Returns
-------
float
Averaged variable over the given subdomain or surface
"""
if isinstance(variable, (float, int)):
return variable
elif isinstance(domain, ufl.Measure):
dx = domain
volume = self.mesher.volumes[self.mesher.get_measures().index(dx)]
else:
dx = self.mesher.get_measures()._asdict()[f'{integral_type}_{domain}']
volume = self.mesher.volumes._asdict()[f'{integral_type}_{domain}']
if isinstance(variable, (ufl.Form, dfx.fem.Form)):
return assemble(variable) / volume
else:
return assemble(variable * dx) / volume
[docs]
def print_control_variables_info(self):
# TODO: Implement this method, maybe using the models_info method
raise NotImplementedError("Not implemented yet.")
[docs]
def print_available_outputs(self):
# TODO: Use model_info for this method.
# TODO: Different colors if the are already selected.
info = ("Available global variables: '"
+ "' '".join(self._WH._outputs_info['globals'].keys()) + "'\n"
+ "Available internal variables: '"
+ "' '".join(self._WH._outputs_info['internals'].keys()) + "'")
self._print(info, verbosity=-1)
def _monitor_last_iteration(self, solvers='solver', set_monitor=True, plot=True, clean=True):
"""
This method monitors the last iteration of the specified solver.
Parameters
----------
solver: str
Name of the solver. Default to `solver`.
Notes
-----
Normally used to debug a iteration where the solver crashed.
"""
# Reset solution to the previous timestep
assign(self._f_1, self._f_0)
if isinstance(solvers, str):
solvers = [solvers]
for solver in solvers:
# Update solver configuration
block_problem = self._solvers_info[solver]['block_problem']
block_solver = self._solvers_info[solver]['block_solver']
block_problem._set_options(inspect_residuals=True, inspect_jacobian=True,
print_residuals=True, plot_jacobian=True)
if set_monitor and block_solver.save_path is not None and not block_solver.monitor:
block_solver.monitor = True
block_solver._set_save_path()
block_solver._set_monitor()
# Solve
try:
out = block_solver.solve(plot=plot, clean=clean)
except RuntimeError as e:
out = e
break
# Clean
if clean:
# TODO: Deactivate block_solver monitor
block_problem._set_options(inspect_residuals=False, inspect_jacobian=False,
print_residuals=False, plot_jacobian=False)
block_solver.monitor = False
return out
def _print(self, *args, verbosity: int = VerbosityLevel.BASIC_PROGRESS_INFO,
only=False, **kwargs):
if not only and self.verbose >= verbosity or self.verbose == verbosity:
return _print(*args, comm=self._comm, **kwargs)