Source code for gym_anm.agents.mpc

"""The base class for MPC-based policies."""
import cvxpy as cp
import numpy as np

from gym_anm.simulator.components import Load, StorageUnit, Generator, RenewableGen


[docs]class MPCAgent(object): """ A base class for an agent that solves a MPC DC Optimal Power Flow. This agent accesses the full state of the distribution network at time :math:`t` and then solves an N-stage optimization problem. The optimization problem solved is an instance of a Direct Current (DC) Optimal Power Flow problem. The construction of the DC OPF problem relies on 3 assumptions: 1. transmission lines are lossless, i.e., :math:`r_{ij} = 0, \\forall e_{ij} \in \mathcal E,` 2. the difference between adjacent bus voltage angles is small, i.e., :math:`\\angle V_i = \\angle V_j, \\forall e_{ij} \in \mathcal E`, 3. nodal voltage magnitude are close to unity, i.e., :math:`|V_{i}| = 1, \\forall i \in \mathcal N`. The construction of the N-stage optimization problem relies on two additional assumptions: All sub-classes must implement the :py:func`forecast()` method to provide forecasts of demand and generation over the optimization horizon :math:`[t+1,t+N]`. All values are used in per-unit. """
[docs] def __init__(self, simulator, action_space, gamma, safety_margin=0.9, planning_steps=1): """ Parameters ---------- simulator : :py:class:`gym_anm.simulator.simulator.Simulator` The electricity distribution network simulator. action_space : :py:class:`gym.spaces.Box` The action space of the environment (used to clip actions). gamma : float The discount factor in [0, 1]. safety_margin : float, optional The safety margin constant :math:`\\beta` in [0, 1], used to further constraint the power flow on each transmission line, thus likely accounting for the error introduced in the DC approximation. planning_steps : int, optional The number (N) of stages (time steps) taken into account in the optimization problem. """ # Global parameters. self.safety_margin = safety_margin self.baseMVA = simulator.baseMVA self.lamb = simulator.lamb self.action_space = action_space self.planning_steps = planning_steps self.gamma = gamma # Information about all sets (buses, branches, etc.). self.n_bus = simulator.N_bus self.n_dev = simulator.N_device self.n_branch = len(simulator.branches) self.delta_t = simulator.delta_t self.n_gen = simulator.N_non_slack_gen + 1 self.n_des = simulator.N_des self.n_load = simulator.N_load self.n_rer = simulator.N_gen_rer self.load_ids = [i for i, d in simulator.devices.items() if isinstance(d, Load)] self.gen_ids = [i for i, d in simulator.devices.items() if isinstance(d, Generator)] self.non_slack_gen_ids = [ i for i, d in simulator.devices.items() if isinstance(d, Generator) and not d.is_slack ] self.gen_rer_ids = [i for i, d in simulator.devices.items() if isinstance(d, RenewableGen)] self.des_ids = [i for i, d in simulator.devices.items() if isinstance(d, StorageUnit)] self.branch_ids = list(simulator.branches.keys()) self.device_ids = list(simulator.devices.keys()) self.bus_ids = list(simulator.buses.keys()) self.slack_dev_id = [ i for i in self.device_ids if i not in self.des_ids and i not in self.non_slack_gen_ids and i not in self.load_ids ][0] # Define a mapping from the device to the buses they are connected to # (using the simulator internal indices). self.dev_to_bus = {i: d.bus_id for i, d in simulator.devices.items()} # Define a mapping from `bus_id` to [0, n_bus], where n_bus is the actual # number of buses in the network. This is useful if the bus indices # skip some numbers (e.g., original bus indices where [0, 1, 3]). self.bus_id_mapping = {} for i, bus_id in enumerate(simulator.buses.keys()): self.bus_id_mapping[bus_id] = i # Define a similar mapping for the devices. self.dev_id_mapping = {} for i, dev_id in enumerate(simulator.devices.keys()): self.dev_id_mapping[dev_id] = i # Empty lists to store all single-timestep optimization variables. self.V_bus_ang_vars = [] self.P_dev_vars = [] # Define time-varying parameters. self.P_load_forecast = cp.Parameter((self.n_load, self.planning_steps)) self.P_gen_forecast = cp.Parameter((self.n_gen - 1, self.planning_steps), nonneg=True) self.init_soc = cp.Parameter(self.n_des, nonneg=True) # Define constants. bus_ids = [self.bus_id_mapping[i] for i in self.bus_ids] self.B_bus = simulator.Y_bus.imag[bus_ids, :][:, bus_ids].toarray() self.branch_rate = [br.rate for br in simulator.branches.values()] self.P_gen_min = [g.p_min for g in simulator.devices.values() if isinstance(g, Generator) and not g.is_slack] self.P_gen_max = [g.p_max for g in simulator.devices.values() if isinstance(g, Generator) and not g.is_slack] self.P_des_min = [d.p_min for d in simulator.devices.values() if isinstance(d, StorageUnit)] self.P_des_max = [d.p_max for d in simulator.devices.values() if isinstance(d, StorageUnit)] self.soc_min = [d.soc_min for d in simulator.devices.values() if isinstance(d, StorageUnit)] self.soc_max = [d.soc_max for d in simulator.devices.values() if isinstance(d, StorageUnit)] self.des_eff = [d.eff for d in simulator.devices.values() if isinstance(d, StorageUnit)] # Indicate the optimization problem has not been constructed yet. This is # done during the first call of `act()`. self.dc_opf = None
def _create_p_bus_expressions(self, P_dev): """ Define :math:`P_i^{(bus)} = sum_d P_d^{(dev)}` expressions. Returns ------- list of cvxpy.Expression The active power injection at each bus expressed in terms of the active power injection of its devices. """ P_bus = [0.0] * self.n_bus for d in self.device_ids: i = self.dev_to_bus[d] P_bus[self.bus_id_mapping[i]] += P_dev[self.dev_id_mapping[d]] return P_bus def _create_p_branch_expressions(self, V_bus_ang): """ Define :math:`P_{ij} = B_{ij} * (V_ang[i] - V_ang[j])` expressions. Returns ------- list of cvxpy.Expression The branch active power flow expressed in terms of the nodal voltage phase angle variables. """ P_branch = [] for i, j in self.branch_ids: k = self.bus_id_mapping[i] l = self.bus_id_mapping[j] c = self.B_bus[k, l] * (V_bus_ang[k] - V_bus_ang[l]) P_branch.append(c) return P_branch def _create_optimization_problem(self): """ Create the N-stage cvxpy optimization problem. Returns ------- problem : cvxpy.Problem The N-stage convex optimization problem. """ objective = 0.0 constraints = [] # Variable coupled between different time steps. soc = self.init_soc for i in range(self.planning_steps): # Extract forecasted values for timestep t+1+i. P_load_forecast = self.P_load_forecast[:, i] P_gen_forecast = self.P_gen_forecast[:, i] # Create a new single-step optimization problem coupled with the # previous time step. obj, consts, optim_vars, soc = self._single_step_optimization_problem(P_load_forecast, P_gen_forecast, soc) objective += self.gamma**i * obj constraints += consts # Store the optimization variables. self.P_dev_vars.append(optim_vars["P_dev"]) self.V_bus_ang_vars.append(optim_vars["V_bus_ang"]) # 3. Construct the final N-stage optimization problem. problem = cp.Problem(cp.Minimize(objective), constraints) # 4. Extract the final optimization variables (from the first stage). self.P_dev = self.P_dev_vars[0] self.V_bus_ang = self.V_bus_ang_vars[0] return problem def _single_step_optimization_problem(self, P_load_forecast, P_gen_forecast, soc_prev): """ Define a single-stage instance of the DC OPF optimization problem. Parameters ---------- P_load_forecast : cvxpy.Expression The vector of forecasted load active power injections. P_gen_forecast : cvxpy.Expression The vector of forecasted maximum active power generation from non-slack generators. soc_prev : cvxpy.Expression The vector of current state of charge of DES units. Returns ------- obj : cvxpy.Expression The objective to be minimized. constraints : list of cvxpy.Constraint A list of constraints. optim_var : dict of {str : cvxpy.Variable or cvxpy.Expression} The optimization variables, with keys {'V_bus_ang', 'P_dev', 'P_bus', 'P_branch'}. new_socs : list of cvxpy.Expression The state of charge of the DES units at the next time step, expressed in terms of the optimization variables. """ # 1. Create optimization variables for this timestep. V_bus_ang = cp.Variable(self.n_bus) P_dev = cp.Variable(self.n_dev) # 2. Define P_bus and P_branch expressions. P_bus = self._create_p_bus_expressions(P_dev) P_branch = self._create_p_branch_expressions(V_bus_ang) # 3. Construct the constraints. constraints = [] # P_bus[i] = \sum_{ij} B_{ij} (V_ang[i] - V_ang[j]). a = [] for i in self.bus_ids: c = 0.0 for j, k in self.branch_ids: l = self.bus_id_mapping[j] m = self.bus_id_mapping[k] if j == i: c += self.B_bus[l, m] * (V_bus_ang[l] - V_bus_ang[m]) elif k == i: c += self.B_bus[m, l] * (V_bus_ang[m] - V_bus_ang[l]) a.append(c) constraints += _make_list_eq_constraints(a, P_bus) # P_load(t+1) = P_load_forecast c = [] for l in self.load_ids: c.append(P_dev[self.dev_id_mapping[l]]) constraints += _make_list_eq_constraints(c, P_load_forecast) # P_min <= P_gen <= P_max_ (for non-slack generators). c = [] for g in self.non_slack_gen_ids: c.append(P_dev[self.dev_id_mapping[g]]) constraints += _make_list_le_constraints(self.P_gen_min, c) constraints += _make_list_le_constraints(c, self.P_gen_max) # P_min <= P_des <= P_max (for DES units). c = [] for des in self.des_ids: c.append(P_dev[self.dev_id_mapping[des]]) constraints += _make_list_le_constraints(self.P_des_min, c) constraints += _make_list_le_constraints(c, self.P_des_max) # P <= P_gen_forecast (for non-slack generators). c = [] for gen in self.non_slack_gen_ids: c.append(P_dev[self.dev_id_mapping[gen]]) constraints += _make_list_le_constraints(c, P_gen_forecast) # P >= (soc_{t-1} - soc_max) / (delta_t * eff) # P <= (soc_{t-1} - soc_min) * eff / delta_t new_socs, c = [], [] for i, des in enumerate(self.des_ids): p_charging = cp.Variable(nonneg=True) p_discharging = cp.Variable(nonneg=True) delta_soc_charging = p_charging * self.delta_t * self.des_eff[i] delta_soc_discharging = -p_discharging * self.delta_t / self.des_eff[i] new_soc = soc_prev[i] + delta_soc_charging + delta_soc_discharging new_socs.append(new_soc) c.append(P_dev[self.dev_id_mapping[des]] == p_discharging - p_charging) constraints += c constraints += _make_list_le_constraints(self.soc_min, new_socs) constraints += _make_list_le_constraints(new_socs, self.soc_max) # - \pi <= V_angle <= \pi constraints += _make_list_le_constraints([-np.pi] * self.n_bus, V_bus_ang) constraints += _make_list_le_constraints(V_bus_ang, [np.pi] * self.n_bus) # V_angle[0] = 0 constraints.append(V_bus_ang[self.dev_id_mapping[self.slack_dev_id]] == 0.0) # 4. Construct the objective function. cost_1 = 0.0 for gen in self.gen_ids: if gen not in self.gen_rer_ids: cost_1 += P_dev[self.dev_id_mapping[gen]] cost_2 = 0.0 for p, rate in zip(P_branch, self.branch_rate): cost_2 += cp.maximum(0, cp.abs(p) - self.safety_margin * rate) obj = cost_1 + self.lamb * cost_2 # 5. Optimization variable dictionary to return. optim_vars = {"V_bus_ang": V_bus_ang, "P_dev": P_dev, "P_bus": P_bus, "P_branch": P_branch} return obj, constraints, optim_vars, new_socs
[docs] def act(self, env): """ Select an action by solving the N-stage DC OPF. Parameters ---------- env : py:class:`gym_anm.ANMEnv` The :code:`gym-anm` environment. Returns ------- numpy.ndarray The action vector to apply in the environment. """ P_load_forecasts, P_gen_forecasts = self.forecast(env) a = self._solve(env.simulator, P_load_forecasts, P_gen_forecasts) # Clip the actions, which are sometime beyond the space by a tiny # amount due to precision errors in the optimization problem # solution (e.g., of the order of 1e-10). a = np.clip(a, self.action_space.low, self.action_space.high) return a
[docs] def forecast(self, env): """ Forecast the demand and generation over the optimization horizon. This method must be implemented by all sub-classes of :py:class:`MPCAgent`. It gets called in :py:func:`act()` and must return the predictions of future load demand, :math:`\\tilde P_{l,k}^{(dev)}`, and maximum generation at non-slack generators, :math:`\\tilde P_{g,k}^{(max)}`. Parameters ---------- env : py:class:`gym_anm.ANMEnv` The :code:`gym-anm` environment. Returns ------- P_load_forecast : array_like A (N_load, N) array of forecasted load power injections (<0), where N is the length of the optimization horizon. The rows should be ordered in increasing order of device ID. P_gen_forecast : array_like A (N_gen-1, N) array of forecasted maximum generation from non-slack generators, where N is the length of the optimization horizon. The rows should be ordered in increasing order of device ID. """ raise NotImplementedError()
def _solve(self, simulator, load_forecasts, gen_forecasts): """Select an action by solving the N-stage DC OPF.""" # Initialize the optimization problem during the first iteration. if self.dc_opf is None: self.dc_opf = self._create_optimization_problem() # Update the time-varying parameters (fixed values). self._update_parameters(simulator, load_forecasts, gen_forecasts) # Solve the DC OPF (convex program). self.dc_opf.solve() if self.dc_opf.status != "optimal": print("OPF problem is " + self.dc_opf.status) # Extract the control variables (scale from p.u. to MW or MVAr). P_gen = [self.P_dev.value[self.dev_id_mapping[d]] * self.baseMVA for d in self.non_slack_gen_ids] Q_gen = [0.0] * len(P_gen) P_des = [self.P_dev.value[self.dev_id_mapping[d]] * self.baseMVA for d in self.des_ids] Q_des = [0.0] * len(P_des) return np.concatenate((P_gen, Q_gen, P_des, Q_des)) def _update_parameters(self, simulator, P_load_forecasts, P_gen_forecasts): """ Update the time-dependent fixed values of the optimization problem. Parameters ---------- simulator : py:class:`gym_anm.simulator.Simulator` The power grid simulator of the environment. P_load_forecasts : numpy.ndarray A (N_load, N) array of forecasted demand values over the optimization horizon [t+1, t+N]. The rows should be ordered in increasing order of device index. P_gen_forecasts : numpy.ndarray A (N_gen-1, N) array of forecasted maximum generation from non-slack generators over the optimization horizon [t+1, t+N]. The rows should be ordered in increasing order of device index. """ # Update the parameters of the optimization problem with the # forecasted values. self.P_load_forecast.value = np.array(P_load_forecasts) self.P_gen_forecast.value = np.array(P_gen_forecasts) # Set the initial state of charge of DES units. self.init_soc.value = [simulator.state["des_soc"]["pu"][i] for i in self.des_ids]
def _make_list_eq_constraints(a, b): """Create a list of cvxpy equality constraints from two lists.""" if isinstance(a, list): n = len(a) elif isinstance(b, list): n = len(b) else: raise ValueError() return [a[i] == b[i] for i in range(n)] def _make_list_le_constraints(a, b): """Create a list of cvxpy less than or equal (<=) equality constraints.""" if isinstance(a, list): n = len(a) elif isinstance(b, list): n = len(b) else: raise ValueError() return [a[i] <= b[i] for i in range(n)]