Source code for opfgym.opf_env


from collections.abc import Callable
import copy
import logging
import inspect

import gymnasium as gym
import numpy as np
import pandapower as pp
import pandas as pd
import scipy
from scipy import stats
from typing import Tuple

import opfgym
import opfgym.util
import opfgym.objective
from opfgym.simbench.data_split import define_test_train_split
from opfgym.simbench.time_observation import get_simbench_time_observation


class PowerFlowNotAvailable(Exception):
    pass


class OpfEnv(gym.Env):
    def __init__(self,
                 net: pp.pandapowerNet,
                 action_keys: tuple[tuple[str, str, np.ndarray], ...],
                 observation_keys: tuple[tuple[str, str, np.ndarray], ...],
                 state_keys: tuple[tuple[str, str, np.ndarray], ...]=None,
                 profiles: dict[str, pd.DataFrame]=None,
                 evaluate_on: str='validation',
                 steps_per_episode: int=1,
                 bus_wise_obs: bool=False,
                 reward_function: str | opfgym.RewardFunction = 'summation',
                 reward_function_params: dict=None,
                 diff_objective: bool=False,
                 add_res_obs: bool=False,
                 add_time_obs: bool=False,
                 add_act_obs: bool=False,
                 add_mean_obs: bool=False,
                 train_data: str='simbench',
                 test_data: str='simbench',
                 sampling_params: dict=None,
                 constraint_params: dict={},
                 custom_constraints: list=None,
                 autoscale_actions: bool=True,
                 diff_action_step_size: float=None,
                 clipped_action_penalty: float=0.0,
                 initial_action: str='center',
                 objective_function: Callable[[pp.pandapowerNet], np.ndarray | float]=None,
                 power_flow_solver: Callable[[pp.pandapowerNet], None]=None,
                 optimal_power_flow_solver: Callable[[pp.pandapowerNet], None]=None,
                 seed: int=None,
                 **kwargs):

        self.net = net
        self.obs_keys = observation_keys
        self.state_keys = state_keys or copy.copy(observation_keys)
        self.act_keys = action_keys
        self.profiles = profiles

        if not profiles:
            assert 'simbench' not in test_data
            assert 'simbench' not in train_data
            assert not add_time_obs

        # Define the power flow and OPF solvers (default to pandapower)
        self._run_power_flow = power_flow_solver or self.default_power_flow
        if optimal_power_flow_solver is None:
            self._run_optimal_power_flow = self.default_optimal_power_flow
        elif optimal_power_flow_solver is False:
            # No optimal power flow solver available
            self._run_optimal_power_flow = raise_opf_not_converged
        else:
            self._run_optimal_power_flow = optimal_power_flow_solver

        # Define objective function
        if objective_function is None:
            self.objective_function = opfgym.objective.get_pandapower_costs
        else:
            assert_only_net_in_signature(objective_function)
            self.objective_function = objective_function

        self.evaluate_on = evaluate_on
        self.train_data = train_data
        self.test_data = test_data
        self.sampling_params = sampling_params or {}

        # Define the observation space
        self.add_act_obs = add_act_obs
        if add_act_obs:
            # The agent can observe its previous actions
            self.obs_keys.extend(self.act_keys)

        self.add_time_obs = add_time_obs
        # Add observations that require previous pf calculation
        if add_res_obs is True:
            # Default: Add all results that are usually available
            add_res_obs = ('voltage_magnitude', 'voltage_angle', 
                           'line_loading', 'trafo_loading', 'ext_grid_power')
        if add_res_obs:
            # Tricky: Only use buses with actual units connected. Otherwise, too many auxiliary buses are included.
            bus_idxs = set(self.net.load.bus) | set(self.net.sgen.bus) | set(self.net.gen.bus) | set(self.net.storage.bus)
            add_obs = []
            if 'voltage_magnitude' in add_res_obs:
                add_obs.append(('res_bus', 'vm_pu', np.sort(list(bus_idxs))))
            if 'voltage_angle' in add_res_obs:
                add_obs.append(('res_bus', 'va_degree', np.sort(list(bus_idxs))))
            if 'line_loading' in add_res_obs:
                add_obs.append(('res_line', 'loading_percent', self.net.line.index))
            if 'trafo_loading' in add_res_obs:
                add_obs.append(('res_trafo', 'loading_percent', self.net.trafo.index))
            if 'ext_grid_power' in add_res_obs:
                add_obs.append(('res_ext_grid', 'p_mw', self.net.ext_grid.index))
                add_obs.append(('res_ext_grid', 'q_mvar', self.net.ext_grid.index))
            self.obs_keys.extend(add_obs)

        self.add_mean_obs = add_mean_obs

        # Define observation, state, and action spaces
        self.bus_wise_obs = bus_wise_obs
        self.observation_space = get_obs_and_state_space(
            self.net, self.obs_keys, add_time_obs, add_mean_obs,
            seed=seed, bus_wise_obs=bus_wise_obs)
        self.state_space = get_obs_and_state_space(
            self.net, self.state_keys, seed=seed)
        n_actions = sum([len(idxs) for _, _, idxs in self.act_keys])
        self.action_space = gym.spaces.Box(0, 1, shape=(n_actions,), seed=seed)

        # Action space details
        self.autoscale_actions = autoscale_actions
        self.diff_action_step_size = diff_action_step_size
        self.clipped_action_penalty = clipped_action_penalty
        self.initial_action = initial_action

        self.steps_per_episode = steps_per_episode

        # Full state of the system (available in training, but not in testing)
        self.state = None  # TODO: Not implemented yet. Required only for partially observable envs

        # Is a powerflow calculation required to get new observations in reset?
        self.pf_for_obs = False
        for unit_type, _, _ in self.obs_keys:
            if 'res_' in unit_type:
                self.pf_for_obs = True
                break

        self.diff_objective = diff_objective
        if diff_objective:
            # An initial power flow is required to compute the initial objective
            self.pf_for_obs = True

        # Define data distribution for training and testing
        self.test_steps, self.validation_steps, self.train_steps = define_test_train_split(**kwargs)

        # Constraints
        if custom_constraints is None:
            self.constraints = opfgym.constraints.create_default_constraints(
                self.net, constraint_params)
        else:
            self.constraints = custom_constraints

        # Define reward function
        reward_function_params = reward_function_params or {}
        if isinstance(reward_function, str):
            # Load by string (e.g. 'Summation' or 'summation')
            reward_class = opfgym.util.load_class_from_module(
                reward_function, 'opfgym.reward')
            self.reward_function = reward_class(
                env=self, **reward_function_params)
        elif isinstance(reward_function, opfgym.RewardFunction):
            # User-defined reward function
            self.reward_function = reward_function

[docs] def reset(self, seed: int=None, options: dict=None) -> tuple[np.ndarray, dict]: """ gymnasium API. Reset the environment to a new state. Samples a random state from the given data distribution, applies an initial action, runs a power flow calculation (optional), and returns the initial observation. :param seed: Seed for the random number generator. :param options: Additional options for the reset method. Available options: 'step' (int) to control the data sampling, 'test' (bool) to sample from test data.""" super().reset(seed=seed) self.info = {} self.current_simbench_step = None self.step_in_episode = 0 if not options: options = {} self.test = options.get('test', False) step = options.get('step', None) self.apply_action = options.get('new_action', True) self._sampling(step, self.test, self.apply_action) if self.initial_action == 'random': # Use random actions as starting point so that agent learns to handle that act = self.action_space.sample() else: # Reset all actions to default values act = (self.action_space.low + self.action_space.high) / 2 self._apply_actions(act) if self.pf_for_obs is True: self.run_power_flow() if not self.power_flow_available: logging.warning( 'Failed powerflow calculcation in reset. Try again!') return self.reset() self.initial_obj = self.calculate_objective(diff_objective=False) obs = self._get_obs(self.obs_keys, self.add_time_obs, self.add_mean_obs) return obs, copy.deepcopy(self.info)
def _sampling(self, step=None, test=False, sample_new=True, *args, **kwargs) -> None: self.set_power_flow_unavailable() data_distr = self.test_data if test is True else self.train_data kwargs.update(self.sampling_params) # Maybe also allow different kinds of noise and similar! with `**sampling_params`? if data_distr == 'noisy_simbench' or 'noise_factor' in kwargs.keys(): if sample_new: self._set_simbench_state(step, test, *args, **kwargs) elif data_distr == 'simbench': if sample_new: self._set_simbench_state( step, test, noise_factor=0.0, *args, **kwargs) elif data_distr == 'full_uniform': self._sample_uniform(sample_new=sample_new) elif data_distr == 'normal_around_mean': self._sample_normal(sample_new=sample_new, **kwargs) elif data_distr == 'mixed': # Use different data sources with different probabilities r = self.np_random.random() data_probs = kwargs.get('data_probabilities', (0.5, 0.75, 1.0)) if r < data_probs[0]: self._set_simbench_state(step, test, *args, **kwargs) elif r < data_probs[1]: self._sample_uniform(sample_new=sample_new) else: self._sample_normal(sample_new=sample_new, **kwargs) def _sample_uniform(self, sample_keys=None, sample_new=True) -> None: """ Standard pre-implemented method to set power system to a new random state from uniform sampling. Uses the observation space as basis. Requirement: For every observations there must be "min_{obs}" and "max_{obs}" given as range to sample from. """ assert sample_new, 'Currently only implemented for sample_new=True' if not sample_keys: sample_keys = self.state_keys for unit_type, column, idxs in sample_keys: if 'res_' not in unit_type: self._sample_from_range(unit_type, column, idxs) def _sample_from_range(self, unit_type, column, idxs) -> None: df = self.net[unit_type] # Make sure to sample from biggest possible range try: low = df[f'min_min_{column}'].loc[idxs] except KeyError: low = df[f'min_{column}'].loc[idxs] try: high = df[f'max_max_{column}'].loc[idxs] except KeyError: high = df[f'max_{column}'].loc[idxs] r = self.np_random.uniform(low, high, size=(len(idxs),)) try: # Constraints are scaled, which is why we need to divide by scaling self.net[unit_type].loc[idxs, column] = r / df.scaling[idxs] except AttributeError: # If scaling factor is not defined, assume scaling=1 self.net[unit_type].loc[idxs, column] = r def _sample_normal(self, relative_std=None, truncated=False, sample_new=True, **kwargs) -> None: """ Sample data around mean values from simbench data. """ assert sample_new, 'Currently only implemented for sample_new=True' for unit_type, column, idxs in self.state_keys: if 'res_' in unit_type or 'poly_cost' in unit_type: continue df = self.net[unit_type].loc[idxs] mean = df[f'mean_{column}'] max_values = (df[f'max_max_{column}'] / df.scaling).to_numpy() min_values = (df[f'min_min_{column}'] / df.scaling).to_numpy() diff = max_values - min_values if relative_std: std = relative_std * diff else: std = df[f'std_dev_{column}'] if truncated: # Make sure to re-distribute truncated values random_values = stats.truncnorm.rvs( min_values, max_values, mean, std * diff, len(mean)) else: # Simply clip values to min/max range random_values = self.np_random.normal( mean, std * diff, len(mean)) random_values = np.clip( random_values, min_values, max_values) self.net[unit_type].loc[idxs, column] = random_values def _set_simbench_state(self, step: int=None, test=False, noise_factor=0.1, noise_distribution='uniform', interpolate_steps=False, *args, **kwargs) -> None: """ Standard pre-implemented method to sample a random state from the simbench time-series data and set that state. Works only for simbench systems! """ total_n_steps = len(self.profiles[('load', 'q_mvar')]) if step is None: if test is True and self.evaluate_on == 'test': step = self.np_random.choice(self.test_steps) elif test is True and self.evaluate_on == 'validation': step = self.np_random.choice(self.validation_steps) else: step = self.np_random.choice(self.train_steps) else: assert step < total_n_steps self.current_simbench_step = step for type_act in self.profiles.keys(): if not self.profiles[type_act].shape[1]: continue unit_type, actuator = type_act data = self.profiles[type_act].loc[step, self.net[unit_type].index] if interpolate_steps and step < total_n_steps - 1: # Random linear interpolation between two steps next_data = self.profiles[type_act].loc[step + 1, self.net[unit_type].index] r = self.np_random.random() data = data * r + next_data * (1 - r) # Add some noise to create unique data samples if noise_distribution == 'uniform': # Uniform distribution: noise_factor as relative sample range noise = self.np_random.random( len(self.net[unit_type].index)) * noise_factor * 2 + (1 - noise_factor) new_values = (data * noise).to_numpy() elif noise_distribution == 'normal': # Normal distribution: noise_factor as relative std deviation new_values = self.np_random.normal( loc=data, scale=data.abs() * noise_factor) # Make sure that the range of original data remains unchanged # (Technical limits of the units remain the same) new_values = np.clip( new_values, self.profiles[type_act].min( )[self.net[unit_type].index].to_numpy(), self.profiles[type_act].max( )[self.net[unit_type].index].to_numpy()) self.net[unit_type].loc[self.net[unit_type].index, actuator] = new_values
[docs] def step(self, action: np.ndarray) -> tuple[np.ndarray, float, bool, bool, dict]: """ gymnasium API: Step the environment to a new state. Applies the actions, runs a power flow, checks for constraint violations, calculates the reward, and returns all information requires for learning :param action: The action to apply to the power system. """ assert not np.isnan(action).any() self.info = {} self.step_in_episode += 1 if self.apply_action: correction = self._apply_actions(action, self.diff_action_step_size) self.run_power_flow() if not self.power_flow_available: # Something went seriously wrong! Find out what! # Maybe NAN in power setpoints?! # Maybe simply catch this with a strong negative reward?! logging.critical(f'\nPowerflow not converged and reason unknown! Run diagnostic tool to at least find out what went wrong: {pp.diagnostic(self.net)}') self.info['valids'] = np.array([False] * 5) self.info['violations'] = np.array([1] * 5) self.info['unscaled_penalties'] = np.array([1] * 5) self.info['penalty'] = 5 return np.array([np.nan]), np.nan, True, False, copy.deepcopy(self.info) reward = self.calculate_reward() if self.clipped_action_penalty and self.apply_action: reward -= correction * self.clipped_action_penalty if self.steps_per_episode == 1: terminated = True truncated = False elif self.step_in_episode >= self.steps_per_episode: terminated = False truncated = True else: terminated = False truncated = False obs = self._get_obs(self.obs_keys, self.add_time_obs, self.add_mean_obs) assert not np.isnan(obs).any() return obs, reward, terminated, truncated, copy.deepcopy(self.info)
def _apply_actions(self, action, diff_action_step_size=None) -> float: """ Apply agent actions as setpoints to the power system at hand. Returns the mean correction that was necessary to make the actions valid.""" self.set_power_flow_unavailable() # Clip invalid actions action = np.clip(action, self.action_space.low, self.action_space.high) counter = 0 for unit_type, actuator, idxs in self.act_keys: if len(idxs) == 0: continue df = self.net[unit_type] partial_act = action[counter:counter + len(idxs)] if self.autoscale_actions: # Ensure that actions are always valid by using the current range min_action = df[f'min_{actuator}'].loc[idxs] max_action = df[f'max_{actuator}'].loc[idxs] else: # Use the full action range instead (only different if min/max change during training) min_action = df[f'min_min_{actuator}'].loc[idxs] max_action = df[f'max_max_{actuator}'].loc[idxs] delta_action = (max_action - min_action).values # Always use continuous action space [0, 1] if diff_action_step_size: # Agent sets incremental setpoints instead of absolute ones. previous_setpoints = self.net[unit_type][actuator].loc[idxs].values if 'scaling' in df.columns: previous_setpoints *= df.scaling.loc[idxs] # Make sure decreasing the setpoint is possible as well partial_act = partial_act * 2 - 1 setpoints = partial_act * diff_action_step_size * delta_action + previous_setpoints else: # Agent sets absolute setpoints in range [min, max] setpoints = partial_act * delta_action + min_action # Autocorrect impossible setpoints if not self.autoscale_actions or diff_action_step_size: if f'max_{actuator}' in df.columns: mask = setpoints > df[f'max_{actuator}'].loc[idxs] setpoints[mask] = df[f'max_{actuator}'].loc[idxs][mask] if f'min_{actuator}' in df.columns: mask = setpoints < df[f'min_{actuator}'].loc[idxs] setpoints[mask] = df[f'min_{actuator}'].loc[idxs][mask] if 'scaling' in df.columns: # Scaling column sometimes not existing setpoints /= df.scaling.loc[idxs] if actuator == 'closed' or actuator == 'in_service': # Special case: Only binary actions setpoints = np.round(setpoints).astype(bool) elif actuator == 'tap_pos' or actuator == 'step': # Special case: Only discrete actions setpoints = np.round(setpoints) self.net[unit_type].loc[idxs, actuator] = setpoints counter += len(idxs) # Did the action need to be corrected to be in bounds? mean_correction = np.mean(abs( self.get_current_actions(from_results_table=False) - action)) return mean_correction def calculate_objective(self, net=None, diff_objective=False) -> np.ndarray: """ This method returns the objective function as array that is used as basis for the reward calculation. """ net = net or self.net if diff_objective: return -self.objective_function(net) - self.initial_obj else: return -self.objective_function(net) def calculate_violations(self, net=None) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: net = net or self.net valids = [] violations = [] penalties = [] for constraint in self.constraints: result = constraint.get_violation_metrics(net) valids.append(result['valid']) violations.append(result['violation']) penalties.append(result['penalty']) return np.array(valids), np.array(violations), np.array(penalties) def calculate_reward(self) -> float: """ Combine objective function and the penalties together. """ objective = np.sum(self.calculate_objective(diff_objective=self.diff_objective)) valids, violations, penalties = self.calculate_violations() self.info['valids'] = np.array(valids) self.info['violations'] = np.array(violations) self.info['unscaled_penalties'] = np.array(penalties) penalty = np.sum(penalties) valid = valids.all() reward = self.reward_function(objective, penalty, valid) self.info['cost'] = self.reward_function.calculate_cost(penalty, valid) return reward def _get_obs(self, obs_keys, add_time_obs=False, add_mean_obs=False ) -> np.ndarray: obss = [(self.net[unit_type].loc[idxs, column].to_numpy()) if (unit_type != 'load' or not self.bus_wise_obs) else get_bus_aggregated_obs(self.net, 'load', column, idxs) for unit_type, column, idxs in obs_keys] if add_mean_obs: mean_obs = [np.mean(partial_obs) for partial_obs in obss if len(partial_obs) > 1] obss.append(mean_obs) if add_time_obs and self.current_simbench_step is not None: time_obs = get_simbench_time_observation( self.profiles, self.current_simbench_step) obss = [time_obs] + obss return np.concatenate(obss)
[docs] def get_state(self) -> np.ndarray: """ Return the state of the underlying power system. Relevant for partially observable environments. Steal the popgym API for this, compare https://popgym.readthedocs.io/en/latest/autoapi/popgym/core/env/index.html """ return self._get_obs(self.state_keys)
[docs] def render(self, **kwargs): """ gymnasium API. Render the current state of the power system. Uses the `simple_plot` pandapower method. Overwrite for more sophisticated rendering. For kwargs information, refer to the pandapower docs: https://pandapower.readthedocs.io/en/latest/plotting/matplotlib/simple_plot.html""" ax = pp.plotting.simple_plot(self.net, **kwargs) return ax
def get_current_actions(self, net=None, from_results_table=True) -> np.ndarray: # Attention: These are not necessarily the actions of the RL agent # because some re-scaling might have happened! # These are the actions from the original action space [0, 1] net = net or self.net res_prefix = 'res_' if from_results_table else '' action = [] for unit_type, column, idxs in self.act_keys: setpoints = net[f'{res_prefix}{unit_type}'].loc[idxs, column] # If data not taken from results table, scaling required if not from_results_table and 'scaling' in net[unit_type].columns: setpoints *= net[unit_type].scaling.loc[idxs] # Action space depends on autoscaling min_id = 'min_' if self.autoscale_actions else 'min_min_' max_id = 'max_' if self.autoscale_actions else 'max_max_' min_values = net[unit_type][f'{min_id}{column}'].loc[idxs] max_values = net[unit_type][f'{max_id}{column}'].loc[idxs] action.append((setpoints - min_values) / (max_values - min_values)) return np.concatenate(action)
[docs] def get_actions(self) -> np.ndarray: """ Returns the current actions that were applied to the power system. Warning: Not necessarily the exact same actions that were used in the :meth:`step` method because some rounding, clipping, etc. might have happened. However, the resulting power flows should be same. Useful for storing and reproducing results. """ if self.power_flow_available: return self.get_current_actions(from_results_table=True) return self.get_current_actions(from_results_table=False)
[docs] def get_optimal_actions(self) -> np.ndarray: """ Returns the optimal actions that were calculated by the OPF. Useful for creating datasets for supervised learning. Warning: Can only be called if :meth:`run_optimal_power_flow` method was called before. """ self.ensure_optimal_power_flow_available() # The pandapower OPF stores the optimal settings only in the results table return self.get_current_actions(self.optimal_net, from_results_table=True)
[docs] def is_state_valid(self) -> bool: """ Returns True if the current state does not contain constraint violations. """ self.ensure_power_flow_available() valids, _, _ = self.calculate_violations(self.net) return valids.all()
[docs] def is_optimal_state_valid(self) -> bool: """ Returns True if the state after OPF calculation does not contain constraint violations. Warning: Can only be called if :meth:`run_optimal_power_flow` method was called before. Warning 2: Usually, the OPF does not converge if no valid solutions can be found. This method is only applicable if the OPF yielded a solution. However, a non-converged OPF can be counted as an invalid state in most cases. """ self.ensure_optimal_power_flow_available() valids, _, _ = self.calculate_violations(self.optimal_net) return valids.all()
[docs] def get_objective(self) -> float: """ Returns the currrent value of the objective function. """ self.ensure_power_flow_available() return sum(self.calculate_objective(self.net))
[docs] def get_optimal_objective(self) -> float: """ Returns the optimal value of the objective function. Warning: Can only be called if :meth:`run_optimal_power_flow` method was called before. """ self.ensure_optimal_power_flow_available() return sum(self.calculate_objective(self.optimal_net))
[docs] def run_power_flow(self, **kwargs): """ Updates the current power system state with the respective power flow (line loading, voltage magnitudes, etc.). Should be called whenever the power system state changed. The keyword arguments can be used to pass additional arguments to the power flow solver. :param kwargs: Additional arguments for the power flow solver. (default: pandapower. Compare: https://pandapower.readthedocs.io/en/latest/powerflow/ac.html) """ try: self._run_power_flow(self.net, **kwargs) self.power_flow_available = True return True except pp.powerflow.LoadflowNotConverged: logging.warning('Powerflow not converged!!!') return False
[docs] def run_optimal_power_flow(self, **kwargs): """ Creates and internal copy of the power system with its current state and performs the OPF on that copy. Should be called to compare the current solution with the optimal solution. The keyword arguments can be used to pass additional arguments to the pandapower OPF solver. :param kwargs: Additional arguments for the OPF solver. (default: pandapower. Compare: https://pandapower.readthedocs.io/en/latest/opf/formulation.html) """ self.optimal_net = copy.deepcopy(self.net) try: self._run_optimal_power_flow(self.optimal_net, **kwargs) self.optimal_power_flow_available = True return True except pp.optimal_powerflow.OPFNotConverged: logging.warning('OPF not converged!!!') return False
def ensure_power_flow_available(self): if not self.power_flow_available: raise PowerFlowNotAvailable('Please call `run_power_flow` first!') def ensure_optimal_power_flow_available(self): if not self.optimal_power_flow_available: raise PowerFlowNotAvailable('Please call `run_optimal_power_flow` first!') def set_power_flow_unavailable(self): """ Reset the power flow availability to indicate that a new power flow or OPF calculation is required. """ self.power_flow_available = False self.optimal_power_flow_available = False @staticmethod def default_power_flow(net, enforce_q_lims=True, **kwargs): """ Default power flow: Use the pandapower power flow. Default setting: Enforce q limits as automatic constraint satisfaction. """ try: pp.runpp(net, enforce_q_lims=enforce_q_lims, **kwargs) except pp.powerflow.LoadflowNotConverged: logging.warning('Powerflow not converged! Try again without lightsim2grid.') # This happened more often after lightsim2grid was added. # Test if removing lightsim2grid solves the issue. pp.runpp(net, enforce_q_lims=enforce_q_lims, lightsim2grid=False, **kwargs) logging.warning('Powerflow converged without lightsim2grid.') @staticmethod def default_optimal_power_flow(net, calculate_voltage_angles=False, **kwargs): """ Default OPF: Use the pandapower OPF. Default setting: Do not calculate voltage angles because often results in errors for SimBench nets. """ pp.runopp(net, calculate_voltage_angles=calculate_voltage_angles, **kwargs) def get_obs_and_state_space(net: pp.pandapowerNet, obs_or_state_keys: list, add_time_obs: bool=False, add_mean_obs: bool=False, seed: int=None, last_n_obs: int=1, bus_wise_obs=False) -> gym.spaces.Box: """ Get observation or state space from the constraints of the power network. """ lows, highs = [], [] if add_time_obs: # Time is always given as observation of lenght 6 in range [-1, 1] # at the beginning of the observation! lows.append(-np.ones(6)) highs.append(np.ones(6)) for unit_type, column, idxs in obs_or_state_keys: if 'res_' in unit_type: # The constraints are never defined in the results table unit_type = unit_type[4:] elif 'max_' in column or 'min_' in column: # If the constraint itself is an observation, treat is the same as a normal observation -> remove prefix column = column[4:] if column == 'va_degree': # Usually no constraints for voltage angles defined # Assumption: [30, 30] degree range (experience) l = np.full(len(idxs), -30) h = np.full(len(idxs), +30) else: try: if f'min_min_{column}' in net[unit_type].columns: l = net[unit_type][f'min_min_{column}'].loc[idxs].to_numpy() else: l = net[unit_type][f'min_{column}'].loc[idxs].to_numpy() if f'max_max_{column}' in net[unit_type].columns: h = net[unit_type][f'max_max_{column}'].loc[idxs].to_numpy() else: h = net[unit_type][f'max_{column}'].loc[idxs].to_numpy() except KeyError: # Special case: trafos and lines (have minimum constraint of zero) l = np.zeros(len(idxs)) # Assumption: No lines with loading more than 150% h = net[unit_type][f'max_{column}'].loc[idxs].to_numpy() * 1.5 # Special case: voltages if column == 'vm_pu' or unit_type == 'ext_grid': diff = h - l # Assumption: If [0.95, 1.05] voltage band, no voltage outside [0.875, 1.125] range l = l - diff * 0.75 h = h + diff * 0.75 try: if 'min' in column or 'max' in column: # Constraints need to remain scaled raise AttributeError l = l / net[unit_type].scaling.loc[idxs].to_numpy() h = h / net[unit_type].scaling.loc[idxs].to_numpy() except AttributeError: logging.info( f'Scaling for {unit_type} not defined: assume scaling=1') if bus_wise_obs and unit_type == 'load': # Aggregate loads bus-wise. Currently only for loads! buses = sorted(set(net[unit_type].bus)) l = [sum(l[net[unit_type].bus == bus]) for bus in buses] h = [sum(h[net[unit_type].bus == bus]) for bus in buses] for _ in range(last_n_obs): if len(l) > 0 and len(l) == len(h): lows.append(l) highs.append(h) if add_mean_obs: # Add mean values of each category as additional observations start_from = 1 if add_time_obs else 0 add_l = [np.mean(l) for l in lows[start_from:] if len(l) > 1] add_h = [np.mean(h) for h in highs[start_from:] if len(h) > 1] lows.append(np.array(add_l)) highs.append(np.array(add_h)) assert not sum(pd.isna(l).any() for l in lows) assert not sum(pd.isna(h).any() for h in highs) return gym.spaces.Box( np.concatenate(lows, axis=0), np.concatenate(highs, axis=0), seed=seed) def get_bus_aggregated_obs(net, unit_type, column, idxs) -> np.ndarray: """ Aggregate power values that are connected to the same bus to reduce state space. """ df = net[unit_type].iloc[idxs] return df.groupby(['bus'])[column].sum().to_numpy() def assert_only_net_in_signature(function): """ Ensure that the function only takes a pandapower net as argument. """ signature = inspect.signature(function) message = 'Function must only take a pandapower net as argument!' assert list(signature.parameters.keys()) == ['net'], message def raise_opf_not_converged(net, **kwargs): raise pp.optimal_powerflow.OPFNotConverged( "OPF solver not available for this environment.")