執筆者:野口 裕貴
※ 「Isaac Gym入門」連載記事一覧はこちら
1. はじめに
本記事ではIsaac Gym入門の実践編として、Isaac Gymの強化学習のフレームワークについて解説し、独自の学習環境を作成する方法を紹介します。
2. 実行環境
- Ubuntu 20.04
- GeForce RTX 4080 SUPER (VRAM 16GB)
- NVIDIA driver version 550.120
- Isaac Gym Preview 4
- Isaac Gym Env Release 1.5.1
今回はDockerによって構築した環境を使用しました。Dockerでの環境構築の方法に関しては、Isaac Gym入門編の活用編の付録で紹介しています。
【補足】GPUとPyTorchバージョンの互換性について
本記事の検証中、使用するGPU(特に比較的新しい世代のGPU)によっては、以下のようなエラーが発生する場合があることが判明しました。
RuntimeError: The following operation failed in the TorchScript interpreter. Traceback of TorchScript (most recent call last): RuntimeError: nvrtc: error: invalid value for --gpu-architecture (-arch)
このエラーはCUDAに対応したPyTorchバージョンになっていないことが原因です。
この問題を解決するには、Dockerを使用する場合、環境を構築する際に、Dockerfileのベースイメージ指定(FROM 行)を、より新しい、お使いのGPUに対応したPyTorchバージョンが含まれるイメージに変更する必要があります。例えば、以下のように変更します。
# 元の指定 (コメントアウト) # FROM nvcr.io/nvidia/pytorch:21.09-py3 # より新しいバージョンに変更 FROM nvcr.io/nvidia/pytorch:22.12-py3
以前の記事「Isaac Gym入門編:活用編」で紹介した環境構築手順(執筆時点のベースイメージ)では、特定のGPUでこの問題は発生しないかもしれませんが、お使いの環境(特にGPUの種類)によっては上記のような修正が必要になる場合があります。エラーが発生した場合は、Dockerfileのベースイメージを確認し、適宜適切なバージョンに変更してください。
3. Isaac Gymの強化学習フレームワーク
Isaac GymはGitHubで、フレームワークを解説するドキュメントを公開しています。ドキュメントは以下のURLで確認できます。
本章では、上記ドキュメントを参考に解説します。
上記ドキュメントにも記載されている通り、Isaac Gymで独自の学習環境を作成するには、以下の手順が必要です。
- 親クラス
VecTask
を継承し、新たな子クラスを作成 VecTask
の子クラスとして必要なメソッドを実装- 学習およびシミュレーション環境のパラメータを設定するYAMLファイルを作成
- isaacgymenvs/tasks/_init_.pyに実装した新しい学習環境の名称を追加
3.1 VecTaskを継承
Isaac Gymの学習用フレームワークを使用するには、親クラスであるVecTask
を継承する必要があります。このクラスはvec_task.pyで実装されています。
この親クラスを用いて学習環境を作成します。
最初のステップとして、/isaacgymens/tasks
ディレクトリに新しいファイルMyNewTask.py
を作成します。作成したMyNewTask.py
ファイルでIsaac GymのAPIを使用するために以下をインポートします。
from isaacgym import gymtorch from isaacgym import gymapi # VecTaskクラスをインポート from .base.vec_task import VecTask
次に親クラスであるVecTask
を継承します。
class MyNewTask(VecTask):
新たな子クラスMyNewTask
のコンストラクタ(__init__
メソッド)では、以下の3つの引数を受け取る必要があります。
config
多数の設定パラメータを格納する辞書オブジェクトsim_device
物理シミュレーションに使用するデバイスを指定します(文字列型、例:cuda:0
、cpu
)headless
ヘッドレスモード(描画なし)で実行するかどうかを指定します(ブール型、True
またはFalse
)
※ sim_device
、headless
のデフォルトの値は(前回の活用編の記事で解説したように)それぞれsim_device=cuda:0
(GPU)、headless=False
です。
config
という辞書オブジェクトは以下のようなパラメータを格納しています。
device_type
シミュレーションの物理演算に使用するデバイス(cuda
またはcpu
)device_id
シミュレーションに使用するデバイスのID。シングルGPU環境では通常 0rl_device
強化学習の計算に使用するデバイスを指定する。デフォルトはcuda:0
(GPU)physics_engine
使用する物理エンジン(physx
またはflex
)env
環境固有のパラメータを含む辞書オブジェクト。特定のタスクに応じて任意のパラメータを追加できます。主要なパラメータは以下の通りnumEnvs
並列実行する環境の数numObservations
各環境で使用される観測空間の次元数numActions
行動空間の次元数
その他にも以下のようなパラメータも利用できる
numAgents
マルチエージェント環境*1で使用する。デフォルトは1numStates
非対称アクター・クリティック学習*2で使用する状態空間(または観測空間)の次元数controlFrequencyInv
エージェントが1回行動する間にシミュレーションが進むステップ数を指定する。デフォルトは 1clipObservationsinf
観測値をクリップする範囲(上限・下限)デフォルトは inf(クリップなし)clipActions1
行動値をクリップする範囲(上限・下限)です。デフォルトは 1enableCameraSensors
学習環境でカメラセンサーを使用する場合にTrue
に設定する
※ MyNewTask
のコンストラクタから親クラス(VecTask
)のコンストラクタを呼び出す際には、このconfig
を必ず引数として渡してください。
3.2 メソッドの実装
3.1で作成したMyNewTask
クラスを機能させるには、以下のメソッドを実装する必要があります。
create_sim
- 軸座標系の変更
- シミュレーション環境の設定
pre_physics_step
- 行動空間の設定と適用
post_physics_step
- 状態(観測)空間の設定と計算
- 報酬の設定と計算
3.3 設定ファイルの追加
isaacgymenvs/cfg
ディレクトリ配下に、以下の2種類の設定ファイルを作成する必要があります。
- 学習設定ファイル:学習率など、強化学習アルゴリズムに関するパラメータを設定
- 環境設定ファイル:シミュレーション環境(タスク)に関するパラメータを設定
これらのファイルはYAML形式で作成します。
配置場所はそれぞれ以下の通りです。
- 学習設定ファイル:
/isaacgymenvs/cfg/train
ディレクトリ - 環境設定ファイル:
/isaacgymenvs/cfg/task
ディレクトリ
例えば、Cartpole環境の場合、以下のようになります。
- isaacgymenvs/cfg/train/CartpolePPO.yaml:学習設定ファイル
- isaacgymenvs/cfg/task/Cartpole.yaml:環境設定ファイル
各ファイルに設定するパラメータの詳細は、4章で解説します。
また、重要な点として、作成する設定ファイル(特に環境設定ファイル)のベース名(拡張子を除いた部分)は、後述する手順でisaac_gym_task_map
に登録するタスク名と一致させる必要があります。
例)
- タスク名 Anymal → 環境設定ファイル Anymal.yaml
- タスク名 Cartpole → 環境設定ファイル Cartpole.yaml
3.4 学習環境の追加
3.1から3.3の手順で作成した学習環境は、最後にisaacgymenvs/tasks/_init_.pyファイルに登録する必要があります。
具体的には、まず作成したクラス(MyNewTask
)をisaacgymenvs/tasks/_init_.py内でインポートします。次に、isaacgym_task_map
という辞書型に、新しいタスク名(キー)とインポートしたクラスオブジェクト(値)のペアを追加します。
# 自作タスクのクラスをインポート from isaacgymenvs.tasks.my_new_task import MyNewTask # 他のタスクのインポート文 ... isaac_gym_task_map = { 'Anymal': Anymal, # ... 他の既存タスク ... 'MyNewTask': MyNewTask, }
この登録により、これまでのIsaac Gym入門記事で紹介したように、train.py を用いて作成したタスクの学習を実行できるようになります。
python3 train.py task=MyNewTask
4. 学習環境の解説(Cartpole)
3章では、Isaac Gymで独自の学習環境を作成するための基本的な手順について解説しました。
しかし、それだけでは実際にどのようにコードを実装すればよいか、具体的にイメージしにくいかもしれません。そこで本章では、Isaac Gymに標準で実装されているCartpole環境を例に取り上げ、その実装内容を詳しく見ていきます。
3章で説明した、学習環境を構成する以下の各ファイルについて解説します。
/isaacgymenvs/tasks/cartpole.py
(タスク定義ファイル)/isaacgymenvs/cfg/task/Cartpole.yaml
(環境設定ファイル)/isaacgymenvs/cfg/train/CartpolPPO.yaml
(学習設定ファイル)
4.1 Cartpoleタスクの概要
Cartpoleのコード解説に先立ち、このタスクにおける強化学習の概要を説明します。これを事前に把握しておくことで、コードの理解が深まると思います。
達成目標
台車(カート)を左右に動かし、その上に取り付けられた棒(ポール)をできるだけ長く立たせ続けること。
強化学習のエージェント
台車とポールのシステム全体です。 自由度(DOF)は2つあり、カートの水平移動(並進自由度)とポールの回転(回転自由度)です。
状態空間
以下の4つの連続値で構成されます。
- カートの水平位置
- カートの水平速度
- ポールの垂直からの角度
- ポールの角速度
行動空間
カートに加える水平方向の力を表す、単一の連続値です。
報酬関数
エージェントが「ポールをできるだけ長く垂直に保ち、かつカートをトラックの中心付近に維持する」ことを学習するように、主に以下の要素で報酬が設計されています。
- 生存報酬
- エピソードが終了せずにステップを継続できた場合に与えられる、一定の正の報酬。
- ペナルティ
- ポールの角度:垂直から離れるほど大きなペナルティ(負の報酬)。
- カートの位置:トラックの中心から離れるほど小さなペナルティ。
- カートの速度:速度に応じてペナルティが与えられる。
エピソードの終了条件
以下のいずれかの条件を満たすと、エピソードは終了し、環境がリセットされます。
- ポールの角度が一定の閾値を超える
- カートの位置が一定の閾値を超える
- エピソードの最大ステップ数に達する
以降で解説するコードは、ここで説明した内容を実装したものです。この概要を念頭に置いてコードを読むと、理解が深まるでしょう。それでは、具体的なコードの解説に移ります。
4.2 タスク定義ファイル tasks/cartpole.py
ソースコードは以下のリンク先で確認できます。
このファイル(cartpole.py
)には、Cartpoleタスクの強化学習における主要な処理が記述されています。具体的には、シミュレーション環境の初期化、観測空間と行動空間の定義、報酬計算などを担当します。
3章で説明した通り、このCartpoleクラスも親クラスであるVecTask
を継承して作成されています。
def_init_()
まず、クラスの初期化を行うコンストラクタdef __init__
メソッドから見ていきましょう
def __init__(self, cfg, rl_device, sim_device, graphics_device_id, headless, virtual_screen_capture, force_render): def __init__(self, cfg, rl_device, sim_device, graphics_device_id, headless, virtual_screen_capture, force_render): # インスタンス変数に格納 self.cfg = cfg # cfgから環境固有のパラメータを取得 self.reset_dist = self.cfg["env"]["resetDist"] self.max_push_effort = self.cfg["env"]["maxEffort"] # エピソードの最大長を設定 self.max_episode_length = 500 # 観測空間と行動空間の次元数をcfgに設定 self.cfg["env"]["numObservations"] = 4 self.cfg["env"]["numActions"] = 1 # 親クラス(VecTask)のコンストラクタを呼び出し、基本的な初期化を実行 super().__init__(config=self.cfg, rl_device=rl_device, sim_device=sim_device, graphics_device_id=graphics_device_id, headless=headless, virtual_screen_capture=virtual_screen_capture, force_render=force_render) # シミュレーションからDOF状態テンソルを取得し、PyTorchテンソルにラップ dof_state_tensor = self.gym.acquire_dof_state_tensor(self.sim) self.dof_state = gymtorch.wrap_tensor(dof_state_tensor) # ラップしたテンソルから、位置(pos)と速度(vel)の情報を抽出 # ビュー操作で (num_envs, num_dof, 2) の形状に変形し、最後の次元で位置と速度を分離 self.dof_pos = self.dof_state.view(self.num_envs, self.num_dof, 2)[..., 0] self.dof_vel = self.dof_state.view(self.num_envs, self.num_dof, 2)[..., 1]
まず、引数で受け取った辞書オブジェクトcfg
をインスタンス変数self.cfg
に格納します。このcfg
には、対応するYAMLファイル(cfg/task/Cartpole.yaml
、cfg/train/CartpolePPO.yaml
)で定義されたパラメータが含まれています。次に、cfg
からリセット距離(resetDist
)、カートを押す最大力(maxEffort
)を取得し、インスタンス変数に設定します。また、エピソードの最大長(max_episode_length
)も定義します。さらに、cfg["env"]
に、このタスクにおける観測空間の次元数(numObservations
)と行動空間の次元数(numActions
)を設定します。これらの値は、4.1節で説明した通り、Cartpoleタスクではそれぞれ4(カート位置、カート速度、ポール角度、ポール角速度)と1(カートへの力)になります。
必要な設定をself.cfg
に格納(または設定)した後、super().__init__(...)
を呼び出して親クラスVecTask
のコンストラクタを実行します。これにより、シミュレーションのセットアップなど、基本的な初期化処理が行われます。
最後の4行では、シミュレーションから全環境のDOFの状態(位置と速度)を取得し、PyTorchテンソルとして効率的に扱えるように準備しています。この処理は以下のステップで行われます。
self.gym.acquire_dof_state_tensor(self.sim)
を使用して、シミュレーション内の全環境における全てのDOFの状態(位置と速度が交互に格納されている)を含むテンソルを取得- 取得したテンソルを
gymtorch.wrap_tensor
でPyTorchテンソルにラップ(変換)し、GPU上での計算を可能にします - ラップしたテンソル(
self.dof_state
)をビュー操作で(num_envs, num_dof, 2
)という形状に変形し、最後の次元([..., 0] と [..., 1])を使って、全環境のDOFの位置情報(self.dof_pos
)と速度情報(self.dof_vel
)を含むテンソルとして分離・抽出します
create_sim()
次に、シミュレーション環境のセットアップを行うcreate_sim
メソッドを解説します。
def create_sim(self): # cfgファイルに基づき、シミュレーションの上方向軸を設定 (z-up or y-up) self.up_axis = self.cfg["sim"]["up_axis"] # 親クラス(VecTask)のcreate_simを呼び出し、基本的なシミュレーション環境を生成 self.sim = super().create_sim(self.device_id, self.graphics_device_id, self.physics_engine, self.sim_params) # 地面を作成・配置 self._create_ground_plane() # 指定された数の環境(envs)を作成し、各環境にアクター(Cartpoleモデル)を配置 self._create_envs(self.num_envs, self.cfg["env"]['envSpacing'], int(np.sqrt(self.num_envs)))
まず、設定ファイル(cfg
)から上方向を示す軸(up_axis
)を取得・設定します。次に、親クラス(VecTask
)のcreate_sim()
メソッドを呼び出し、オブジェクト(self.sim)を生成します。この際、物理エンジンやシミュレーションパラメータ、使用デバイスなどの基本的な設定が行われます。
続いて、_create_ground_plane()
で地面を生成・配置し、_create_envs()
で指定された数の並列環境を作成し、各環境内にCartpoleモデル(アクター)を配置します。
_create_envs()
_create_envs()
メソッドは、並列実行される各シミュレーション環境の作成、アセット(Cartpoleモデル)のロードと配置、そしてDOFの制御設定を行います。実装は以下のようになっています。
def _create_envs(self, num_envs, spacing, num_per_row): # (1) 各環境が配置される空間の境界(lower, upper)を定義 lower = gymapi.Vec3(0.5 * -spacing, -spacing, 0.0) if self.up_axis == 'z' else gymapi.Vec3(0.5 * -spacing, 0.0, -spacing) upper = gymapi.Vec3(0.5 * spacing, spacing, spacing) # (2) アセットファイル(URDF)のパスを決定 asset_root = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../../assets") asset_file = "urdf/cartpole.urdf" # cfgファイルでアセットパスが指定されていれば上書き if "asset" in self.cfg["env"]: asset_root = os.path.join(os.path.dirname(os.path.abspath(__file__)), self.cfg["env"]["asset"].get("assetRoot", asset_root)) asset_file = self.cfg["env"]["asset"].get("assetFileName", asset_file) asset_path = os.path.join(asset_root, asset_file) asset_root = os.path.dirname(asset_path) asset_file = os.path.basename(asset_path) # アセットオプションを設定 asset_options = gymapi.AssetOptions() asset_options.fix_base_link = True # (3) アセットをロード cartpole_asset = self.gym.load_asset(self.sim, asset_root, asset_file, asset_options) # (4) アセットのDOF数を取得 self.num_dof = self.gym.get_asset_dof_count(cartpole_asset) # (5)アクターの初期ポーズ(位置と姿勢)を設定 pose = gymapi.Transform() if self.up_axis == 'z': pose.p.z = 2.0 # z軸方向に2.0の位置 pose.r = gymapi.Quat(0.0, 0.0, 0.0, 1.0) # 回転なし else: # y-upの場合 pose.p.y = 2.0 # y軸方向に2.0の位置 # x軸周りに-90度回転 (y-upモデルをz-up環境に合わせる場合など) pose.r = gymapi.Quat(-np.sqrt(2)/2, 0.0, 0.0, np.sqrt(2)/2) # (6)各環境とアクターハンドルを格納するリストを初期化 self.cartpole_handles = [] self.envs = [] # 指定された数の環境を作成 (ループ処理) for i in range(self.num_envs): # 環境インスタンスを作成 env_ptr = self.gym.create_env(self.sim, lower, upper, num_per_row) # 作成した環境にアクター(Cartpole)を配置し、ハンドルを取得 cartpole_handle = self.gym.create_actor(env_ptr, cartpole_asset, pose, "cartpole", i, 1, 0) # collision group=1, mask=0 # アクターのDOFプロパティを設定 dof_props = self.gym.get_actor_dof_properties(env_ptr, cartpole_handle) # カートの関節(DOF 0)は力(Effort)で制御 dof_props['driveMode'][0] = gymapi.DOF_MODE_EFFORT # ポールの関節(DOF 1)は制御しない(受動的) dof_props['driveMode'][1] = gymapi.DOF_MODE_NONE # Stiffness(剛性)とDamping(減衰)は0に設定 (力制御のため) dof_props['stiffness'][:] = 0.0 dof_props['damping'][:] = 0.0 # 変更したプロパティを適用 self.gym.set_actor_dof_properties(env_ptr, cartpole_handle, dof_props) # 作成した環境ポインタとアクターハンドルをリストに保存 self.envs.append(env_ptr) self.cartpole_handles.append(cartpole_handle)
このメソッド内の主な処理は以下の通りです。
環境配置領域の定義
gymapi.Vec3
を使用して、各環境がグリッド状に配置される際の境界(最小座標lower
と最大座標upper
)を定義します。アセットパスの決定
CartpoleモデルのURDFファイル(cartpole.urdf)のパスを特定します。設定ファイル(cfg)で指定があれば、そちらを優先します。アセットのロード
self.gym.load_asset
を使用して、指定されたURDFファイルをシミュレーションにロードします。Asset_Options
でアセットの読み込み方(例:ベースリンクの固定有無)を設定できます。DOF数の取得
ロードしたアセットのDOF数を取得し、self.num_dof
に格納します。初期ポーズの設定
各環境に配置するCartpoleアクターの初期位置(pose.p
)と初期姿勢(pose.r
)をgymapi.Transform
で定義します。up_axis
の設定に応じて調整されます。環境作成とアクター配置 (ループ処理)
self.num_envs
で指定された回数だけループを実行し、各反復で以下の処理を行います。- 環境インスタンスの作成
self.gym.create_env
で新しい空の環境を作成します。 - アクターの作成と配置
self.gym.create_actor
で、ロードしたcartpole_asset
を作成した環境(env_ptr
)内の指定した初期ポーズ(pose
)に配置します。戻り値としてアクターハンドル(cartpole_handle
)を取得します。 DOFプロパティの設定
self.gym.get_actor_dof_properties
でアクターのDOFプロパティを取得し、制御方法('driveMode')や物理特性('stiffness'、'damping')を設定します。driveMode[0] = gymapi.DOF_MODE_EFFORT
カートの関節(インデックス0)を力制御モードに設定します。driveMode[1] = gymapi.DOF_MODE_NONE
ポールの関節(インデックス1)は直接制御しないモードに設定します。- 変更したプロパティは
self.gym.set_actor_dof_properties
でアクターに適用します。
環境ポインタとアクターハンドルの保存
後で参照できるように、作成した環境のポインタ(env_ptr
)とアクターのハンドル(cartpole_handle
)をそれぞれリスト(self.envs
、self.cartpole_handles
)に追加します
- 環境インスタンスの作成
pre_physics_step()
次に、物理シミュレーションが実行される直前に呼び出されるpre_physics_step()
メソッドを解説します。このメソッドは、強化学習エージェントから出力された行動を受け取り、それをシミュレーション内のアクター(Cartpole)に適用する役割を担います。
def pre_physics_step(self, actions): # 全環境・全DOFに対する力テンソルをゼロで初期化 # (num_envs * num_dof) の1次元テンソル actions_tensor = torch.zeros(self.num_envs * self.num_dof, device=self.device, dtype=torch.float) # actionsテンソルからカートのDOFに対応する部分に力を設定 # actions は通常 [-1, 1] の範囲なので、max_push_effort でスケールする # [::self.num_dof] は、DOF0 (カート) の要素のみを選択するスライス actions_tensor[::self.num_dof] = actions.to(self.device).squeeze(-1) * self.max_push_effort # squeeze(-1) は、actionsの形状が (num_envs, 1) の場合に (num_envs,) に変換するため # PyTorchテンソルをIsaac Gymが扱える形式に変換 forces = gymtorch.unwrap_tensor(actions_tensor) # 計算された力をシミュレーション内の全DOFに適用 self.gym.set_dof_actuation_force_tensor(self.sim, forces)
このメソッドでは、まずtorch.zeros
を使って、全環境(self.num_envs
)の全DOF(self.num_dof
)に対して適用する力を格納するためのテンソル(actions_tensor
)をゼロで初期化します。Cartpoleの場合、DOFは2つ(カートの並進、ポールの回転)なので、このテンソルのサイズは num_envs * 2
になります。
次に、エージェントから渡された行動を以下の手順で処理します。
actions.to(self.device)
で、テンソルをシミュレーションと同じデバイス(CPUまたはGPU)に移動.squeeze(-1)
で、もしactions
の形状が (num_envs, 1) のように余分な次元を持っている場合、それを取り除き (num_envs,) の形状に変換* self.max_push_effort
で、通常 [-1, 1] の範囲で出力される行動値を、設定された最大力(max_push_effort
)に基づいて物理的な力の大きさにスケーリングactions_tensor[::self.num_dof]
は、テンソルスライスを用いて、カートのDOFに対応する要素にのみ、計算された力の値を代入
続いて、gymtorch.unwrap_tensor
を使って、PyTorchテンソル(actions_tensor
)をIsaacGymのAPIが直接扱える内部形式(forces
)に変換します。
最後に、self.gym.set_dof_actuation_force_tensor
を呼び出し、計算された力のテンソル(forces
)をシミュレーション内の全アクターの全DOFに一括で適用します。これにより、次の物理シミュレーションステップで、各カートが指定された力で押されることになります。
post_physics_step()
次に、物理シミュレーションステップの実行後に呼び出されるpost_physics_step()
メソッドを解説します。このメソッドは、シミュレーション結果を処理し、次のステップに向けた準備を行います
def post_physics_step(self): # 各環境の現在のエピソードステップ数をインクリメント self.progress_buf += 1 # リセットが必要な環境のIDを取得 # self.reset_buf は compute_reward で更新される env_ids = self.reset_buf.nonzero(as_tuple=False).squeeze(-1) # リセットが必要な環境があれば、reset_idxを呼び出す if len(env_ids) > 0: self.reset_idx(env_ids) # 最新の観測データを計算・取得し、self.obs_buf を更新 self.compute_observations() # 報酬を計算し、self.rew_buf と self.reset_buf を更新 self.compute_reward()
このメソッドでは、まず各環境のエピソード内での経過ステップ数をカウントするself.progress_buf
をインクリメントします。
次に、self.reset_buf
(後述のcompute_reward
で更新される、リセットが必要かどうかを示すフラグ)を確認し、リセットが必要な環境(reset_buf
が1の環境)のIDリスト (env_ids
)を取得します。もしリセット対象の環境が存在すれば、self.reset_idx(env_ids)
を呼び出して、それらの環境を初期状態にリセットします。
reset_idx()
は、指定されたenv_ids
の環境に対して、カートとポールのDOF状態(位置・速度)をランダムな初期値に設定し、対応するself.dof_state
テンソルを更新します。最後に、リセットされた環境のself.reset_buf
とself.progress_buf
をゼロにクリアし、新しいエピソードを開始できるようにします。
その後、compute_observations()
を呼び出して最新の観測データを計算・取得し、self.obs_buf
を更新します。最後にcompute_reward()
を呼び出して、更新された状態に基づき報酬と次のリセットフラグを計算し、self.rew_buf
とself.reset_buf
を更新します。
compute_observations()
この関数は、シミュレーションから最新の物理状態(DOFの位置と速度)を取得し、それを整形して強化学習エージェントが利用できる観測データ (self.obs_buf
) に変換します。
def compute_observations(self, env_ids=None): # 更新対象の環境IDが指定されていなければ、全環境を対象とする if env_ids is None: env_ids = np.arange(self.num_envs) # シミュレーションから最新のDOF状態を取得し、self.dof_stateテンソルを更新 self.gym.refresh_dof_state_tensor(self.sim) # self.dof_state から必要な情報を抽出し、観測バッファ(self.obs_buf)に格納 # Cartpoleの観測: [カート位置, カート速度, ポール角度, ポール角速度] self.obs_buf[env_ids, 0] = self.dof_pos[env_ids, 0].squeeze() # カート位置 (DOF 0 pos) self.obs_buf[env_ids, 1] = self.dof_vel[env_ids, 0].squeeze() # カート速度 (DOF 0 vel) self.obs_buf[env_ids, 2] = self.dof_pos[env_ids, 1].squeeze() # ポール角度 (DOF 1 pos) self.obs_buf[env_ids, 3] = self.dof_vel[env_ids, 1].squeeze() # ポール角速度 (DOF 1 vel) # 更新された観測バッファを返す (RLアルゴリズムへの入力となる) return self.obs_buf
具体的な処理の流れは以下の通りです。
- 対象環境の決定
- 引数である
env_ids
が指定されていない場合、すべての環境を対象として以降の処理の対象になります
- 引数である
- シミュレーション状態の更新
self.gym.refresh_dof_state_tensor(self.sim)
を呼び出し、シミュレーションエンジンから最新の全DOFの状態(位置と速度)を取得し、クラス内の self.dof_state テンソル(およびそこからビューで作成されたself.dof_pos, self.dof_vel
)に反映させます。
- 観測バッファへのデータ格納
- 更新された
self.dof_pos
とself.dof_vel
から、Cartpoleタスクに必要な情報(カートの位置・速度、ポールの角度・角速度)を抽出し、観測バッファself.obs_buf
の対応するインデックス(env_ids
)に格納します。
- 更新された
- 観測バッファの返却
- 更新された
self.obs_buf
を返します。このテンソルが強化学習アルゴリズム(エージェントの方策)への入力となります
- 更新された
compute_reward()
この関数は、現在の環境状態(主にself.obs_buf
に格納された観測データ)に基づいて、各環境のエージェントに対する報酬を計算し、同時に次のステップでエピソードをリセットすべきかどうかを判断します。
def compute_reward(self): # 観測バッファ(self.obs_buf)から報酬計算に必要な状態量を取得 pole_angle = self.obs_buf[:, 2] pole_vel = self.obs_buf[:, 3] cart_vel = self.obs_buf[:, 1] cart_pos = self.obs_buf[:, 0] # 取得した状態量などを使い、報酬とリセットフラグを計算する関数を呼び出す # この関数は通常 @torch.jit.script で最適化されている self.rew_buf[:], self.reset_buf[:] = compute_cartpole_reward( pole_angle, pole_vel, cart_vel, cart_pos, self.reset_dist, self.reset_buf, self.progress_buf, self.max_episode_length )
具体的な処理の流れは以下の通りです。
テンソルから観測データを取得
- 直前の compute_observations()の呼び出しによって更新された
self.obs_buf
から報酬の計算に必要な特定の状態量を取り出しますpole_angle = self.obs_buf[:, 2]
:全環境のポールの角度を取得します。pole_vel = self.obs_buf[:, 3]
:全環境のポールの角速度を取得します。cart_vel = self.obs_buf[:, 1]
:全環境のカートの速度を取得します。cart_pos = self.obs_buf[:, 0]
:全環境のカートの位置を取得します。
- 直前の compute_observations()の呼び出しによって更新された
報酬の計算とリセット判定の実行
- 取得した状態量やリセット距離(
self.reset_dist
)、現在のリセット状態(self.reset_buf
)、ステップ数(self.progress_buf
)などを引数として、実際の計算を行う関数compute_cartpole_reward
を呼び出します。
- 取得した状態量やリセット距離(
結果を格納
compute_cartpole_reward
から返された報酬テンソルとリセットフラグテンソルを、それぞれクラスのメンバー変数であるself.rew_buf
とself.reset_buf
に上書き保存しますself.rew_buf
は強化学習アルゴリズムが方策の更新に使用します。self.reset_buf
は、この後のpost_physics_step
内でリセット対象の環境を特定するために使用されます。
compute_cartpole_reward() JITコンパイル関数
compute_cartpole_reward
は、Cartpole環境の状態に基づいて、強化学習エージェントが受け取る報酬と、環境をリセットすべきかどうかのフラグを計算する関数です。パフォーマンス向上のために @torch.jit.script デコレータが付与されています。これにより関数がTorchScriptにコンパイルされ、特にGPU上での実行が高速化されます。
##################################################################### ###=========================jit functions=========================### ##################################################################### @torch.jit.script # TorchScriptにコンパイルして高速化 def compute_cartpole_reward(pole_angle, pole_vel, cart_vel, cart_pos, reset_dist, reset_buf, progress_buf, max_episode_length): # type: (Tensor, Tensor, Tensor, Tensor, float, Tensor, Tensor, float) -> Tuple[Tensor, Tensor] # 引数と返り値の型ヒント (TorchScript用) # 基本報酬: 1.0 からペナルティを引く # ポールが傾くほどペナルティ大, カート/ポールの速度が大きいほどペナルティ小 reward = 1.0 - pole_angle * pole_angle - 0.01 * torch.abs(cart_vel) - 0.005 * torch.abs(pole_vel) # リセット条件に基づく報酬調整 (大きなペナルティを与える) # カートが範囲外に出た場合 reward = torch.where(torch.abs(cart_pos) > reset_dist, torch.ones_like(reward) * -2.0, reward) # ポールが倒れすぎた場合 (角度が pi/2 を超えたら) reward = torch.where(torch.abs(pole_angle) > np.pi / 2, torch.ones_like(reward) * -2.0, reward) # リセットフラグの計算 # カートが範囲外に出た場合 reset = torch.where(torch.abs(cart_pos) > reset_dist, torch.ones_like(reset_buf), reset_buf) # ポールが倒れすぎた場合 reset = torch.where(torch.abs(pole_angle) > np.pi / 2, torch.ones_like(reset_buf), reset) # 最大エピソード長に達した場合 reset = torch.where(progress_buf >= max_episode_length - 1, torch.ones_like(reset_buf), reset) # 計算した報酬とリセットフラグをタプルで返す return reward, reset
この関数内の主な処理は以下の通りです。
基本報酬の計算:各環境に対して、基本的な報酬を計算します。ここでは、立っている状態(角度0)を基準(1.0)とし、ポールの角度の二乗、カートの速度の絶対値、ポールの角速度の絶対値に応じたペナルティを減算しています。
終了条件に基づく報酬調整:エピソードが終了する条件(カートが指定範囲
reset_dist
を超える、またはポールが倒れすぎるabs(pole_angle) > pi/2
)を満たした場合、報酬を大きな負の値(ここでは -2.0)に上書きします。これは、終了状態に対するペナルティを明確にするためです。リセットフラグの計算:各環境がリセット条件を満たしているかを判定します。上記の終了条件に加えて、エピソードの最大長(
max_episode_length
)に達した場合もリセット対象となります。torch.whereを用いて、条件を満たす環境に対応するresetテンソルの要素を1に設定します。結果の返却:計算された報酬テンソル(
reward
)とリセットフラグテンソル(reset
)をタプルとして返します。
4.3 環境設定ファイル cfg/task/Cartpole.yaml
設定ファイルのコードは以下のリンク先から確認できます。
このファイル(Cartpole.yaml)は、Cartpoleタスクにおけるシミュレーション環境の設定を定義します。YAML形式で記述されており、主に以下のキー(セクション)とその配下のパラメータで構成されます。
env
環境の基本的な設定(環境数、間隔、リセット条件、観測/行動クリッピング、アセットなど)を定義しますnumEnvs
並列実行する環境の数envSpacing
並列で実行される各環境を配置する際の、環境間の距離resetDist
エピソードのリセット条件となる距離です。Cartpoleでは、カートが原点からこの距離以上離れた場合にリセットされますmaxEffort
カートに加えることができる力の最大値です。モーターのトルク制限などを模倣しますclipObservations
環境から得られる観測値をクリップする範囲 (± この値) ですclipActions
エージェントの行動値をクリップする範囲 (± この値) です。通常、行動値は [-1, 1] に正規化されているため 1.0 がよく用いられますasset
使用するアセット(URDF/MJCFファイルなど)に関する設定enableCameraSensors
カメラセンサーを使用するかどうかを指定する
sim
シミュレーションの物理演算に関する基本的なパラメータ(タイムステップ、サブステップ、重力、座標系など)を設定しますdt
シミュレーションのタイムステップ (秒単位)substeps
1タイムステップ (dt) あたりに実行する物理計算のサブステップ数です。値を大きくすると計算精度が向上しますが、計算コストが増加しますup_axis
:シミュレーション空間における上方向の軸 ('y' または 'z') を指定しますuse_gpu_pipeline
: GPUパイプラインを使用するかどうか (True/False) です。有効にすると、シミュレーションと学習の多くがGPU上で実行され、CPU-GPU間のデータ転送が削減され高速化が期待できますgravity
:シミュレーション空間に作用する重力加速度ベクトルを指定するphysx
: PhysX物理エンジン固有の詳細パラメータを設定するセクションです(ソルバー設定、衝突検出設定など)
task
強化学習タスク固有のパラメータ(報酬、終了条件、初期化など)を設定するセクションです。環境によっては、エピソードごとに物理パラメータ(質量、摩擦など)や初期状態をランダム化するための設定が含まれることもあります
4.4 学習設定ファイル cfg/train/CartpolePPO.yaml
設定ファイルのコードは以下のリンク先から確認できます。
このファイルでは、Cartpoleタスクの学習におけるアルゴリズムやハイパーパラメータなどを設定します。YAML形式で記述されており、主に以下のようなパラメータが設定されています。
seed
学習時の再現性を確保するためのseed値を設定algo
使用する強化学習アルゴリズムの種類を設定model
学習に使用する強化学習モデルを設定network
モデル内で使用するニューラルネットワークの種類や構造に関する設定load_checkpoint
学習済みのチェックポイントファイルをロードして学習を再開または評価するかどうかを指定(ブール型、 Trueまたは False)load_path
ロードするチェックポイントファイルのパスを指定 (load_checkpoint: True
の場合に有効)config
アルゴリズム固有のハイパーパラメータなどを設定します。代表的なパラメータには以下のようなものがありますgamma
割引率。将来の報酬を現在の価値に換算する際の割引係数です。1に近いほど将来の報酬を重視し、0に近いほど直近の報酬を重視します。Cartpoleの0.99は、将来にわたる累積報酬を高く評価する設定です。learning_rate
学習率。モデルのパラメータを更新する際のステップ幅(更新量)を決定します。値が大きいと学習は速く進みますが、最適解周辺で不安定になる可能性があります。逆に小さいと学習は安定しますが、収束に時間がかかります。lr_schedule
学習率のスケジューリング方法を指定します。学習の進行に合わせて学習率を変化させることで、初期段階では効率的に学習を進め、終盤では安定した収束を目指すことができます(例: 最初は大きく、徐々に小さくする)。max_epochs
学習の最大エポック数。学習の終了条件の一つです。エポックとは、一般的に、一定量の経験データ(複数の環境でのシミュレーションステップ)を収集し、それを用いてモデルのパラメータを更新する一連のサイクルのことを指します。このパラメータで、その更新サイクルを最大何回繰り返すかを指定します。
5. 最後に
今回は、Isaac Gymで独自の学習環境を実装するための基礎知識について解説し、Cartpoleの実装を確認、解説していきました。 実際にIsaac Gymを使用する際、標準で用意されている学習環境はチュートリアルとして動かすだけで終わってしまうことが多いかもしれません。 ぜひ本記事を参考に、ご自身の学習環境の作成に挑戦してみてください。その経験が、皆さんの強化学習への理解を深める一助となれば幸いです。
*1:これまでのIsaac Gym入門記事で紹介してきた強化学習は、複数の環境が並列で動作するため誤解されがちですが、実際には単一エージェントによる学習です。 一方、マルチエージェント強化学習では、単一の環境内に複数のエージェントが存在し、互いに協調して制御することで、タスクをより効率的に実行することを目指します。
*2:これは強化学習におけるActor-Critic手法の一種で、Actor(行動選択)とCritic(価値評価)で利用可能な情報が異なる(非対称である)点が特徴です。Isaac Gymでは、主にロボットハンドを用いたタスク(例:AllegroHand、ShadowHand)で利用されています。)