:py:mod:`rktl_autonomy` ======================= .. py:module:: rktl_autonomy .. autoapi-nested-parse:: This package manipulates a ROS network so that it provides an enviornment to train an AI. In short, it uses the sim time mechanism used to replay bag files to mess with how time progresses, so that an arbitrary ROS network can be used as an environment, specifically, an OpenAI Gym environment. This can be used for both training and evaluation purposes. When training, it manipulates time so that the network gets what it needs. It evaluation, it runs the network as fast as real time is progressing. License: BSD 3-Clause License Copyright (c) 2023, Autonomous Robotics Club of Purdue (Purdue ARC) All rights reserved. Submodules ---------- .. toctree:: :titlesonly: :maxdepth: 1 _ros_interface/index.rst cartpole_direct_interface/index.rst cartpole_interface/index.rst env_counter/index.rst rocket_league_interface/index.rst snake_interface/index.rst Package Contents ---------------- Classes ~~~~~~~ .. autoapisummary:: rktl_autonomy.ROSInterface rktl_autonomy.CartpoleInterface rktl_autonomy.CartpoleDirectInterface rktl_autonomy.SnakeInterface rktl_autonomy.EnvCounter .. py:class:: ROSInterface(node_name='gym_interface', eval=False, launch_file=None, launch_args=[], run_id=None) Bases: :py:obj:`gym.Env` Extension of the Gym environment class for all specific interfaces to extend. This class handles logic regarding timesteps in ROS, and allows users to treat any ROS system as a Gym environment once the interface is created. All classes extending this for a particular environment must do the following: - implement all abstract properties: - action_space - observation_space - implement all abstract methods: - _reset_env() - _reset_self() - _has_state() - _clear_state() - _get_state() - _publish_action() - notify _cond when _has_state() may have turned true .. py:property:: action_space :abstractmethod: The Space object corresponding to valid actions. .. py:property:: observation_space :abstractmethod: The Space object corresponding to valid observations. .. py:method:: step(action) Implementation of gym.Env.step. This function will intentionally block if the ROS environment is not ready. Run one timestep of the environment's dynamics. When end of episode is reached, you are responsible for calling `reset()` to reset this environment's state. Accepts an action and returns a tuple (observation, reward, done, info). Args: action (object): an action provided by the agent Returns: observation (object): agent's observation of the current environment reward (float) : amount of reward returned after previous action done (bool): whether the episode has ended, in which case further step() calls will return undefined results info (dict): contains auxiliary diagnostic information (helpful for debugging, and sometimes learning) .. py:method:: reset() Resets the environment to an initial state and returns an initial observation. Note that this function should not reset the environment's random number generator(s); random variables in the environment's state should be sampled independently between multiple calls to `reset()`. In other words, each call of `reset()` should yield an environment suitable for a new episode, independent of previous episodes. Returns: observation (object): the initial observation. .. py:method:: __step_time_and_wait_for_state(max_retries=1) Step time until a state is known. .. py:method:: __wait_once_for_state() Wait and allow other threads to run. .. py:method:: _reset_env() :abstractmethod: Reset environment for a new episode. .. py:method:: _reset_self() :abstractmethod: Reset internally for a new episode. .. py:method:: _has_state() :abstractmethod: Determine if the new state is ready. .. py:method:: _clear_state() :abstractmethod: Clear state variables / flags in preparation for new ones. .. py:method:: _get_state() :abstractmethod: Get state tuple (observation, reward, done, info). .. py:method:: _publish_action(action) :abstractmethod: Publish an action to the ROS network. .. py:class:: CartpoleInterface(eval=False, launch_file=['rktl_autonomy', 'cartpole_train.launch'], launch_args=[], run_id=None) Bases: :py:obj:`rktl_autonomy.ROSInterface` ROS interface for the cartpole game. .. py:property:: action_space The Space object corresponding to valid actions. .. py:property:: observation_space The Space object corresponding to valid observations. .. py:method:: _reset_env() Reset environment for a new training episode. .. py:method:: _reset_self() Reset internally for a new episode. .. py:method:: _has_state() Determine if the new state is ready. .. py:method:: _clear_state() Clear state variables / flags in preparation for new ones. .. py:method:: _get_state() Get state tuple (observation, reward, done, info). .. py:method:: _publish_action(action) Publish an action to the ROS network. .. py:method:: _obs_cb(obs_msg) Callback for observation of game. .. py:method:: _reward_cb(reward_msg) Callback for reward of game. .. py:method:: _done_cb(done_msg) Callback for if episode is done. .. py:class:: CartpoleDirectInterface(eval=False, launch_file=['rktl_autonomy', 'cartpole_train.launch'], launch_args=[], run_id=None) Bases: :py:obj:`rktl_autonomy.ROSInterface` ROS interface for the cartpole game. .. py:property:: action_space The Space object corresponding to valid actions. .. py:property:: observation_space The Space object corresponding to valid observations. .. py:method:: _reset_env() Reset environment for a new training episode. .. py:method:: _reset_self() Reset internally for a new episode. .. py:method:: _has_state() Determine if the new state is ready. .. py:method:: _clear_state() Clear state variables / flags in preparation for new ones. .. py:method:: _get_state() Get state tuple (observation, reward, done, info). .. py:method:: _publish_action(action) Publish an action to the ROS network. .. py:class:: SnakeInterface(eval=False, launch_file=['rktl_autonomy', 'snake_train.launch'], launch_args=[], run_id=None) Bases: :py:obj:`rktl_autonomy.ROSInterface` ROS interface for the snake game. .. py:property:: action_space The Space object corresponding to valid actions. .. py:property:: observation_space The Space object corresponding to valid observations. .. py:method:: _reset_env() Reset environment for a new training episode. .. py:method:: _reset_self() Reset internally for a new episode. .. py:method:: _has_state() Determine if the new state is ready. .. py:method:: _clear_state() Clear state variables / flags in preparation for new ones. .. py:method:: _get_state() Get state tuple (observation, reward, done, info). .. py:method:: _publish_action(action) Publish an action to the ROS network. .. py:method:: _pose_cb(pose_msg) Callback for poses of each segment of snake. .. py:method:: _goal_cb(goal_msg) Callback for location of goal. .. py:method:: _score_cb(score_msg) Callback for score of game. .. py:method:: _alive_cb(alive_msg) Callback for active state of snake. .. py:class:: EnvCounter .. py:method:: count_env() .. py:method:: get_current_counter()