rktl_autonomy

This package manipulates a ROS network so that it provides an enviornment to train an AI. In short, it uses the sim time mechanism used to replay bag files to mess with how time progresses, so that an arbitrary ROS network can be used as an environment, specifically, an OpenAI Gym environment.

This can be used for both training and evaluation purposes. When training, it manipulates time so that the network gets what it needs. It evaluation, it runs the network as fast as real time is progressing.

License:

BSD 3-Clause License Copyright (c) 2023, Autonomous Robotics Club of Purdue (Purdue ARC) All rights reserved.

Submodules

Package Contents

Classes

ROSInterface

Extension of the Gym environment class for all specific interfaces

CartpoleInterface

ROS interface for the cartpole game.

CartpoleDirectInterface

ROS interface for the cartpole game.

SnakeInterface

ROS interface for the snake game.

EnvCounter

class rktl_autonomy.ROSInterface(node_name='gym_interface', eval=False, launch_file=None, launch_args=[], run_id=None)

Bases: gym.Env

Extension of the Gym environment class for all specific interfaces to extend. This class handles logic regarding timesteps in ROS, and allows users to treat any ROS system as a Gym environment once the interface is created.

All classes extending this for a particular environment must do the following:
  • implement all abstract properties:
    • action_space

    • observation_space

  • implement all abstract methods:
    • _reset_env()

    • _reset_self()

    • _has_state()

    • _clear_state()

    • _get_state()

    • _publish_action()

  • notify _cond when _has_state() may have turned true

abstract property action_space

The Space object corresponding to valid actions.

abstract property observation_space

The Space object corresponding to valid observations.

step(action)

Implementation of gym.Env.step. This function will intentionally block if the ROS environment is not ready.

Run one timestep of the environment’s dynamics. When end of episode is reached, you are responsible for calling reset() to reset this environment’s state. Accepts an action and returns a tuple (observation, reward, done, info). Args:

action (object): an action provided by the agent

Returns:

observation (object): agent’s observation of the current environment reward (float) : amount of reward returned after previous action done (bool): whether the episode has ended, in which case further step() calls will return undefined results info (dict): contains auxiliary diagnostic information (helpful for debugging, and sometimes learning)

reset()

Resets the environment to an initial state and returns an initial observation.

Note that this function should not reset the environment’s random number generator(s); random variables in the environment’s state should be sampled independently between multiple calls to reset(). In other words, each call of reset() should yield an environment suitable for a new episode, independent of previous episodes. Returns:

observation (object): the initial observation.

__step_time_and_wait_for_state(max_retries=1)

Step time until a state is known.

__wait_once_for_state()

Wait and allow other threads to run.

abstract _reset_env()

Reset environment for a new episode.

abstract _reset_self()

Reset internally for a new episode.

abstract _has_state()

Determine if the new state is ready.

abstract _clear_state()

Clear state variables / flags in preparation for new ones.

abstract _get_state()

Get state tuple (observation, reward, done, info).

abstract _publish_action(action)

Publish an action to the ROS network.

class rktl_autonomy.CartpoleInterface(eval=False, launch_file=['rktl_autonomy', 'cartpole_train.launch'], launch_args=[], run_id=None)

Bases: rktl_autonomy.ROSInterface

ROS interface for the cartpole game.

property action_space

The Space object corresponding to valid actions.

property observation_space

The Space object corresponding to valid observations.

_reset_env()

Reset environment for a new training episode.

_reset_self()

Reset internally for a new episode.

_has_state()

Determine if the new state is ready.

_clear_state()

Clear state variables / flags in preparation for new ones.

_get_state()

Get state tuple (observation, reward, done, info).

_publish_action(action)

Publish an action to the ROS network.

_obs_cb(obs_msg)

Callback for observation of game.

_reward_cb(reward_msg)

Callback for reward of game.

_done_cb(done_msg)

Callback for if episode is done.

class rktl_autonomy.CartpoleDirectInterface(eval=False, launch_file=['rktl_autonomy', 'cartpole_train.launch'], launch_args=[], run_id=None)

Bases: rktl_autonomy.ROSInterface

ROS interface for the cartpole game.

property action_space

The Space object corresponding to valid actions.

property observation_space

The Space object corresponding to valid observations.

_reset_env()

Reset environment for a new training episode.

_reset_self()

Reset internally for a new episode.

_has_state()

Determine if the new state is ready.

_clear_state()

Clear state variables / flags in preparation for new ones.

_get_state()

Get state tuple (observation, reward, done, info).

_publish_action(action)

Publish an action to the ROS network.

class rktl_autonomy.SnakeInterface(eval=False, launch_file=['rktl_autonomy', 'snake_train.launch'], launch_args=[], run_id=None)

Bases: rktl_autonomy.ROSInterface

ROS interface for the snake game.

property action_space

The Space object corresponding to valid actions.

property observation_space

The Space object corresponding to valid observations.

_reset_env()

Reset environment for a new training episode.

_reset_self()

Reset internally for a new episode.

_has_state()

Determine if the new state is ready.

_clear_state()

Clear state variables / flags in preparation for new ones.

_get_state()

Get state tuple (observation, reward, done, info).

_publish_action(action)

Publish an action to the ROS network.

_pose_cb(pose_msg)

Callback for poses of each segment of snake.

_goal_cb(goal_msg)

Callback for location of goal.

_score_cb(score_msg)

Callback for score of game.

_alive_cb(alive_msg)

Callback for active state of snake.

class rktl_autonomy.EnvCounter
count_env()
get_current_counter()