Source code for gym_anm.simulator.components.devices

import numpy as np
import cvxpy as cp
from logging import getLogger

from .constants import DEV_H
from .errors import DeviceSpecError, LoadSpecError, GenSpecError, StorageSpecError

logger = getLogger(__file__)


[docs] class Device(object): """ A electric device connected to an electric power grid. Attributes ---------- dev_id : int The unique device ID :math:`d`. bus_id : int The ID of the bus the device is connected to :math:`i`. type : int The device type : -1 (load), 0 (slack), 1 (classical generator), 2 (renewable energy), 3 (storage). qp_ratio : float The constant ratio of reactive over real power injection :math:`(Q/P)_d`. p_min, p_max, q_min, q_max : float The minimum and maximum real (p.u.) and reactive (p.u.) power injections (:math:`\\underline P_d, \\overline P_d, \\underline Q_d, \\overline Q_d`). p_plus, p_minus, q_plus, q_minus : float The bounds describing the (P, Q) region of physical operation of the device (p.u.) (:math:`P^+_d, P^-_d, Q^+_d, Q^-_d`). soc_min, soc_max : float The minimum and maximum state of charge if the device is a storage unit :math:`[\\underline {SoC}_d, \\overline {SoC}_d]` (p.u. per hour); None otherwise. eff : float The round-trip efficiency :math:`\\eta \\in [0, 1]` if the device is a storage unit; None otherwise. tau_1, tau_2, tau_3, tau_4 : float The slope of the constraints on Q when P is near its maximum (:math:`\\tau^{(1)}_d, \\tau^{(2)}_d, \\tau^{(3)}_d, \\tau^{(4)}_d`) (1-2 are used for generators, 1-4 for storage units, and the remaining ones are None). rho_1, rho_2, rho_3, rho_4 : float The offset of the linear constraints on Q when P is near its maximum (:math:`\\rho^{(1)}_d, \\rho^{(2)}_d, \\rho^{(3)}_d, \\rho^{(4)}_d`)(see `tau_1`). is_slack : bool True if the device is connected to the slack bus; False otherwise. p, q : float The real :math:`P_d^{(dev)}` and reactive :math:`Q_d^{(dev)}` power injections from the device (p.u.). p_pot : float The maximum active generation of the device at the current time :math:`P_g^{(max)}` (p.u.), if a generator; None otherwise. soc : float The state of charge :math:`SoC_d` if the device is a storage unit (p.u. per hour); None otherwise. """
[docs] def __init__(self, dev_spec, bus_ids, baseMVA): """ Parameters ---------- dev_spec : numpy.ndarray The corresponding device row in the network file describing the network. bus_ids : list of int The list of unique bus IDs. baseMVA : int The base power of the network. """ # Components used by all types of devices. self.dev_id = dev_spec[DEV_H["DEV_ID"]] if self.dev_id is None: raise DeviceSpecError("The device ID cannot be None.") else: self.dev_id = int(self.dev_id) self.bus_id = dev_spec[DEV_H["BUS_ID"]] if self.bus_id is None or self.bus_id not in bus_ids: raise DeviceSpecError( "Device {} has unique bus ID = {} but should" " be in {}.".format(self.dev_id, self.bus_id, bus_ids) ) else: self.bus_id = int(self.bus_id) self.type = dev_spec[DEV_H["DEV_TYPE"]] allowed = [-1, 0, 1, 2, 3] if self.type is None or self.type not in allowed: raise DeviceSpecError("The DEV_TYPE value for device %d should be" " in %s." % allowed) if self.type == 0: self.is_slack = True else: self.is_slack = False # Components device-specific. self.qp_ratio = None self.p_min, self.p_max = None, None self.q_min, self.q_max = None, None self.p_plus, self.p_minus = None, None self.q_plus, self.q_minus = None, None self.soc_min, self.soc_max = None, None self.eff = None self.tau_1, self.tau_2, self.tau_3, self.tau_4 = None, None, None, None self.rho_1, self.rho_2, self.rho_3, self.rho_4 = None, None, None, None self._check_type_specific_specs(dev_spec, baseMVA) self.p = None self.q = None self.soc = None # only for storage units
def _check_type_specific_specs(self, dev_spec, baseMVA): pass
[docs] class Load(Device): """ A passive load connected to an electric power grid. """
[docs] def __init__(self, dev_spec, bus_ids, baseMVA): # docstring inherited super().__init__(dev_spec, bus_ids, baseMVA)
def _check_type_specific_specs(self, dev_spec, baseMVA): if self.type != -1: raise LoadSpecError("Trying to create a Load object for a device" " that has DEV_TYPE != -1.") self.qp_ratio = dev_spec[DEV_H["Q/P"]] if self.qp_ratio is None: raise LoadSpecError("A fixed Q/P value must be provided for device" " (load) %d." % self.dev_id) self.p_max = dev_spec[DEV_H["PMAX"]] if self.p_max is None: self.p_max = 0.0 elif self.p_max > 0.0: raise LoadSpecError( "Trying to create a load with P_max > 0. Loads" " can only withdraw power from the grid." ) self.p_min = dev_spec[DEV_H["PMIN"]] if self.p_min is None: self.p_min = -np.inf logger.warning("The P_min value of device %d is set to - infinity." % self.dev_id) else: self.p_min /= baseMVA # to p.u. if self.p_max < self.p_min: raise LoadSpecError("Device %d has P_max < P_min." % self.dev_id) self.q_max = self.p_max * self.qp_ratio self.q_min = self.p_min * self.qp_ratio
[docs] def map_pq(self, p): """ Map p to the closest (P, Q) feasible power injection point. Parameters ---------- p : float The desired :math:`P_l` power injection point (p.u.). """ self.p = np.clip(p, self.p_min, self.p_max) self.q = self.p * self.qp_ratio
[docs] class Generator(Device): """ A generator connected to an electrical power grid. """
[docs] def __init__(self, dev_spec, bus_ids, baseMVA): # docstring inherited super().__init__(dev_spec, bus_ids, baseMVA) self.p_pot = self.p_max
@property def p_pot(self): return self._p_pot @p_pot.setter def p_pot(self, value): self._p_pot = np.clip(value, self.p_min, self.p_max) def _check_type_specific_specs(self, dev_spec, baseMVA): self.p_max = dev_spec[DEV_H["PMAX"]] if self.p_max is None: self.p_max = np.inf logger.warning("The P_max value of device %d is set to infinity." % self.dev_id) elif self.p_max < 0 and not self.is_slack: raise GenSpecError("The PMAX value of device %d should be >= 0." % self.dev_id) else: self.p_max /= baseMVA # to p.u. self.p_min = dev_spec[DEV_H["PMIN"]] if self.p_min is None: if self.is_slack: self.p_min = -np.inf else: self.p_min = 0.0 elif self.p_min < 0 and not self.is_slack: raise GenSpecError("The PMIN value of device %d should be >= 0." % self.dev_id) else: self.p_min /= baseMVA # to p.u. if self.p_max < self.p_min: raise GenSpecError("Device %d has PMAX < PMIN." % self.dev_id) self.q_max = dev_spec[DEV_H["QMAX"]] if self.q_max is None: self.q_max = np.inf if not self.is_slack: logger.warning("The Q_max value of device %d is set to " "infinity." % self.dev_id) else: self.q_max /= baseMVA # to p.u. self.q_min = dev_spec[DEV_H["QMIN"]] if self.q_min is None: self.q_min = -np.inf if not self.is_slack: logger.warning("The Q_min value of device %d is set to - " "infinity." % self.dev_id) else: self.q_min /= baseMVA # to p.u. if self.q_max < self.q_min: raise GenSpecError("Device %d has QMAX < QMIN." % self.dev_id) self.p_plus = dev_spec[DEV_H["P+"]] if self.p_plus is None: self.p_plus = self.p_max else: self.p_plus /= baseMVA if self.p_plus < self.p_min: raise GenSpecError("Device %d has P+ < PMIN" % self.dev_id) elif self.p_plus > self.p_max: raise GenSpecError("Device %d has P+ > PMAX." % self.dev_id) self.p_minus = dev_spec[DEV_H["P-"]] if self.p_minus is not None: logger.warning("The P- value of device %d is going to be ignored." % self.dev_id) self.p_minus = None self.q_plus = dev_spec[DEV_H["Q+"]] if self.q_plus is None: self.q_plus = self.q_max else: self.q_plus /= baseMVA if self.q_plus > self.q_max: raise GenSpecError("Device %d has Q+ > QMAX." % self.dev_id) self.q_minus = dev_spec[DEV_H["Q-"]] if self.q_minus is None: self.q_minus = self.q_min else: self.q_minus /= baseMVA if self.q_minus < self.q_min: raise GenSpecError("Device %d has Q- < QMIN." % self.dev_id) if self.q_plus < self.q_minus: raise GenSpecError("Device %d has Q+ < Q-." % self.dev_id) if self.p_max == self.p_plus: self.tau_1 = 0.0 self.tau_2 = 0.0 else: self.tau_1 = (self.q_plus - self.q_max) / (self.p_max - self.p_plus) self.tau_2 = (self.q_minus - self.q_min) / (self.p_max - self.p_plus) self.rho_1 = self.q_max - self.tau_1 * self.p_plus self.rho_2 = self.q_min - self.tau_2 * self.p_plus
[docs] def map_pq(self, p, q): """ Map (p, q) to the closest (P, Q) feasible power injection point. Parameters ---------- p, q : float The desired (P, Q) power injection point (p.u.). """ # Desired set-point. point = np.array([p, q]) # Inequality constraints for the optimization problem. G = np.array([[-1, 0], [1, 0], [1, 0], [0, -1], [0, 1], [-self.tau_1, 1], [self.tau_2, -1]]) h = np.array([-self.p_min, self.p_max, self.p_pot, -self.q_min, self.q_max, self.rho_1, -self.rho_2]) # Define and solve the CVXPY problem. x = cp.Variable(2) prob = cp.Problem(cp.Minimize(cp.sum_squares(x - point)), [G @ x <= h]) prob.solve() self.p = x.value[0] self.q = x.value[1]
[docs] class ClassicalGen(Generator): """ A non-renewable energy source connected to an electrical power grid. """
[docs] def __init__(self, dev_spec, bus_ids, baseMVA): # docstring inherited super().__init__(dev_spec, bus_ids, baseMVA)
[docs] class RenewableGen(Generator): """ A renewable energy source connected to an electrical power grid. """
[docs] def __init__(self, dev_spec, bus_ids, baseMVA): # docstring inherited super().__init__(dev_spec, bus_ids, baseMVA)
[docs] class StorageUnit(Device): """ A distributed storage unit connected to an electrical power grid. """
[docs] def __init__(self, dev_spec, bus_ids, baseMVA): super().__init__(dev_spec, bus_ids, baseMVA)
def _check_type_specific_specs(self, dev_spec, baseMVA): self.p_max = dev_spec[DEV_H["PMAX"]] if self.p_max is None: logger.warning("The P_max value of device %d is set to infinity." % (self.dev_id)) self.p_max = np.inf elif self.p_max < 0: raise StorageSpecError("The PMAX value of device %d should be >= 0." % self.dev_id) else: self.p_max /= baseMVA # to p.u. self.p_min = dev_spec[DEV_H["PMIN"]] if self.p_min is None: self.p_min = -np.inf logger.warning("The P_min value of device %d is set to - infinity." % (self.dev_id)) elif self.p_min > 0: raise StorageSpecError("The PMIN value of device %d should be <= 0." % self.dev_id) else: self.p_min /= baseMVA # to p.u. if self.p_max < self.p_min: raise StorageSpecError("Device %d has PMAX < PMIN." % self.dev_id) self.q_max = dev_spec[DEV_H["QMAX"]] if self.q_max is None: logger.warning("The Q_max value of device %d is set to infinity." % self.dev_id) self.q_max = np.inf else: self.q_max /= baseMVA # to p.u. self.q_min = dev_spec[DEV_H["QMIN"]] if self.q_min is None: self.q_min = -np.inf logger.warning("The Q_min value of device %d is set to - infinity." % self.dev_id) else: self.q_min /= baseMVA # to p.u. if self.q_max < self.q_min: raise StorageSpecError("Device %d has QMAX < QMIN." % self.dev_id) self.p_plus = dev_spec[DEV_H["P+"]] if self.p_plus is None: self.p_plus = self.p_max else: self.p_plus /= baseMVA if self.p_plus < self.p_min: raise StorageSpecError("Device %d has P+ < PMIN" % self.dev_id) elif self.p_plus > self.p_max: raise StorageSpecError("Device %d has P+ > PMAX." % self.dev_id) self.p_minus = dev_spec[DEV_H["P-"]] if self.p_minus is None: self.p_minus = self.p_min else: self.p_minus /= baseMVA if self.p_minus < self.p_min: raise StorageSpecError("Device %d has P- < PMIN" % self.dev_id) elif self.p_minus > self.p_max: raise StorageSpecError("Device %d has P- > PMAX" % self.dev_id) if self.p_plus < self.p_minus: raise StorageSpecError("Device %d has P+ < P-." % self.dev_id) self.q_plus = dev_spec[DEV_H["Q+"]] if self.q_plus is None: self.q_plus = self.q_max else: self.q_plus /= baseMVA if self.q_plus > self.q_max: raise StorageSpecError("Device %d has Q+ > QMAX." % self.dev_id) self.q_minus = dev_spec[DEV_H["Q-"]] if self.q_minus is None: self.q_minus = self.q_min else: self.q_minus /= baseMVA if self.q_minus < self.q_min: raise StorageSpecError("Device %d has Q- < QMIN." % self.dev_id) if self.q_plus < self.q_minus: raise StorageSpecError("Device %d has Q+ < Q-." % self.dev_id) if self.p_max == self.p_plus: self.tau_1 = 0.0 self.tau_2 = 0.0 else: self.tau_1 = (self.q_plus - self.q_max) / (self.p_max - self.p_plus) self.tau_2 = (self.q_minus - self.q_min) / (self.p_max - self.p_plus) self.rho_1 = self.q_max - self.tau_1 * self.p_plus self.rho_2 = self.q_min - self.tau_2 * self.p_plus if self.p_min == self.p_minus: self.tau_3 = 0.0 self.tau_4 = 0.0 else: self.tau_3 = (self.q_min - self.q_minus) / (self.p_minus - self.p_min) self.tau_4 = (self.q_max - self.q_plus) / (self.p_minus - self.p_min) self.rho_3 = self.q_min - self.tau_3 * self.p_minus self.rho_4 = self.q_max - self.tau_4 * self.p_minus self.soc_min = dev_spec[DEV_H["SOC_MIN"]] if self.soc_min is None: self.soc_min = 0.0 elif self.soc_min < 0.0: raise StorageSpecError("Device %d is a storage unit and has SOC_MIN" " < 0." % self.dev_id) else: self.soc_min /= baseMVA # to p.u. self.soc_max = dev_spec[DEV_H["SOC_MAX"]] if self.soc_max is None: raise StorageSpecError("Device %d is a storage unit and has SOC_MAX" " = None." % self.dev_id) if self.soc_max < 0.0: raise StorageSpecError("Device %d is a storage unit and has SOC_MAX" " < 0." % self.dev_id) else: self.soc_max /= baseMVA # to p.u. if self.soc_max < self.soc_min: raise StorageSpecError("Device %d is a storage unit and has SOC_MAX" " < SOC_MIN." % self.dev_id) self.eff = dev_spec[DEV_H["EFF"]] if self.eff is None: logger.warning("Setting (dis)charging efficiency of device %d " "(storage unit) to 1." % self.dev_id) self.eff = 1.0 elif self.eff < 0.0 or self.eff > 1.0: raise StorageSpecError("Device %d is a storage unit and has EFF " "outside of [0, 1]." % self.dev_id) return
[docs] def map_pq(self, p, q, delta_t): """ Map (p, q) to the closest (P, Q) feasible power injection point. Parameters ---------- p, q : float The desired (P, Q) power injection point (p.u.). """ # Desired set-point. point = np.array([p, q]) # Inequality constraints. G = np.array( [ [-1, 0], [1, 0], [0, -1], [0, 1], [-self.tau_1, 1], [self.tau_2, -1], [self.tau_3, -1], [-self.tau_4, 1], [-1, 0], [1, 0], ] ) h = np.array( [ -self.p_min, self.p_max, -self.q_min, self.q_max, self.rho_1, -self.rho_2, -self.rho_3, self.rho_4, -(self.soc - self.soc_max) / (delta_t * self.eff), self.eff * (self.soc - self.soc_min) / delta_t, ] ) # Define and solve the CVXPY problem. x = cp.Variable(2) prob = cp.Problem(cp.Minimize(cp.sum_squares(x - point)), [G @ x <= h]) prob.solve() self.p = x.value[0] self.q = x.value[1]
[docs] def update_soc(self, delta_t): """ Update the state of charge based on the current power injection. This function updates the new state of charge :math:`SoC_{t+1}`, assuming that the device injects :code:`self.p` active power (in p.u.) into the network during :code:`delta_t` hours. Parameters ---------- delta_t : float Time interval between subsequent timesteps :math:`\\Delta t` (in fraction of hour). """ # Compute the new SoC. if self.p <= 0: self.soc -= delta_t * self.eff * self.p else: self.soc -= delta_t * self.p / self.eff # Clip the new state of charge to be in [soc_min, soc_max]. self.soc = np.clip(self.soc, self.soc_min, self.soc_max)