Source code for cideMOD.simulation_interface.battery_system

#
# Copyright (c) 2023 CIDETEC Energy Storage.
#
# This file is part of cideMOD.
#
# cideMOD is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import json
import os
import numpy as np
from pathlib import Path
from typing import Union

from cideMOD.helpers.logging import VerbosityLevel, _print
from cideMOD.helpers.plotview import PlotView
from cideMOD.helpers.miscellaneous import add_to_results_folder
from cideMOD.numerics.triggers import Trigger
from cideMOD.cell.parser import CellParser
from cideMOD.models import get_model_options, BaseModelOptions
from cideMOD.main import Problem
from cideMOD.simulation_interface.inputs import (CurrentInput, Cycle, Rest,
                                                 VoltageInput, execute_step)

current_dict = {'A': 1, 'mA': 1e-3}
voltage_dict = {'V': 1, 'mV': 1e-3}
time_dict = {'s': 1, 'min': 60, 'h': 3600, 'day': 24 * 3600}

DEFAULT_SIMULATION_OPTIONS = get_model_options()

DEFAULT_EVENT = {
    'type': 'Voltage',  # Voltage, Current, Ah, Wh
    'value': 2.8,  # Number
    'unit': 'V',
    'atol': 1e-3,  # Absolute tolerance
    'rtol': 1e-3,  # Relative tolerance
    'goto': 'End'  # Next or End or End Cycle or CV
}

DEFAULT_INPUT = {
    'name': 'Discharge',
    'type': 'Current',  # 'Current' or 'Voltage' or 'CC' or 'CV' or Rest
    'value': '-1',  # Must be float, int or string
    'unit': 'C',  # One of 'A', 'V', 'mA', 'mV', C
    't_max': {'value': 60, 'unit': 'min'},
    'store_delay': 2,
    'events': [DEFAULT_EVENT]
}

DEFAULT_PROFILE = {
    'type': 'CurrentProfile',
    'source': 'examples/notebooks/UDDS.txt',
    'unit': 'A',
    'delimiter': ';',
    'var_column': 3,
    'events': [DEFAULT_EVENT]
}

DEFAULT_TEST_PLAN = {
    'initial_state': {
        'SOC': 1,
        'exterior_temperature': 298.15
    },
    'steps': [DEFAULT_INPUT]
}

# TODO: Adapt CSI to any root model, not only the electrochemical.


[docs] class CSI: """ Interface class to deal with the Problem class at high level. Parameters ---------- cell_data : Union[dict,str] dictionary or path to the json file with the cell parameters simulation_options : Union[dict, BaseModelOptions], optional dicionary with simulation options, normally provided from the get_model_options method. test_plan : Union[dict,str], optional The dictionary with the test plan or a path to a JSON file with the test plan. Defaults to None. data_path : str, optional path to the folder with the additional data specified in cell_data. Defaults to None. """ def __init__(self, cell_data: Union[CellParser, dict, str], simulation_options: Union[dict, BaseModelOptions] = None, test_plan: Union[str, dict] = None, data_path: str = None): # Load simulation options if simulation_options is None: self.simulation_options = DEFAULT_SIMULATION_OPTIONS if isinstance(simulation_options, BaseModelOptions): self.simulation_options = simulation_options elif isinstance(simulation_options, dict): self.simulation_options = get_model_options(**simulation_options) else: raise TypeError("Argument simulation_options must be of type" + f"Union[dict, BaseModelOptions], is {type(simulation_options)}") self._comm = simulation_options.comm # Load cell data self.cell = self._load_cell_data(cell_data, data_path) self.ref_capacity = self.cell.ref_capacity # Create the problem self._create_problem() # Init test plan self._read_test_plan(test_plan) def _create_problem(self): # Create the problem self.problem = Problem(self.cell, self.simulation_options) def _load_cell_data(self, cell_data: Union[CellParser, dict, str], data_path=None): if isinstance(cell_data, CellParser): return cell_data elif isinstance(cell_data, str): if isinstance(data_path, str) and not cell_data.startswith(data_path): cell_data = os.path.join(data_path, cell_data) if not os.path.exists(cell_data): raise FileNotFoundError(f"Path to cell data '{cell_data}' does not exists") if data_path is None: data_path = Path(cell_data).parent cell_data = Path(cell_data).name else: assert data_path is not None, 'data_path must be provided' return CellParser(cell_data, data_path, self.simulation_options) def _read_test_plan(self, test_plan: Union[dict, str] = None): """ Read the specified test_plan. For examples about how to build a test_plan, see the DEFAULT_TEST_PLAN variable Parameters ---------- test_plan : Union[dict,str], optional The dictionary with the test plan or a path to a JSON file with the test plan. Defaults to None. """ if test_plan is None: plan = DEFAULT_TEST_PLAN elif isinstance(test_plan, str): assert os.path.exists(test_plan), "Path to test plan doesn't exists" with open(test_plan, 'r') as f: plan = json.load(f) elif isinstance(test_plan, dict): plan = test_plan self.test_plan = plan # Process Test Plan self.initial_state = plan['initial_state'] self.steps = [] self._C_rate_steps_ = [] self._C_rate_triggers_ = [] for step in plan['steps']: step = self._parse_step(step) if isinstance(step, list): self.steps.extend(step) else: self.steps.append(step) if len(self.steps) <= 0: raise RuntimeError("The test plan is empty") if self.simulation_options.verbose >= VerbosityLevel.BASIC_PROBLEM_INFO: self.print_test_plan() if self.simulation_options.save_path is not None: add_to_results_folder( self.simulation_options.save_path, files=[self.test_plan], filenames=['test_plan'])
[docs] def print_test_plan(self): # TODO: print current in terms of C_rate if given _print('Initial state:', comm=self._comm) _print(self.initial_state, comm=self._comm, print_dic_kwargs={'tab_ini': '\t'}) _print('Steps:', comm=self._comm) for i, step in enumerate(self.steps): _print(f"{i} - ", str(step), comm=self._comm)
def _parse_step(self, step): # Parse Input wrong_format_message = ("Wrong format in '{key}'. Here you have an example: \n" + str(DEFAULT_INPUT)) step_types = ['Voltage', 'Current', 'CC', 'CV', 'Rest', 'Profile', 'VoltageProfile', 'CurrentProfile', 'Cycle'] if step['type'] not in step_types: raise ValueError(wrong_format_message.format(key='type')) if step['type'] == 'Rest': t_max = self._unit_conversor( step['t_max']['value'], step['t_max'].get('unit', 's'), time_dict) input_event = Rest( step['name'], t_max, step.get('store_delay', 10), step.get('initial_step', None), step.get('max_step', 100), step.get('min_step', 5), step.get('adaptive', True), step.get('time_adaptive_tol', 1e-2)) elif step['type'] in ('Current', 'CC'): value = self._unit_conversor(step['value'], step.get('unit', 'A'), current_dict) t_max = self._unit_conversor( step['t_max']['value'], step['t_max'].get('unit', 's'), time_dict) input_event = CurrentInput( step['name'], value, t_max, step.get('store_delay', 10), step.get('initial_step', None), step.get('max_step', 100), step.get('min_step', 5), step.get('adaptive', True), step.get('time_adaptive_tol', 1e-2)) if step.get('unit', 'A') == 'C': self._C_rate_steps_.append((input_event, step['value'])) # Save (step, C-rate) elif step['type'] in ('Voltage', 'CV'): value = self._unit_conversor(step['value'], step.get('unit', 'V'), voltage_dict) t_max = self._unit_conversor( step['t_max']['value'], step['t_max'].get('unit', 's'), time_dict) input_event = VoltageInput( step['name'], value, t_max, step.get('store_delay', 10), step.get('initial_step', None), step.get('max_step', 100), step.get('min_step', 5), step.get('adaptive', True), step.get('time_adaptive_tol', 1e-2)) elif step['type'] in ('Profile', 'CurrentProfile', 'VoltageProfile'): fin = step['source'] delimiter = step.get('delimiter', ';') optimize = step.get('optimize', True) time_column = step.get('time_column', 0) var_column = step.get('var_column', 1) unit = step.get('unit', 'A') delay = step.get('store_delay', 10) skip_header = step.get('skip_header', True) if step['type'] == 'CurrentProfile': profile_type = 'current' unit = step.get('unit', 'A') elif step['type'] == 'VoltageProfile': profile_type = 'voltage' unit = step.get('unit', 'V') else: profile_type = step.get('profile_type', 'current') unit = step.get('unit', 'A') if profile_type not in ('voltage', 'current'): raise ValueError("Invalid profile type") time, var = self._read_profile(fin, profile_type=profile_type, time_index=time_column, var_index=var_column, optimize=optimize, delimiter=delimiter, skip_header=skip_header) if time[0] != 0: time = time - time[0] ProfileInput = CurrentInput if profile_type == 'current' else VoltageInput profile_dict = current_dict if profile_type == 'current' else voltage_dict input_event = [] for i, v in enumerate(var[:-1]): profile_step = ProfileInput( f"{step['name']}_{i}", self._unit_conversor(v, unit, profile_dict), time[i + 1] - time[i], store_delay=delay, max_step=step.get('max_step', 100), min_step=step.get('min_step', 5), adaptive=step.get('adaptive', True), time_adaptive_tol=step.get('time_adaptive_tol', 1e-2), ) if profile_type == 'current' and unit == 'C': self._C_rate_steps_.append((profile_step, step['value'])) input_event.append(profile_step) elif step['type'] in ('Cycle', ): assert all([key in step for key in ('count', 'steps')]) input_event = Cycle(step.get('name'), int(step['count'])) for substep in step.get('steps', []): input_event.add_step(self._parse_step(substep)) # Parse Triggers/Events available_trigger_actions = ['NEXT', 'CV', 'END', 'END CYCLE'] for event in step.get('events', []): for key in ['type']: if key not in event: raise ValueError( f"Wrong format in event. Here you have an example: \n {DEFAULT_EVENT}") if event['type'] == 'Voltage': assert all([key in event for key in ['value', 'unit']]) variable = 'v' value = self._unit_conversor(event['value'], event['unit'], voltage_dict) elif event['type'] == 'Current': variable = 'i' value = self._unit_conversor(event['value'], event['unit'], current_dict) action = event.get('goto', 'Next') if action.upper() not in available_trigger_actions: raise ValueError(f"Unrecognized trigger action '{action}'. Available options: '" + "' '".join(available_trigger_actions) + "'") trigger = Trigger(value, variable, event['atol'], event['rtol'], action=action) if isinstance(input_event, list): for profile_event in input_event: profile_event.add_trigger(trigger) else: input_event.add_trigger(trigger) if event['type'] == 'Current' and event['unit'] == 'C': self._C_rate_triggers_.append((trigger, event['value'])) return input_event def _unit_conversor(self, value, unit, units_dict={}): if not units_dict: units_dict = {**current_dict, **voltage_dict, **time_dict} if unit == 'C': units_dict['C'] = self.ref_capacity assert unit in units_dict, 'Unit not found' if isinstance(value, str): return '(' + value + ')*' + str(units_dict[unit]) elif value is None: return value else: return float(value) * units_dict[unit]
[docs] def setup(self): """ Initialize the initial state and set up the `cideMOD.Problem` object. For further information type `help(Problem.setup)` """ self._set_initial_state() self.problem.setup()
[docs] def run_test_plan(self): """ Do the simulation specified in the test plan with the loaded cell Returns ------- int or Exception: Status of the simulation, 0 means no problem, otherwise an Exception with information on the issues is returned. """ if not self.problem._ready: self.setup() # Do necessary steps _save_on_exit = self.simulation_options.save_on_exit self.simulation_options.save_on_exit = False for step in self.steps: status = execute_step(step, self.problem) if status != 0: break if _save_on_exit: self.simulation_options.save_on_exit = True self.problem._WH.write_globals(self.simulation_options.clean_on_exit, individual_txt=self.simulation_options.globals_txts) return status
def _read_profile(self, profile: Union[np.ndarray, list, str], profile_type='current', time_0=0, time_index=0, var_index=1, optimize=True, skip_header=True, delimiter=';'): # Reads series of input voltage or current as a function of time. # TODO: check if the profile is valid # FIXME: it skips some of the last steps, probably related to clean profile # if profile_type not in ['current', 'voltage']: # raise ValueError("Unrecognized profile type, options are 'current' or 'voltage'") if isinstance(profile, str): if not os.path.exists(profile): raise FileNotFoundError(f"'{profile}' is not a valid path") data = np.genfromtxt(profile, delimiter=delimiter, skip_header=skip_header) if isinstance(profile, list): data = np.array(profile) time = data[:, time_index] var = data[:, var_index] # if time_0 == 0 and time[0] != 0: # time = np.insert(time, 0, 0) # elif time_0 != 0: # # TODO: Make it possible to start at a time > 0 # pass if not optimize: pass elif profile_type == 'current': capacity_threshold = self.ref_capacity / 100 max_usage = max(abs(var[1:] - var[:-1]).max(), abs(var).max()) threshold = max(capacity_threshold, max_usage / 1000) time, var = self._clean_profile(time, var, threshold) elif profile_type == 'voltage': profile_type # Group similar data to get lower count of timesteps capacity_threshold = self.ref_capacity / 100 max_usage = max(abs(var[1:] - var[:-1]).max(), abs(var).max()) threshold = max(capacity_threshold, max_usage / 1000) time, var = self._clean_profile(time, var, threshold) else: raise ValueError("Unrecognized profile type, options are 'current' or 'voltage'") # Convert to delta time # delta_time = np.diff(time) # var = var[-len(delta_time):] # clean_data = (delta_time, var) clean_data = (time, var) return clean_data def _clean_profile(self, time: np.ndarray, var: np.ndarray, threshold: float): """ Clean the time series grouping similar values together up to a threshold in order to lower the count of timesteps Parameters ---------- time : np.ndarray Time measurements array var : np.ndarray Variable measured at a time (current or voltage) threshold : float Threshold for grouping values """ if time.shape != var.shape or time.size == 1: raise RuntimeError("Invalid profile input.") precision = int(-np.floor(np.log10(threshold))) clean_var = [] clean_time = [] temp_index = 0 for index, v in enumerate(var[1:]): temp_v = var[temp_index:index + 1].mean() if abs(temp_v - v) > threshold or index == len(var[1:]) - 1: # Check if there has been a jump if index - temp_index > 1 and abs(var[index - 1] - v) > threshold: # The jump will apply in the next step clean_var.append(round(var[temp_index:index - 1].mean(), precision)) else: clean_var.append(round(temp_v, precision)) clean_time.append(time[temp_index]) temp_index = index + 1 clean_time.append(time[-1]) return clean_time, clean_var def _set_initial_state(self): SoC = self.initial_state.get('SOC') T = self.initial_state.get('exterior_temperature') self.problem.set_cell_state(SoC=SoC, T_ini=T, T_ext=T)
[docs] def plot_global_results(self, results_path=None): """ Generates an interactive plot with all the global variables. Parameters ---------- results_path : StrPath, optional Path to the results folder, if None, the save_path of the problem object will be used. Defaults to None. """ PlotView(self.problem, results_path)
[docs] def reset(self, new_parameters: dict = None, save_path: str = None, save_config=True): """ This method resets CSI in order to be ready for running another simulation with the same initial conditions, and maybe using different parameters. Parameters ---------- new_parameters: Dict[str, float], optional Dictionary containing the cell parameters to be updated. save_path: str, optional Path to the new results folder. save_config Whether to save the parameter and simulation options to the results folder. Defaults to True. Notes ----- This method is used to avoid the generation of multiple Problem objects when running multiple simulations, for example when performing optimizations. """ self.problem.reset( new_parameters=new_parameters, save_path=save_path, save_config=save_config) # Update test plan if needed self.update_test_plan()
[docs] def update_test_plan(self, new_test_plan=None): """ This method updates the test plan. Parameters ---------- new_test_plan: dict, optional New test plan to be simulated. Default to None. Notes ----- If no new test plan is provided, this method checks if the reference capacity has changed and if so, updates the current steps and triggers based on C-rate input. """ if self.simulation_options.verbose >= VerbosityLevel.BASIC_PROBLEM_INFO: _print("Updating test plan - ...", end='\r', comm=self._comm) has_C_rate_changed = not np.isclose(self.ref_capacity, self.cell.ref_capacity) if has_C_rate_changed: # Update the reference capacity self.ref_capacity = self.cell.ref_capacity if new_test_plan is not None: # Set the new test plan self._read_test_plan(test_plan=new_test_plan) else: # Reset triggers for step in self.steps: for trigger in step.triggers: trigger.reset() if has_C_rate_changed: # Update the current steps that depends on the C-rate if self._C_rate_steps_: for step, C_rate_value in self._C_rate_steps_: step.i_app = C_rate_value * self.ref_capacity # Update the current triggers that depends on the C-rate if self._C_rate_triggers_: for trigger, C_rate_value in self._C_rate_triggers_: trigger.trigger_value = C_rate_value * self.ref_capacity if self.simulation_options.verbose >= VerbosityLevel.BASIC_PROBLEM_INFO: _print("Updating test plan - Done", comm=self._comm)