"""The base class for :code:`gym-anm` environments."""
import gym
from gym import spaces
from gym.utils import seeding
import numpy as np
from logging import getLogger
from copy import deepcopy
import warnings
from scipy.sparse.linalg import MatrixRankWarning
from ..simulator import Simulator
from ..errors import ObsSpaceError, ObsNotSupportedError, EnvInitializationError, EnvNextVarsError
from .utils import check_env_args
from ..simulator.components.constants import STATE_VARIABLES
from ..simulator.components import StorageUnit, Generator, Load
logger = getLogger(__file__)
[docs]class ANMEnv(gym.Env):
"""
The base class for :code:`gym-anm` environments.
Attributes
----------
K : int
The number of auxiliary variables.
gamma : float
The fixed discount factor in [0, 1].
lamb : int or float
The factor multiplying the penalty associated with violating
operational constraints (used in the reward signal).
delta_t : float
The interval of time between two consecutive time steps (fraction of
hour).
simulator : :py:class:`gym_anm.simulator.simulator.Simulator`
The electricity distribution network simulator.
state_values : list of tuple of str
The electrical quantities to include in the state vectors. Each tuple
(x, y, z) refers to quantity x at nodes/devices/branches y, using units
z.
state_N : int
The number of state variables.
action_space : gym.spaces.Box
The action space from which the agent can select actions.
obs_values : list of str or None
Similarly to :py:obj:`state_values`, the values to include in the observation
vectors. If a customized :py:func:`observation()` function is provided, :py:obj:`obs_values`
is None.
observation_space : gym.spaces.Box
The observation space from which observation vectors are constructed.
observation_N : int
The number of observation variables.
done : bool
True if a terminal state has been reached (if the network collapsed);
False otherwise.
render_mode : str
The rendering mode. See :py:func:`render()`.
timestep : int
The current timestep.
state : numpy.ndarray
The current state vector :math:`s_t`.
e_loss : float
The energy loss during the last transition (part of the reward signal).
penalty : float
The penalty associated with violating operational constraints during the
last transition (part of the reward signal).
costs_clipping : tuple of float
The clipping values for the costs (- rewards), where :py:obj:`costs_clipping[0]` is
the clipping value for the absolute energy loss and :py:obj:`costs_clipping[1]` is
the clipping value for the constraint violation penalty.
pfe_converged : bool
True if the last transition converged to a load flow solution (i.e.,
the network is stable); False otherwise.
np_random : numpy.random.RandomState
The random state/seed of the environment.
"""
[docs] def __init__(self, network, observation, K, delta_t, gamma, lamb, aux_bounds=None, costs_clipping=None, seed=None):
"""
Parameters
----------
network : dict of {str : numpy.ndarray}
The network input dictionary describing the power grid.
observation : callable or list or str
The observation space. It can be specified as "state" to construct a
fully observable environment (:math:`o_t = s_t`); as a callable function such
that :math:`o_t = observation(s_t)`; or as a list of tuples (x, y, z) that
refer to the electrical quantities x (str) at the nodes/branches/devices
y (list or 'all') in unit z (str, optional).
K : int
The number of auxiliary variables.
delta_t : float
The interval of time between two consecutive time steps (fraction of
hour).
gamma : float
The discount factor in [0, 1].
lamb : int or float
The factor multiplying the penalty associated with violating
operational constraints (used in the reward signal).
aux_bounds : numpy.ndarray, optional
The bounds on the auxiliary internal variables as a 2D array where
the :math:`k^{th}`-1 auxiliary variable is bounded by
:py:obj:`[aux_bounds[k, 0], aux_bounds[k, 1]]`. This can be useful if auxiliary
variables are to be included in the observation vectors and a bounded
observation space is desired.
costs_clipping : tuple of float, optional
The clipping values for the costs in the reward signal, where element
0 is the clipping value for the energy loss cost and element 1 is the
clipping value for the constraint-violation penalty (e.g., (1, 100)).
seed : int, optional
A random seed.
"""
self.seed(seed)
self.K = K
self.gamma = gamma
self.lamb = lamb
self.delta_t = delta_t
self.aux_bounds = aux_bounds
# Do not clip costs if unspecified.
if costs_clipping is None:
c1, c2 = np.inf, np.inf
else:
c1 = np.inf if costs_clipping[0] is None else costs_clipping[0]
c2 = np.inf if costs_clipping[1] is None else costs_clipping[1]
self.costs_clipping = (c1, c2)
# Initialize the AC power grid simulator.
self.simulator = Simulator(network, self.delta_t, self.lamb)
# Check the arguments provided.
check_env_args(K, delta_t, lamb, gamma, observation, aux_bounds, self.simulator.state_bounds)
# Variables to include in state vectors.
self.state_values = [
("dev_p", "all", "MW"),
("dev_q", "all", "MVAr"),
("des_soc", "all", "MWh"),
("gen_p_max", "all", "MW"),
("aux", "all", None),
]
self.state_values = self._expand_all_ids(self.state_values)
self.state_N = sum(len(s[1]) for s in self.state_values)
# Build action space.
self.action_space = self._build_action_space()
# Build observation space.
self.obs_values = self._build_observation_space(observation)
self.observation_space = self.observation_bounds()
if self.observation_space is not None:
self.observation_N = self.observation_space.shape[0]
[docs] def init_state(self):
"""
Sample an initial state :math:`s_0`.
For reproducibility, the RandomState :py:obj:`self.np_random` should be used to
generate random numbers.
Returns
-------
numpy.ndarray
An initial state vector :math:`s_0`.
"""
raise NotImplementedError
[docs] def next_vars(self, s_t):
"""
Sample internal variables.
Parameters
----------
s_t : numpy.ndarray
The current state vector :math:`s_t`.
Returns
-------
numpy.ndarray
The internal variables for the next timestep, following the structure
:math:`[P_l, P_g^{(max)}, aux^{(k)}]`, where :math:`P_l` contains the load
injections (ordered by device ID), :math:`P_g^{(max)}` the maximum generation
from non-slack generators (ordered by device ID), and :math:`aux^{(k)} `the
auxiliary variables. The vector shape should be
:code:`(N_load + (N_generators-1) + K,)`.
"""
raise NotImplementedError
[docs] def observation_bounds(self):
"""
Builds the observation space of the environment.
If the observation space is specified as a callable object, then its
bounds are set to :code:`(- np.inf, np.inf)^{N_o}` by default (this is done
during the :py:func:`reset()` call, as the size of observation vectors is not
known before then). Alternatively, the user can specify their own bounds
by overwriting this function in a subclass.
Returns
-------
gym.spaces.Box or None
The bounds of the observation space.
"""
lower_bounds, upper_bounds = [], []
if self.obs_values is None:
logger.warning("The observation space is unbounded.")
# In this case, the size of the obs space is obtained after the
# environment has been reset. See `reset()`.
return None
else:
bounds = self.simulator.state_bounds
for key, nodes, unit in self.obs_values:
for n in nodes:
if key == "aux":
if self.aux_bounds is not None:
lower_bounds.append(self.aux_bounds[n][0])
upper_bounds.append(self.aux_bounds[n][1])
else:
lower_bounds.append(-np.inf)
upper_bounds.append(np.inf)
else:
lower_bounds.append(bounds[key][n][unit][0])
upper_bounds.append(bounds[key][n][unit][1])
space = spaces.Box(low=np.array(lower_bounds), high=np.array(upper_bounds), dtype=np.float64)
return space
[docs] def reset(self):
"""
Reset the environment.
If the observation space is provided as a callable object but the
:py:func:`observation_bounds()` method is not overwritten, then the bounds on the
observation space are set to :code:`(- np.inf, np.inf)` here (after the size of the
observation vectors is known).
Returns
-------
obs : numpy.ndarray
The initial observation vector.
"""
self.done = False
self.render_mode = None
self.timestep = 0
self.e_loss = 0.0
self.penalty = 0.0
# Initialize the state.
init_state_found = False
n_init_states = 0
n_init_states_max = 100
while not init_state_found:
n_init_states += 1
self.state = self.init_state()
# Check s_0 has the correct size.
expected = 2 * self.simulator.N_device + self.simulator.N_des + self.simulator.N_non_slack_gen + self.K
if self.state.size != expected:
msg = "Expected size of initial state s0 is %d but actual is %d" % (expected, self.state.size)
raise EnvInitializationError(msg)
# Apply the initial state to the simulator.
with warnings.catch_warnings():
warnings.simplefilter("ignore", MatrixRankWarning)
init_state_found = self.simulator.reset(self.state)
if n_init_states == n_init_states_max:
msg = "No non-terminal state found out of %d initial states for " "environment %s" % (
n_init_states_max,
self.__name__,
)
raise EnvInitializationError(msg)
# Reconstruct the sate vector in case the original state was infeasible.
self.state = self._construct_state()
# Construct the initial observation vector.
obs = self.observation(self.state)
# Update the observation space bounds if required.
if self.observation_space is None:
self.observation_space = spaces.Box(low=-np.ones(len(obs)) * np.inf, high=np.ones(len(obs)) * np.inf)
self.observation_N = self.observation_space.shape[0]
err_msg = "Observation %r (%s) invalid." % (obs, type(obs))
assert self.observation_space.contains(obs), err_msg
# Cast state and obs vectors to 0 (arbitrary) if a terminal state has
# been reached.
if self.done:
self.state = self._terminal_state(self.state_N)
obs = self._terminal_state(self.observation_N)
return obs
[docs] def observation(self, s_t):
"""
Returns the observation vector corresponding to the current state :math:`s_t`.
Alternatively, this function can be overwritten in customized environments.
Parameters
----------
s_t : numpy.ndarray
The current state vector :math:`s_t`.
Returns
-------
numpy.ndarray
The corresponding observation vector :math:`o_t`.
"""
obs = self._extract_state_variables(self.obs_values)
obs = np.clip(obs, self.observation_space.low, self.observation_space.high)
return obs
[docs] def step(self, action):
"""
Take a control action and transition from state :math:`s_t` to state :math:`s_{t+1}`.
Parameters
----------
action : numpy.ndarray
The action vector :math:`a_t` taken by the agent.
Returns
-------
obs : numpy.ndarray
The observation vector :math:`o_{t+1}`.
reward : float
The reward associated with the transition :math:`r_t`.
done : bool
True if a terminal state has been reached; False otherwise.
info : dict
A dictionary with further information (used for debugging).
"""
err_msg = "Action %r (%s) invalid." % (action, type(action))
assert self.action_space.contains(action), err_msg
# 0. Remain in a terminal state and output reward=0 if the environment
# has already reached a terminal state.
if self.done:
obs = self._terminal_state(self.observation_N)
return obs, 0.0, self.done, {}
# 1a. Sample the internal stochastic variables.
vars = self.next_vars(self.state)
expected_size = self.simulator.N_load + self.simulator.N_non_slack_gen + self.K
if vars.size != expected_size:
msg = "Next vars vector has size %d but expected is %d" % (vars.size, expected_size)
raise EnvNextVarsError(msg)
P_load = vars[: self.simulator.N_load]
P_pot = vars[self.simulator.N_load : self.simulator.N_load + self.simulator.N_non_slack_gen]
aux = vars[self.simulator.N_load + self.simulator.N_non_slack_gen :]
err_msg = "Only {} auxiliary variables are generated, but K={} are " "expected.".format(len(aux), self.K)
assert len(aux) == self.K, err_msg
# 1b. Convert internal variables to dictionaries.
load_idx, gen_idx = 0, 0
P_load_dict, P_pot_dict = {}, {}
for dev_id, dev in self.simulator.devices.items():
if isinstance(dev, Load):
P_load_dict[dev_id] = P_load[load_idx]
load_idx += 1
elif isinstance(dev, Generator) and not dev.is_slack:
P_pot_dict[dev_id] = P_pot[gen_idx]
gen_idx += 1
# 2. Extract the different actions from the action vector.
P_set_points = {}
Q_set_points = {}
gen_non_slack_ids = [
i for i, dev in self.simulator.devices.items() if isinstance(dev, Generator) and not dev.is_slack
]
des_ids = [i for i, dev in self.simulator.devices.items() if isinstance(dev, StorageUnit)]
N_gen = len(gen_non_slack_ids)
N_des = len(des_ids)
for a, dev_id in zip(action[:N_gen], gen_non_slack_ids):
P_set_points[dev_id] = a
for a, dev_id in zip(action[N_gen : 2 * N_gen], gen_non_slack_ids):
Q_set_points[dev_id] = a
for a, dev_id in zip(action[2 * N_gen : 2 * N_gen + N_des], des_ids):
P_set_points[dev_id] = a
for a, dev_id in zip(action[2 * N_gen + N_des :], des_ids):
Q_set_points[dev_id] = a
# 3a. Apply the action in the simulator.
with warnings.catch_warnings():
warnings.simplefilter("ignore", MatrixRankWarning)
_, r, e_loss, penalty, pfe_converged = self.simulator.transition(
P_load_dict, P_pot_dict, P_set_points, Q_set_points
)
# A terminal state has been reached if no solution to the power
# flow equations is found.
self.done = not pfe_converged
# 3b. Clip the reward.
if not self.done:
self.e_loss = np.sign(e_loss) * np.clip(np.abs(e_loss), 0, self.costs_clipping[0])
self.penalty = np.clip(penalty, 0, self.costs_clipping[1])
r = -(self.e_loss + self.penalty)
else:
# Very large reward if a terminal state has been reached.
r = -self.costs_clipping[1] / (1 - self.gamma)
self.e_loss = self.costs_clipping[0]
self.penalty = self.costs_clipping[1]
# 4. Construct the state and observation vector.
if not self.done:
for k in range(self.K):
self.state[k - self.K] = aux[k]
self.state = self._construct_state()
obs = self.observation(self.state)
err_msg = "Observation %r (%s) invalid." % (obs, type(obs))
assert self.observation_space.contains(obs), err_msg
# Cast state and obs vectors to 0 (arbitrary) if a terminal state is
# reached.
else:
self.state = self._terminal_state(self.state_N)
obs = self._terminal_state(self.observation_N)
# 5. Update the timestep.
self.timestep += 1
return obs, r, self.done, {}
[docs] def render(self, mode="human"):
"""
Update the rendering of the environment (to be overwritten).
Raises
------
NotImplementedError
"""
raise NotImplementedError()
[docs] def close(self):
"""
Close the rendering of the environment (to be overwritten).
Raises
-------
NotImplementedError
"""
raise NotImplementedError()
[docs] def seed(self, seed=None):
"""Seed the random number generator."""
self.np_random, seed = seeding.np_random(seed)
return [seed]
def _build_action_space(self):
"""
Build the available loose action space :math:`\mathcal A`.
Returns
-------
gym.spaces.Box
The action space of the environment.
"""
P_gen_bounds, Q_gen_bounds, P_des_bounds, Q_des_bounds = self.simulator.get_action_space()
lower_bounds, upper_bounds = [], []
for x in [P_gen_bounds, Q_gen_bounds, P_des_bounds, Q_des_bounds]:
for dev_id in sorted(x.keys()):
lower_bounds.append(x[dev_id][0])
upper_bounds.append(x[dev_id][1])
space = spaces.Box(low=np.array(lower_bounds), high=np.array(upper_bounds), dtype=np.float64)
return space
def _build_observation_space(self, observation):
"""Handles the different ways of specifying an observation space."""
# Case 1: environment is fully observable.
if isinstance(observation, str) and observation == "state":
obs_values = deepcopy(self.state_values)
# Case 2: observation space is provided as a list.
elif isinstance(observation, list):
obs_values = deepcopy(observation)
# Add default units when none is provided.
for idx, o in enumerate(obs_values):
if len(o) == 2:
obs_values[idx] = tuple(list(o) + [STATE_VARIABLES[o[0]][0]])
# Case 3: observation space is provided as a callable object.
elif callable(observation):
obs_values = None
self.observation = observation
else:
raise ObsSpaceError()
# Transform the 'all' option into a list of bus/branch/device IDs.
return self._expand_all_ids(obs_values)
def _expand_all_ids(self, values):
"""Helper function to translate the 'all' option in a list of IDS."""
# Transform the 'all' option into a list of bus/branch/device IDs.
if values is not None:
for idx, o in enumerate(values):
if isinstance(o[1], str) and o[1] == "all":
if "bus" in o[0]:
ids = list(self.simulator.buses.keys())
elif "dev" in o[0]:
ids = list(self.simulator.devices.keys())
elif "des" in o[0]:
ids = [i for i, d in self.simulator.devices.items() if isinstance(d, StorageUnit)]
elif "gen" in o[0]:
ids = [
i for i, d in self.simulator.devices.items() if isinstance(d, Generator) and not d.is_slack
]
elif "branch" in o[0]:
ids = list(self.simulator.branches.keys())
elif o[0] == "aux":
ids = list(range(0, self.K))
else:
raise ObsNotSupportedError(o[0], STATE_VARIABLES.keys())
values[idx] = (o[0], ids, o[2])
return values
def _construct_state(self):
"""
Construct the state vector :math:`s_t`.
Returns
-------
s_t : numpy.ndarray
The state vector.
"""
return self._extract_state_variables(self.state_values)
def _extract_state_variables(self, values):
"""
Extract variables used in state and observation vectors from the simulator.
Parameters
----------
values : list of tuples of (str, list, str)
The variables to extract as tuples, where each tuple (i, j, k) refers
to variable i at the nodes/branches/devices listed in j, using unit
k.
Returns
-------
numpy.ndarray
The vector of extracted state variables.
"""
full_state = self.simulator.state
vars = []
for value in values:
for idx in value[1]:
if value[0] in full_state.keys():
o = full_state[value[0]][value[2]][idx]
elif value[0] == "aux":
o = self.state[idx - self.K]
else:
raise ObsNotSupportedError(value[0], STATE_VARIABLES.keys())
vars.append(o)
return np.array(vars)
def _terminal_state(self, n):
"""
Return a 0-vector (arbitrarily chosen as terminal state).
Parameters
----------
n : int
The length of the vector to return.
Returns
-------
numpy.ndarray
A zero-vector of size n.
"""
return np.zeros(n)