rktl_autonomy._ros_interface

This handles the base behavior of manipulating time and interfacing with ROS. It gets subclassed to customize it to a specific ROS network’s environment.

License:

BSD 3-Clause License Copyright (c) 2023, Autonomous Robotics Club of Purdue (Purdue ARC) All rights reserved.

Module Contents

Classes

ROSInterface

Extension of the Gym environment class for all specific interfaces

exception rktl_autonomy._ros_interface.SimTimeException

Bases: Exception

For when advancing sim time does not go as planned.

class rktl_autonomy._ros_interface.ROSInterface(node_name='gym_interface', eval=False, launch_file=None, launch_args=[], run_id=None)

Bases: gym.Env

Extension of the Gym environment class for all specific interfaces to extend. This class handles logic regarding timesteps in ROS, and allows users to treat any ROS system as a Gym environment once the interface is created.

All classes extending this for a particular environment must do the following:
  • implement all abstract properties:
    • action_space

    • observation_space

  • implement all abstract methods:
    • _reset_env()

    • _reset_self()

    • _has_state()

    • _clear_state()

    • _get_state()

    • _publish_action()

  • notify _cond when _has_state() may have turned true

abstract property action_space

The Space object corresponding to valid actions.

abstract property observation_space

The Space object corresponding to valid observations.

step(action)

Implementation of gym.Env.step. This function will intentionally block if the ROS environment is not ready.

Run one timestep of the environment’s dynamics. When end of episode is reached, you are responsible for calling reset() to reset this environment’s state. Accepts an action and returns a tuple (observation, reward, done, info). Args:

action (object): an action provided by the agent

Returns:

observation (object): agent’s observation of the current environment reward (float) : amount of reward returned after previous action done (bool): whether the episode has ended, in which case further step() calls will return undefined results info (dict): contains auxiliary diagnostic information (helpful for debugging, and sometimes learning)

reset()

Resets the environment to an initial state and returns an initial observation.

Note that this function should not reset the environment’s random number generator(s); random variables in the environment’s state should be sampled independently between multiple calls to reset(). In other words, each call of reset() should yield an environment suitable for a new episode, independent of previous episodes. Returns:

observation (object): the initial observation.

__step_time_and_wait_for_state(max_retries=1)

Step time until a state is known.

__wait_once_for_state()

Wait and allow other threads to run.

abstract _reset_env()

Reset environment for a new episode.

abstract _reset_self()

Reset internally for a new episode.

abstract _has_state()

Determine if the new state is ready.

abstract _clear_state()

Clear state variables / flags in preparation for new ones.

abstract _get_state()

Get state tuple (observation, reward, done, info).

abstract _publish_action(action)

Publish an action to the ROS network.