Source code for cideMOD.numerics.triggers

#
# Copyright (c) 2023 CIDETEC Energy Storage.
#
# This file is part of cideMOD.
#
# cideMOD is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import functools


class Trigger:
    """
    An event that fires a change in the cell input. It is evaluated
    during cell cycling at each timestep.

    Parameters
    ----------
    value: Union[float, int]
        Value that fires the trigger.
    variable: str
        Variable of the state of the cell to compare. Either the
        registered name or its alias (label) is accepted. To know the
        available ones type `Trigger.print_available_variables`.
    atol: float, optional
        Absolute tolerance for the trigger to fire. When omitted (or
        zero) the tolerance registered for the variable is used,
        falling back to 1e-6.
    rtol: float, optional
        Relative tolerance for the trigger to fire. When omitted (or
        zero) the 'rtol' registered for the variable is used, falling
        back to 1e-3.
    action: str, optional
        Event to trigger. Default to 'Next'.
    mode: str, optional
        Can be 'min', 'max' or None. With 'max' the trigger is detected
        when the measured value lies above the trigger value within one
        tolerance; with 'min' when it lies below within one tolerance;
        with None when it lies within one tolerance on either side.
        Default to None.

    Raises
    ------
    ValueError
        If `variable` is not a registered name/alias or `mode` is not
        one of the supported options.
    """

    # Registered trigger variables, keyed by name.
    variables: dict = {'time': {'label': 't', 'units': 's', 'abs': False, 'atol': 1e-6}}
    # Flat list of [name, alias, name, alias, ...]; an alias always follows its name.
    available_variables: list = ['time', 't']

    def __init__(self, value, variable: str, atol=None, rtol=None, action="Next", mode=None):
        self.trigger_value = value
        if variable not in self.available_variables:
            raise ValueError(
                f"Unrecognized trigger variable '{variable}'. Available options: "
                + self._format_available_variables())
        self.variable = self._get_variable(variable)
        self._var_info = self.variables[self.variable]
        # A falsy tolerance (None or 0) selects the per-variable default.
        self.atol = abs(atol) if atol else self._var_info.get('atol', 1e-6)
        self.rtol = abs(rtol) if rtol else self._var_info.get('rtol', 1e-3)
        self.old_value = None
        self.action = action
        modes = [None, 'min', 'max']
        if mode not in modes:
            # str.join only accepts strings, so None must be converted explicitly;
            # otherwise a TypeError would mask the intended ValueError.
            raise ValueError(f"Unrecognized trigger mode '{mode}'. Available options: '"
                             + "' '".join(str(option) for option in modes) + "'")
        self.mode = mode

    def __repr__(self):
        return (f"{self.variable.capitalize()} Trigger at {self.trigger_value:.2g} "
                + self._var_info['units'])

    def __str__(self):
        symbol_mod = [">", "<", ""][["max", "min", None].index(self.mode)]
        return (f"{self.variable.capitalize()} is {symbol_mod}{self.trigger_value:.2g} "
                + self._var_info['units'])

    @classmethod
    def _format_available_variables(cls):
        """Return the registered variables formatted as ``'name'('alias')`` pairs."""
        names = cls.available_variables[0::2]
        aliases = cls.available_variables[1::2]
        return ", ".join(f"'{var}'('{alias}')" for var, alias in zip(names, aliases))

    @classmethod
    def register_variable(cls, name, label, units, need_abs=False, atol=1e-6):
        """
        Register a new trigger variable.

        Parameters
        ----------
        name: str
            Name of the variable
        label: str
            Label of the variable. It is used as an alias.
        units: str
            Units of the trigger variable.
        need_abs: bool
            Whether or not to check the absolute value of the trigger
            variable. Default to False.
        atol: float
            Absolute tolerance used by triggers on this variable when
            none is given explicitly. Default to 1e-6.

        Raises
        ------
        ValueError
            If `name` or `label` is already registered.
        """
        if name in cls.available_variables:
            raise ValueError(f"Trigger name '{name}' already registered!")
        elif label in cls.available_variables:
            raise ValueError(f"Trigger label '{label}' already registered!")
        cls.variables[name] = {'label': label, 'units': units, 'abs': need_abs, 'atol': atol}
        cls.available_variables.extend([name, label])

    @classmethod
    def print_available_variables(cls):
        """Print every registered trigger variable together with its alias."""
        # TODO: Consider including model_options as input to print only the available variables of
        # the active models.
        print("Available trigger variables: " + cls._format_available_variables())

    @classmethod
    def _get_variable(cls, variable):
        """Resolve a name or alias to the registered variable name."""
        if variable in cls.variables:
            return variable
        # An alias is stored right after its name in `available_variables`.
        return cls.available_variables[cls.available_variables.index(variable) - 1]

    def _measure(self, state_dict):
        """Read the trigger variable from the state, as absolute value if so registered."""
        value = state_dict[self.variable]
        return abs(value) if self._var_info['abs'] else value

    def _detect(self, value):
        """Return whether `value` is within one tolerance of the trigger value."""
        # Signed distance to the trigger value, measured in tolerance units.
        comp = (self.trigger_value - value) / max(self.atol, self.rtol * abs(self.trigger_value))
        if self.mode == 'max':
            return comp < 0 and comp > -1
        elif self.mode == 'min':
            return comp > 0 and comp < 1
        else:
            return abs(comp) < 1

    def _check_surpass(self, value):
        """Return whether `value` has gone past the trigger value."""
        comp = (self.trigger_value - value) / max(self.atol, self.rtol * abs(self.trigger_value))
        # True when the previous and current values lie on opposite sides of the trigger.
        change = (value - self.trigger_value) * (self.old_value - self.trigger_value) < 0
        if self.mode == 'max':
            return comp < -1 or change
        elif self.mode == 'min':
            return comp > 1 or change
        else:
            return change

    def start_record(self, state):
        """Store the current measurement as the reference (previous) value."""
        self.old_value = self._measure(state)

    def check(self, state):
        """
        Evaluate the trigger against the given state.

        Raises
        ------
        TriggerDetected
            If the measured value is within tolerance of the trigger
            value, or the first recorded value is already past it.
        TriggerSurpassed
            If the measured value went past the trigger value. It
            carries the fraction of the last step at which the trigger
            value is reached by linear interpolation.
        """
        if self.old_value is None:
            self.start_record(state)
            if self._check_surpass(self.old_value):
                raise TriggerDetected(self)
        else:
            new_value = self._measure(state)
            if self._detect(new_value):
                # Trigger detected
                raise TriggerDetected(self)
            elif self._check_surpass(new_value):
                # Trigger surpassed
                raise TriggerSurpassed(
                    (self.trigger_value - self.old_value) / (new_value - self.old_value), self)
            else:
                # Trigger not reached
                self.old_value = new_value

    def reset(self):
        """Forget the recorded value so the next check starts a new record."""
        self.old_value = None
class TriggerDetected(Exception):
    """
    Exception signalling that a trigger has been detected.

    Parameters
    ----------
    trigger:
        The trigger object that fired. Its `action` attribute and its
        string representations are reused by this exception.
    """

    def __init__(self, trigger):
        self.trigger = trigger

    def action(self):
        """Return the action configured on the trigger that fired."""
        fired = self.trigger
        return fired.action

    def __repr__(self) -> str:
        # Same text as the trigger's repr, relabelled with this exception's name.
        trigger_repr = repr(self.trigger)
        return trigger_repr.replace('Trigger', 'TriggerDetected')

    def __str__(self) -> str:
        # The trigger's own description is used unchanged.
        return str(self.trigger)
class TriggerSurpassed(Exception):
    """
    Exception signalling that a trigger value has been surpassed.

    Parameters
    ----------
    time_slope:
        Factor that `new_tstep` applies to the previous timestep.
    trigger:
        The trigger object that was surpassed.
    """

    def __init__(self, time_slope, trigger):
        self.trigger = trigger
        self.time_slope = time_slope

    def new_tstep(self, old_tstep):
        """Return the previous timestep scaled by the stored time slope."""
        scaled = self.time_slope * old_tstep
        return scaled

    def __repr__(self) -> str:
        # Same text as the trigger's repr, relabelled with this exception's name.
        trigger_repr = repr(self.trigger)
        return trigger_repr.replace('Trigger', 'TriggerSurpassed')

    def __str__(self) -> str:
        # Rephrase the trigger description and strip the comparison symbols.
        substitutions = ((" is ", " surpasses "), ('<', ''), ('>', ''))
        text = str(self.trigger)
        for old, new in substitutions:
            text = text.replace(old, new)
        return text
class SolverCrashed(Exception):
    """Raised when the solver crashes because of non-convergence or NaN values."""