:py:mod:`rktl_autonomy._ros_interface` ====================================== .. py:module:: rktl_autonomy._ros_interface .. autoapi-nested-parse:: This handles the base behavior of manipulating time and interfacing with ROS. It gets subclassed to customize it to a specific ROS network's environment. License: BSD 3-Clause License Copyright (c) 2023, Autonomous Robotics Club of Purdue (Purdue ARC) All rights reserved. Module Contents --------------- Classes ~~~~~~~ .. autoapisummary:: rktl_autonomy._ros_interface.ROSInterface .. py:exception:: SimTimeException Bases: :py:obj:`Exception` For when advancing sim time does not go as planned. .. py:class:: ROSInterface(node_name='gym_interface', eval=False, launch_file=None, launch_args=[], run_id=None) Bases: :py:obj:`gym.Env` Extension of the Gym environment class for all specific interfaces to extend. This class handles logic regarding timesteps in ROS, and allows users to treat any ROS system as a Gym environment once the interface is created. All classes extending this for a particular environment must do the following: - implement all abstract properties: - action_space - observation_space - implement all abstract methods: - _reset_env() - _reset_self() - _has_state() - _clear_state() - _get_state() - _publish_action() - notify _cond when _has_state() may have turned true .. py:property:: action_space :abstractmethod: The Space object corresponding to valid actions. .. py:property:: observation_space :abstractmethod: The Space object corresponding to valid observations. .. py:method:: step(action) Implementation of gym.Env.step. This function will intentionally block if the ROS environment is not ready. Run one timestep of the environment's dynamics. When end of episode is reached, you are responsible for calling `reset()` to reset this environment's state. Accepts an action and returns a tuple (observation, reward, done, info). Args: action (object): an action provided by the agent Returns: observation (object): agent's observation of the current environment reward (float) : amount of reward returned after previous action done (bool): whether the episode has ended, in which case further step() calls will return undefined results info (dict): contains auxiliary diagnostic information (helpful for debugging, and sometimes learning) .. py:method:: reset() Resets the environment to an initial state and returns an initial observation. Note that this function should not reset the environment's random number generator(s); random variables in the environment's state should be sampled independently between multiple calls to `reset()`. In other words, each call of `reset()` should yield an environment suitable for a new episode, independent of previous episodes. Returns: observation (object): the initial observation. .. py:method:: __step_time_and_wait_for_state(max_retries=1) Step time until a state is known. .. py:method:: __wait_once_for_state() Wait and allow other threads to run. .. py:method:: _reset_env() :abstractmethod: Reset environment for a new episode. .. py:method:: _reset_self() :abstractmethod: Reset internally for a new episode. .. py:method:: _has_state() :abstractmethod: Determine if the new state is ready. .. py:method:: _clear_state() :abstractmethod: Clear state variables / flags in preparation for new ones. .. py:method:: _get_state() :abstractmethod: Get state tuple (observation, reward, done, info). .. py:method:: _publish_action(action) :abstractmethod: Publish an action to the ROS network.