rktl_autonomy
This package manipulates a ROS network so that it provides an environment in which to train an AI. In short, it uses the sim time mechanism normally used to replay bag files to control how time progresses, so that an arbitrary ROS network can be used as an environment, specifically an OpenAI Gym environment.
This can be used for both training and evaluation. When training, it manipulates time so that the network always has the data it needs before time advances. In evaluation, it runs the network at the pace of real time.
- License:
BSD 3-Clause License. Copyright (c) 2023, Autonomous Robotics Club of Purdue (Purdue ARC). All rights reserved.
Submodules
Package Contents
Classes
- ROSInterface: Extension of the Gym environment class for all specific interfaces to extend.
- CartpoleInterface: ROS interface for the cartpole game.
- CartpoleDirectInterface: ROS interface for the cartpole game.
- SnakeInterface: ROS interface for the snake game.
- class rktl_autonomy.ROSInterface(node_name='gym_interface', eval=False, launch_file=None, launch_args=[], run_id=None)
Bases:
gym.Env
Extension of the Gym environment class for all specific interfaces to extend. This class handles logic regarding timesteps in ROS, and allows users to treat any ROS system as a Gym environment once the interface is created.
- All classes extending this for a particular environment must do the following:
- implement all abstract properties:
action_space
observation_space
- implement all abstract methods:
_reset_env()
_reset_self()
_has_state()
_clear_state()
_get_state()
_publish_action()
notify _cond when _has_state() may have turned true
- abstract property action_space
The Space object corresponding to valid actions.
- abstract property observation_space
The Space object corresponding to valid observations.
- step(action)
Implementation of gym.Env.step. This function will intentionally block if the ROS environment is not ready.
Run one timestep of the environment’s dynamics. When the end of an episode is reached, you are responsible for calling reset() to reset this environment’s state. Accepts an action and returns a tuple (observation, reward, done, info).
- Args:
action (object): an action provided by the agent
- Returns:
observation (object): agent’s observation of the current environment
reward (float): amount of reward returned after the previous action
done (bool): whether the episode has ended, in which case further step() calls will return undefined results
info (dict): contains auxiliary diagnostic information (helpful for debugging, and sometimes learning)
- reset()
Resets the environment to an initial state and returns an initial observation.
Note that this function should not reset the environment’s random number generator(s); random variables in the environment’s state should be sampled independently between multiple calls to reset(). In other words, each call of reset() should yield an environment suitable for a new episode, independent of previous episodes.
- Returns:
observation (object): the initial observation.
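Taken together, step() and reset() support the standard Gym control loop. A real session would construct one of the concrete interfaces (e.g. CartpoleInterface) against a running ROS network, so the sketch below substitutes a trivial stand-in environment with the same (observation, reward, done, info) signature purely to make the flow visible; _StubEnv and the fixed policy are hypothetical placeholders, not part of this package.

```python
class _StubEnv:
    """Hypothetical stand-in; real code: CartpoleInterface(eval=True)."""

    def reset(self):
        self._t = 0
        return 0.0                           # initial observation

    def step(self, action):
        self._t += 1
        done = self._t >= 3                  # end the episode after 3 steps
        return float(action), 1.0, done, {}  # (observation, reward, done, info)

env = _StubEnv()
obs = env.reset()
done = False
total_reward = 0.0
while not done:
    action = 1                               # placeholder policy
    obs, reward, done, info = env.step(action)
    total_reward += reward                   # three steps of reward 1.0 each
```

The same loop, unchanged, drives any of the interfaces below, since they all expose the Gym step()/reset() API.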
- __step_time_and_wait_for_state(max_retries=1)
Step time until a state is known.
- __wait_once_for_state()
Wait and allow other threads to run.
- abstract _reset_env()
Reset environment for a new episode.
- abstract _reset_self()
Reset internally for a new episode.
- abstract _has_state()
Determine if the new state is ready.
- abstract _clear_state()
Clear state variables / flags in preparation for new ones.
- abstract _get_state()
Get state tuple (observation, reward, done, info).
- abstract _publish_action(action)
Publish an action to the ROS network.
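A subclass therefore wires ROS callbacks into these abstract members and notifies _cond once a new state may be available. The sketch below shows that contract in plain Python: only the abstract member names come from the documentation above; everything else (the class name, the _receive helper, the fixed reward) is hypothetical, and direct method calls stand in for ROS publishers and subscribers so the control flow runs without a ROS network.

```python
import threading

class ToyInterface:                          # real code extends ROSInterface
    action_space = None                      # real code: a gym.spaces object
    observation_space = None                 # real code: a gym.spaces object

    def __init__(self):
        self._cond = threading.Condition()   # the base class waits on this
        self._state = None

    def _reset_env(self):                    # real code: publish a reset message
        self._receive(0.0)

    def _reset_self(self):
        self._state = None

    def _has_state(self):
        return self._state is not None

    def _clear_state(self):
        self._state = None

    def _get_state(self):
        return self._state                   # (observation, reward, done, info)

    def _publish_action(self, action):       # real code: publish to a ROS topic
        self._receive(float(action))

    # stands in for a ROS subscriber callback; note the _cond notification,
    # which the base class requires whenever _has_state() may have turned true
    def _receive(self, obs):
        with self._cond:
            self._state = (obs, 1.0, False, {})
            self._cond.notify_all()
```

The essential obligation is the last method: without the notification under _cond, the base class's blocking step() would never wake up.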
- class rktl_autonomy.CartpoleInterface(eval=False, launch_file=['rktl_autonomy', 'cartpole_train.launch'], launch_args=[], run_id=None)
Bases:
rktl_autonomy.ROSInterface
ROS interface for the cartpole game.
- property action_space
The Space object corresponding to valid actions.
- property observation_space
The Space object corresponding to valid observations.
- _reset_env()
Reset environment for a new training episode.
- _reset_self()
Reset internally for a new episode.
- _has_state()
Determine if the new state is ready.
- _clear_state()
Clear state variables / flags in preparation for new ones.
- _get_state()
Get state tuple (observation, reward, done, info).
- _publish_action(action)
Publish an action to the ROS network.
- _obs_cb(obs_msg)
Callback for observation of game.
- _reward_cb(reward_msg)
Callback for reward of game.
- _done_cb(done_msg)
Callback for if episode is done.
- class rktl_autonomy.CartpoleDirectInterface(eval=False, launch_file=['rktl_autonomy', 'cartpole_train.launch'], launch_args=[], run_id=None)
Bases:
rktl_autonomy.ROSInterface
ROS interface for the cartpole game.
- property action_space
The Space object corresponding to valid actions.
- property observation_space
The Space object corresponding to valid observations.
- _reset_env()
Reset environment for a new training episode.
- _reset_self()
Reset internally for a new episode.
- _has_state()
Determine if the new state is ready.
- _clear_state()
Clear state variables / flags in preparation for new ones.
- _get_state()
Get state tuple (observation, reward, done, info).
- _publish_action(action)
Publish an action to the ROS network.
- class rktl_autonomy.SnakeInterface(eval=False, launch_file=['rktl_autonomy', 'snake_train.launch'], launch_args=[], run_id=None)
Bases:
rktl_autonomy.ROSInterface
ROS interface for the snake game.
- property action_space
The Space object corresponding to valid actions.
- property observation_space
The Space object corresponding to valid observations.
- _reset_env()
Reset environment for a new training episode.
- _reset_self()
Reset internally for a new episode.
- _has_state()
Determine if the new state is ready.
- _clear_state()
Clear state variables / flags in preparation for new ones.
- _get_state()
Get state tuple (observation, reward, done, info).
- _publish_action(action)
Publish an action to the ROS network.
- _pose_cb(pose_msg)
Callback for poses of each segment of snake.
- _goal_cb(goal_msg)
Callback for location of goal.
- _score_cb(score_msg)
Callback for score of game.
- _alive_cb(alive_msg)
Callback for active state of snake.