Source code for src.Environments.single_agent.BaseEnvironment

# region Imports and global variables
import os
from abc import ABC, abstractmethod
from typing import SupportsFloat, Any, Tuple

import numpy as np
from gymnasium.core import ActType, ObsType
from gymnasium.envs.mujoco import MujocoEnv
from gymnasium.spaces import Space, Box, Dict
from gymnasium.utils import EzPickle
from mujoco import MjModel, MjData
from mujoco._structs import _MjDataBodyViews
from numpy.random import default_rng

XML_PATH_MAIN = os.path.join(os.path.dirname(__file__), "../assets/UAV/scene.xml")
XML_PATH_REDUCED = os.path.join(os.path.dirname(__file__), "../assets/ReducedUAV/scene.xml")


# endregion

# region Drone and Target Classes
[docs] class Drone: """ Represents a drone in the simulation environment. It manages the drone's state, including its position, velocity, and IMU sensor readings. :param data: The MjData instance containing the simulation state. :param spawn_box: A numpy array defining the boundaries for the drone's initial position. :param spawn_max_velocity: The maximum initial velocity of the drone. :param rng: An instance of a random number generator (optional). """ def __init__(self, data: MjData, spawn_box: np.ndarray, spawn_max_velocity: float, rng: np.random.Generator = default_rng() ): """ Initializes the Drone instance with the provided parameters. :param data: The MjData instance from the simulation environment. :param spawn_box: A 2x3 numpy array defining the min and max coordinates for spawning the drone. :param spawn_max_velocity: The maximum magnitude of the drone's initial velocity vector. :param rng: A numpy random number generator instance for stochasticity in initial conditions. """ self.data = data self.body: _MjDataBodyViews = data.body('drone') self.spawn_box = spawn_box self.spawn_max_velocity = spawn_max_velocity self.rng = rng self.id = self.body.id @property def position(self) -> np.ndarray: """ Gets the drone's position. :return: A numpy array representing the drone's current position. """ return self.body.xpos @position.setter def position(self, value: np.ndarray) -> None: """ Sets the drone's position. :param value: A numpy array representing the new position for the drone. """ self.body.xpos = value @property def velocity(self) -> np.ndarray: """ Gets the drone's velocity. :return: A numpy array representing the drone's current velocity. """ return self.body.cvel[:3] @velocity.setter def velocity(self, value: np.ndarray) -> None: """ Sets the drone's velocity. :param value: A numpy array representing the new velocity for the drone. """ self.body.cvel[:3] = value @property def imu_accel(self) -> np.ndarray: """ Gets the drone's accelerometer readings from the IMU. :return: A numpy array representing the drone's current accelerometer readings. """ return self.data.sensor('imu_accel').data @property def imu_gyro(self) -> np.ndarray: """ Gets the drone's gyroscope readings from the IMU. :return: A numpy array representing the drone's current gyroscope readings. """ return self.data.sensor('imu_gyro').data @property def imu_orientation(self) -> np.ndarray: """ Gets the drone's orientation readings from the IMU. :return: A numpy array representing the drone's current orientation. """ return self.data.sensor('imu_orientation').data
[docs] def reset(self): """ Resets the drone's position and velocity to initial values within the defined spawn box and velocity limits. """ self.position = self.spawn_box[0] + (self.spawn_box[1] - self.spawn_box[0]) * self.rng.random(3) self.velocity = self.spawn_max_velocity * self.rng.random(3)
[docs] class Target: """ Represents a target in the simulation environment. It manages the target's state, including its position, velocity, and orientation. :param data: The MjData instance containing the simulation state. :param spawn_box: A numpy array defining the boundaries for the target's initial position. :param spawn_max_velocity: The maximum initial velocity of the target. :param spawn_max_angular_velocity: The maximum initial angular velocity of the target. :param rng: An instance of a random number generator (optional). """ def __init__(self, data: MjData, spawn_box: np.ndarray, spawn_max_velocity: float, spawn_max_angular_velocity: float, rng: np.random.Generator = default_rng() ): """ Initializes the Target instance with the provided parameters. :param data: The MjData instance from the simulation environment. :param spawn_box: A 2x3 numpy array defining the min and max coordinates for spawning the target. :param spawn_max_velocity: The maximum magnitude of the target's initial velocity vector. :param spawn_max_angular_velocity: The maximum magnitude of the target's initial angular velocity vector. :param rng: A numpy random number generator instance for stochasticity in initial conditions. """ self.data = data self.body: _MjDataBodyViews = data.body('target') self.spawn_box = spawn_box self.spawn_max_velocity = spawn_max_velocity self.spawn_max_angular_velocity = spawn_max_angular_velocity self.rng = rng self.id = self.body.id @property def position(self) -> np.ndarray: """ Gets the target's position. :return: A numpy array representing the target's current position. """ return self.body.xpos @position.setter def position(self, value: np.ndarray) -> None: """ Sets the target's position. :param value: A numpy array representing the new position for the target. """ self.body.xpos = value @property def velocity(self) -> np.ndarray: """ Gets the target's velocity. :return: A numpy array representing the target's current velocity. """ return self.body.cvel[:3] @velocity.setter def velocity(self, value: np.ndarray) -> None: """ Sets the target's velocity. :param value: A numpy array representing the new velocity for the target. """ self.body.cvel[:3] = value @property def orientation(self) -> np.ndarray: """ Gets the target's orientation. :return: A numpy array representing the target's current orientation. """ return self.body.qpos[3:7] @orientation.setter def orientation(self, value: np.ndarray) -> None: """ Sets the target's orientation. :param value: A numpy array representing the new orientation for the target. """ self.body.qpos[3:7] = value
[docs] def reset(self): """ Resets the target's position, velocity, and orientation to initial values within the defined spawn box and velocity limits. """ self.position = self.spawn_box[0] + (self.spawn_box[1] - self.spawn_box[0]) * self.rng.random(3) self.velocity = self.spawn_max_velocity * self.rng.random(3)
# endregion # region BaseMujocoEnv
[docs] class BaseRegime(MujocoEnv, EzPickle, ABC): """ Base class for creating training regimes in a Mujoco simulation environment. It sets up the environment, including the drone and target, and defines the necessary properties and methods that should be implemented by subclasses. This class should not be instantiated directly but extended by subclasses to define specific training regimes. :param kwargs: Keyword arguments for environment configuration. """ # region Initialization def __init__(self, **kwargs): """ Initialize the BaseRegime environment with configuration parameters. :keyword xml_path: Path to the XML model file. It can be a string indicating a predefined model ("Main" or "Reduced") or a path to a custom model file. (default: "Reduced") Type: str :keyword height: The height of the rendered images from the simulation, used in defining the observation space. Type: int, default: 480 :keyword width: The width of the rendered images from the simulation, used in defining the observation space. Type: int, default: 640 :keyword n_camera: Number of cameras/views to be included in the observation space. Each camera adds an image to the observation space. Type: int, default: 2 :keyword frame_skip: Number of frames to skip for each simulation step. A higher frame_skip results in faster simulation at the cost of less control precision. Type: int, default: 1 :keyword dt: The time step for the simulation. It determines the update frequency of the simulation. Type: float, default: value from the loaded Mujoco model or 0.01 if not specified :keyword drone_spawn_box: A 2x3 array specifying the lower and upper bounds ([min, max]) for the drone's initial position. Each row is a 3D coordinate. Type: numpy.ndarray, shape: (2, 3) :keyword drone_spawn_max_velocity: Maximum magnitude of the drone's initial velocity. Type: float :keyword target_spawn_box: A 2x3 array specifying the lower and upper bounds ([min, max]) for the target's initial position. Each row is a 3D coordinate. Type: numpy.ndarray, shape: (2, 3) :keyword target_spawn_max_velocity: Maximum magnitude of the target's initial velocity. Type: float :keyword target_spawn_max_angular_velocity: Maximum magnitude of the target's initial angular velocity. Type: float :keyword depth_camera: Whether to include a depth camera in the observation space. Type: bool, default: False """ # region Initialize MujocoEnv self.metadata = { "render_modes": [ "human", "rgb_array", "depth_array", ], "render_fps": 500, # render_fps will be set to dt^-1 after the model loads } height = kwargs.get('height', 480) width = kwargs.get('width', 640) self.n_camera = kwargs.get('n_camera', 2) self.depth_camera = kwargs.get('depth_camera', False) observation_space_dict = { "imu_accel": Box(low=-np.inf, high=np.inf, shape=(3,)), "imu_gyro": Box(low=-np.inf, high=np.inf, shape=(4,)), "imu_orientation": Box(low=-np.inf, high=np.inf, shape=(3,)), } for i in range(self.n_camera): observation_space_dict[f"image_{i}"] = ( Box(low=0, high=255, shape=(height, width, 3 if not self.depth_camera else 1))) observation_space: Space[ObsType] = Dict(observation_space_dict) # if model_path is a valid file, use it. If it is "Reduced", use the reduced model. Else, use the main model. model_path = kwargs.get('xml_path', 'Reduced') if model_path == "Reduced": model_path = XML_PATH_REDUCED elif model_path == "Main": model_path = XML_PATH_MAIN elif not os.path.isfile(model_path): raise ValueError(f"Model path {model_path} is not a valid file. " f"Expected a valid file or 'Reduced' or 'Main'.") MujocoEnv.__init__( self, model_path=model_path, frame_skip=kwargs.get('frame_skip', 1), observation_space=observation_space, height=height, width=width, ) EzPickle.__init__( self, XML_PATH_MAIN, kwargs.get('frame_skip', 1), kwargs.get('dt', self.dt), **kwargs, ) # Set action space self.action_space: Space = ( Box(low=-1.0, high=1.0, shape=(self.model.nu,))) # Set simulation timestep self.model.opt.timestep = kwargs.get('dt', self.dt) self.metadata = { "render_modes": [ "human", "rgb_array", "depth_array", ], "render_fps": int(np.round(1.0 / self.dt)), } # endregion # region Initialize Random Number Generator and Model/Data # Initialize random number generator self.rng = default_rng() # Initialize model and data attributes self.model: MjModel = self.model self.data: MjData = self.data # endregion # region Drone Parameters self.drone = Drone( data=self.data, spawn_box=kwargs.get('drone_spawn_box', np.array([np.array([-10, -10, 0.5]), np.array([10, 10, 0.5])])), spawn_max_velocity=kwargs.get('drone_spawn_max_velocity', 0.5), rng=self.rng ) # endregion # region Target Parameters self.target = Target( data=self.data, spawn_box=kwargs.get('target_spawn_box', np.array([np.array([-10, -10, 0.5]), np.array([10, 10, 0.5])])), spawn_max_velocity=kwargs.get('target_spawn_max_velocity', 0.5), spawn_max_angular_velocity=kwargs.get('target_spawn_max_angular_velocity', 0.5), rng=self.rng ) # endregion # endregion # region Properties @property def drone_target_vector(self) -> np.ndarray: """ Compute and return the vector from the drone to the target. :return: A numpy array representing the vector from the drone to the target. """ return self.target.position - self.drone.position @property def drone_hit_ground(self) -> bool: """ Check if the drone has hit the ground. :return: True if the drone has made contact with the ground, False otherwise. """ drone_id = self.model.geom('drone').id floor_id = self.model.geom('floor').id for i in range(self.data.ncon): contact = self.data.contact[i] if (contact.geom1 == drone_id and contact.geom2 == floor_id) or ( contact.geom1 == floor_id and contact.geom2 == drone_id): return True return False # endregion # region Methods
[docs] def pre_simulation(self) -> None: """ Perform any necessary actions before each simulation step. This method should be overridden by subclasses. """ pass
[docs] def reset_model(self) -> ObsType: """ Reset the environment to an initial state and return the initial observation. :return: The initial observation after resetting the environment. """ self.drone.reset() self.target.reset() return self.observation
# endregion # region Step Logic
[docs] def step( self, action: ActType ) -> Tuple[ObsType, SupportsFloat, bool, bool, dict[str, Any]]: """ Execute one time step within the environment. :param action: The action to be executed. :return: A tuple containing the new observation, reward, done flag, truncated flag, and info dictionary. """ self.pre_simulation() self.do_simulation(action, self.frame_skip) return (self.observation, self.reward, self.truncated, self.done, self.metrics)
@property def observation(self) -> ObsType: """ Get the current observation of the environment. :return: The current environment observation. """ observation = { "acceleration": self.drone.imu_accel, "gyro": self.drone.imu_gyro, "orientation": self.drone.imu_orientation } for i in range(self.n_camera): observation[f"image_{i}"] = self.mujoco_renderer.render('rgb_array', i) return observation @property @abstractmethod def reward(self) -> SupportsFloat: """ Calculate and return the current reward. This method should be implemented by subclasses. :return: The current reward. """ raise NotImplementedError @property @abstractmethod def done(self) -> bool: """ Determine whether the episode is done. This method should be implemented by subclasses. :return: True if the episode is finished, False otherwise. """ raise NotImplementedError @property @abstractmethod def truncated(self) -> bool: """ Determine whether the episode is truncated. This method should be implemented by subclasses. :return: True if the episode is truncated, False otherwise. """ raise NotImplementedError @property @abstractmethod def metrics(self) -> dict[str, Any]: """ Return additional data or metrics for logging purposes. This method should be implemented by subclasses. :return: A dictionary containing metrics or additional information. """ raise NotImplementedError
# endregion # endregion class _TestRegime(BaseRegime): """ A test regime class for demonstration purposes, extending the BaseRegime class. This class implements the abstract methods of BaseRegime with basic or dummy functionality. """ def __init__(self, **kwargs): """ Initialize the _TestRegime environment. :param kwargs: Configuration parameters for the environment. """ super().__init__(**kwargs) def pre_simulation(self) -> None: """ Perform any necessary actions before each simulation step. Overridden to set control to zero. """ self.data.ctrl[:] = 0 @property def reward(self) -> SupportsFloat: """ Calculate and return the current reward. Overridden to return a fixed reward of 0. :return: The current reward, which is 0 in this test regime. """ return 0 @property def done(self) -> bool: """ Determine whether the episode is done. Overridden to always return False. :return: False, indicating the episode is never considered done in this test regime. """ return False @property def truncated(self) -> bool: """ Determine whether the episode is truncated. Overridden to always return False. :return: False, indicating the episode is never considered truncated in this test regime. """ return False @property def metrics(self) -> dict[str, Any]: """ Return additional data or metrics for logging purposes. Overridden to return an empty dictionary. :return: An empty dictionary, as no additional metrics are provided in this test regime. """ return {}