詳解 ROS2 エグゼキューター (Executor) - 2 -

執筆者:千葉工業大学 先進工学研究科 未来ロボティクス専攻 井上 叡 (インターン生)
監修者:稲葉 貴昭・高橋 浩和

※ 「ROS2」連載記事一覧はこちら


はじめに

本連載では、ROS2のC++ APIを利用しているほとんどの方が利用しているにも関わらず、その仕組みや仕様についてあまり知られていない、エグゼキューター(Executor)を紹介します。rclcppライブラリに実装されたエグゼキューターについてソースコードなどを追いながら、その仕組みについて詳しく追っていければと思います。
2回目となる今回は、rclcpp::Executorの実装を追っていきながら、スケジューリングメカニズムやエンティティの管理方法について紹介していきます。

なお、本連載ではROS2 jazzyでの各APIについて解説していきます。各ライブラリのバージョンは、rcl 9.2.5rclcpp 28.1.9rmw 7.3.2rclcpputils 2.11.2です。 以下にそれぞれのリポジトリへのリンクを掲載します。ソースコードを参照する際はクローンすると便利です。 また、本連載で掲載する各ライブラリのコードは全てこれらのリポジトリより引用します。

エグゼキューターの実装 <基底クラス rclcpp::Executor> 監視対象エンティティの管理

前回記事で、rclcpp::Executorが持つ重要なメンバ変数として、以下の4つを挙げました。

エンティティ管理系(今回解説):

  • rclcpp::executors::ExecutorEntitiesCollection
    • エグゼキューターが管理するエンティティの一覧を持つコンテナ
  • rclcpp::ExecutorEntitiesCollector
    • ExecutorEntitiesCollectionを管理する。構築や内容変更などを行う

待機機構系(前回解説済み):

  • rclcpp::WaitSet
  • rclcpp::WaitResult<rclcpp::WaitSet>

今回は上記のエンティティ管理系2つのクラスについて詳しく見ていきます。

rclcpp::executors::ExecutorEntitiesCollection

まず最初に、エンティティのコンテナであるrclcpp::executors::ExecutorEntitiesCollectionが持つメンバ変数から見てみましょう。 以下に、メンバ変数に関わる部分のみを抜粋します。

// ~~~ 省略 ~~~

/// Structure to represent a single entity's entry in a collection
template<typename EntityValueType>
struct CollectionEntry
{
  /// Weak pointer to entity type
  using EntityWeakPtr = typename EntityValueType::WeakPtr;
  /// Shared pointer to entity type
  using EntitySharedPtr = typename EntityValueType::SharedPtr;

  /// The entity
  EntityWeakPtr entity;

  /// If relevant, the entity's corresponding callback_group
  rclcpp::CallbackGroup::WeakPtr callback_group;
};

// ~~~ 省略 ~~~

template<typename EntityKeyType, typename EntityValueType>
class EntityCollection
  : public std::unordered_map<const EntityKeyType *, CollectionEntry<EntityValueType>>
{
public:
  /// Key type of the map
  using Key = const EntityKeyType *;

  /// Weak pointer to entity type
  using EntityWeakPtr = typename EntityValueType::WeakPtr;

  /// Shared pointer to entity type
  using EntitySharedPtr = typename EntityValueType::SharedPtr;

  // ~~~ メソッド等は省略 ~~~
}

struct ExecutorEntitiesCollection
{
  /// Collection type for timer entities
  using TimerCollection = EntityCollection<rcl_timer_t, rclcpp::TimerBase>;

  /// Collection type for subscription entities
  using SubscriptionCollection = EntityCollection<rcl_subscription_t, rclcpp::SubscriptionBase>;

  /// Collection type for client entities
  using ClientCollection = EntityCollection<rcl_client_t, rclcpp::ClientBase>;

  /// Collection type for service entities
  using ServiceCollection = EntityCollection<rcl_service_t, rclcpp::ServiceBase>;

  /// Collection type for waitable entities
  using WaitableCollection = EntityCollection<rclcpp::Waitable, rclcpp::Waitable>;

  /// Collection type for guard condition entities
  using GuardConditionCollection = EntityCollection<rcl_guard_condition_t, rclcpp::GuardCondition>;

  /// Collection of timers currently in use by the executor.
  TimerCollection timers;

  /// Collection of subscriptions currently in use by the executor.
  SubscriptionCollection subscriptions;

  /// Collection of clients currently in use by the executor.
  ClientCollection clients;

  /// Collection of services currently in use by the executor.
  ServiceCollection services;

  /// Collection of guard conditions currently in use by the executor.
  GuardConditionCollection guard_conditions;

  /// Collection of waitables currently in use by the executor.
  WaitableCollection waitables;

  // ~~~ メソッド等は省略 ~~~
}

--- rclcpp/executor_entities_collection.hppより引用

rclcpp::executors::ExecutorEntitiesCollectionクラスの構造について以下にまとめます。

  • それぞれのエンティティはstd::unordered_mapを継承した連想コンテナに格納される
    • コンテナのキーは以下のどちらか
      • Waitable以外は、rclで定義されたオブジェクト(rcl_subscription_tのような型)のポインタ
      • Waitableはrclcpp::Waitableのポインタ
    • コンテナの要素は以下の2つを持つ構造体
      • rclcppのエンティティ型への弱参照ポインタ
      • エンティティが属するコールバックグループへの弱参照ポインタ
  • 基本的にはコンテナとしての役割しか持たない。
    • 管理対象のエンティティの増減の監視はrclcpp::ExecutorEntitiesCollectorが行う。
    • このクラスへのエンティティの追加は、executor_entities_collection.hppで宣言されたbuild_entities_collection()関数で行われる。この関数は、エグゼキューターから呼ばれる。

ここで注意しなくてはいけない点として以下の2つがあります。

  1. このクラスが管理するのは弱参照
    • エンティティの生存期間はノード側が管理する
  2. エグゼキューターは部分的な監視を行う場合がある
    • 必ずノードが持つ全エンティティを監視する訳ではない
    • エグゼキューターに明示的に登録されたもののみが対象

2.の例としては、ノードがエンティティを構築したにも関わらず、そのエンティティをエグゼキューターに監視対象として通知していない場合などが想定されます。その場合は、ノードとしてはそのエンティティを持っているものの、エグゼキューターはそれを監視対象に入れておらず、このクラスの連想リストにもそれへの弱参照は追加されていないことになります。また、エンティティはエグゼキューターに渡されないと実行されないため、そのエンティティはただ構築されただけで実行されないことになります。*1

rclcpp::ExecutorEntitiesCollector

次は、ExecutorEntitiesCollectionを構築、変更する役割を持つrclcpp::ExecutorEntitiesCollectorを見ていきましょう。このクラスは、ノードやコールバックグループが持つエンティティに何らかの変更が生じた、もしくはノードやコールバックグループ自体がエグゼキューターに追加・削除された場合、それを検知してExecutorEntitiesCollectionを更新します。

このクラスのメンバ変数を確認しましょう。ExecutorEntitiesCollectorクラスの中で重要なメンバ変数とメソッドを取り出して以下に記載します。

// 説明に不要なメンバは省略

class ExecutorEntitiesCollector
{
public:
  RCLCPP_PUBLIC
  void
  add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr);

RCLCPP_PUBLIC
  void
  remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr);

RCLCPP_PUBLIC
  void
  add_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr);

RCLCPP_PUBLIC
  void
  remove_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr);

  RCLCPP_PUBLIC
  void
  update_collections();

protected:
  using NodeCollection = std::set<
    rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
    std::owner_less<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>>;

  using CallbackGroupCollection = std::set<
    rclcpp::CallbackGroup::WeakPtr,
    std::owner_less<rclcpp::CallbackGroup::WeakPtr>>;
  
  /// Callback groups that were added via `add_callback_group`
  CallbackGroupCollection manually_added_groups_ RCPPUTILS_TSA_GUARDED_BY(mutex_);

  /// Callback groups that were added by their association with added nodes
  CallbackGroupCollection automatically_added_groups_ RCPPUTILS_TSA_GUARDED_BY(mutex_);

  /// nodes that are associated with the executor
  NodeCollection weak_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_);

  /// nodes that have been added since the last update.
  NodeCollection pending_added_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_);

  /// nodes that have been removed since the last update.
  NodeCollection pending_removed_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_);

  /// callback groups that have been added since the last update.
  CallbackGroupCollection pending_manually_added_groups_ RCPPUTILS_TSA_GUARDED_BY(mutex_);

  /// callback groups that have been removed since the last update.
  CallbackGroupCollection pending_manually_removed_groups_ RCPPUTILS_TSA_GUARDED_BY(mutex_);

  /// Waitable to add guard conditions to
  std::shared_ptr<ExecutorNotifyWaitable> notify_waitable_;
}

--- rclcpp/executor_entities_collector.hppより引用

rclcpp::ExecutorEntitiesCollectorクラスの構造について以下にまとめます。

  • クラスがメンバとして持つ重要なコンテナは以下の3つ。これらのグループは、エグゼキューターの管理対象となるコールバックグループとノードの集合を保持する。
    • ノードに対する弱参照のstd::set
      • weak_nodes_
    • コールバックグループに対する弱参照のstd::set
      • manually_added_groups_
        • ユーザーがNodeBase::create_callback_group()等で構築して追加したコールバックグループを表す
      • automatically_added_groups_
        • ノードが必ず持つデフォルトコールバックグループ(NodeBase::default_callback_group_)を表す
  • pending_○○系のコンテナの役割
    • ノード・コールバックグループの追加・削除をスレッドセーフに行うための一時保管に利用される
    • このクラスのadd_○○remove_○○系のメソッドとupdate_collections()は2つ以上のスレッドから同時に呼ばれる可能性があるため必要

このクラスの管理対象は、ノードとコールバックグループです。そしてExecutorEntitiesCollectionと同じように、ノードとコールバックグループを直接保持するのではなく、あくまでそれらに対する弱参照のポインタを持ちます。このクラスのadd_○○remove_○○メソッドは、エグゼキューターに実装されたadd_○○remove_○○メソッド内部から呼び出されます。この呼び出しにより、監視対象のノードとコールバックグループに変更があったことを検知します。 ここで注意しておきたいのは、コールバックグループには対応するエンティティが紐付いているという事です。このクラスが直接的に扱うのはノードとコールバックグループですが、間接的にエンティティも管理しています。

rclcpp::Executorにおけるエンティティリストの更新方法

前項前々項にて、rclcpp::Executor内でエンティティを管理するのに利用される2つのクラスを紹介しました。本項では、前項までで紹介していなかった、rclcpp::Executor内での実際のエンティティの管理・更新方法について紹介します。

rclcpp::Executor::collect_entities()rclcpp::Executorが持つExecutorEntitiesCollectionを更新するためのメソッドです。 このメソッドが行う処理の重要な部分を以下に抜粋します。

void
Executor::collect_entities(){
  // ~~~ 省略 ~~~
  
  // Get the current list of available waitables from the collector.
  rclcpp::executors::ExecutorEntitiesCollection collection;
  this->collector_.update_collections();
  auto callback_groups = this->collector_.get_all_callback_groups();
  rclcpp::executors::build_entities_collection(callback_groups, collection);

  // ~~~ 省略 ~~~

}

--- rclcpp/executor.cppより引用

ここで、collector_rclcpp::Executorがメンバとして持つ、ExecutorEntitiesCollectorです。 this->collector_.update_collections();では、ノードとコールバックグループの追加・削除が発生していないかをチェックして、監視対象のコールバックグループのリストを更新しています。

ExecutorEntitiesCollectionを更新するのは、以下のbuild_entities_collection()です。

void
build_entities_collection(
  const std::vector<rclcpp::CallbackGroup::WeakPtr> & callback_groups,
  ExecutorEntitiesCollection & collection);

--- rclcpp/executor_entities_collection.hppより引用

この関数は、コールバックグループのリストとExecutorEntitiesCollectionの参照を受け取り、そのコールバックグループからエンティティを取り出してExecutorEntitiesCollection内にエンティティを追加・削除します。

まとめると以下のような流れで、コレクションが構築されます。

  1. 監視対象のコールバックグループのリストを更新する
    • コレクターのupdate_collections()の呼び出し
  2. コレクターが監視対象として持っている全てのコールバックグループを取り出す
    • コレクターのget_all_callback_groups()の呼び出し
  3. 2.で取り出したコールバックグループを全てbuild_entities_collection()に渡してコレクションを構築する

この一連の操作でエンティティは明示的には出てきませんが問題ありません。エンティティは必ず何らかのコールバックグループと紐付いており、コールバックグループからそれに紐付いた全てのエンティティを取得することが可能です。また、エグゼキューターの監視対象にエンティティを追加する際は必ずそれが所属するコールバックグループごと追加しなくてはいけません。そのため、コレクターがグループを監視し、コレクターが把握しているグループ一覧からコレクションを構築することで、適切に監視対象のエンティティのリストを作成することができます。

rclcpp::Executor派生クラスのスケジューリングと実行メカニズム

さて、ここまでrclcpp::Executorが持つ4つの重要なクラスについて見てきました。これでようやくスケジューリングと実行のメカニズムを読んでいく準備ができましたので、本章ではそれについて紹介していきます。

以前も紹介したように、rclcpp::Executorには以下の2つの派生クラスが存在します。

  • rclcpp::executors::SingleThreadedExecutor
  • rclcpp::executors::MultiThreadedExecutor

ユーザーがエグゼキューターとして実際に利用するのは主にこれらのクラスとなります。 まずは、派生クラスとしての実装が最も少なくて単純なSingleThreadedExecutorを見てみましょう。

rclcpp::executors::SingleThreadedExecutor

このクラスは、rclcpp::Executorに対して、spin()関数のみをオーバーライドした派生クラスです。このメソッドは、rclcpp::spin()を呼んだ際に内部的に呼ばれるものであり、最も基本的なspin関数と言えます。

rclcpp::executors::SingleThreadedExecutor::spin()の実装は以下の通りです。

void
SingleThreadedExecutor::spin()
{
  if (spinning.exchange(true)) {
    throw std::runtime_error("spin() called while already spinning");
  }
  RCPPUTILS_SCOPE_EXIT(wait_result_.reset();this->spinning.store(false););

  // Clear any previous result and rebuild the waitset
  this->wait_result_.reset();
  this->entities_need_rebuild_ = true;

  while (rclcpp::ok(this->context_) && spinning.load()) {
    rclcpp::AnyExecutable any_executable;
    if (get_next_executable(any_executable)) {
      execute_any_executable(any_executable);
    }
  }
}

--- rclcpp/single_threaded_executor.cppより引用

やっている事は非常に単純です。以下の処理を無限ループしているのみです。

  1. 実行エンティティの取得
    • rclcpp::Executor::get_next_executable()を呼び、AnyExecutable型に実行準備が完了したエンティティを1つ取り出す
  2. エンティティの実行
    • rclcpp::Executor::execute_any_executable()によって、AnyExecutableが持つエンティティのコールバックを実行する

AnyExecutableはエンティティ型に関する"Any型"のようなもので、以下のように宣言されています。内部では、どれかのエンティティ型のポインタと関連データに対するポインタの組を1つ保持します。この関連データというのは、受信したメッセージなどコールバックの引数として渡されるものです。利用者は、このクラスからコールバックと必要な関連データを取り出し、コールバックを実行できます。

struct AnyExecutable
{
  RCLCPP_PUBLIC
  AnyExecutable();

  RCLCPP_PUBLIC
  virtual ~AnyExecutable();

  // Only one of the following pointers will be set.
  rclcpp::SubscriptionBase::SharedPtr subscription;
  rclcpp::TimerBase::SharedPtr timer;
  rclcpp::ServiceBase::SharedPtr service;
  rclcpp::ClientBase::SharedPtr client;
  rclcpp::Waitable::SharedPtr waitable;
  // These are used to keep the scope on the containing items
  rclcpp::CallbackGroup::SharedPtr callback_group {nullptr};
  rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base {nullptr};
  std::shared_ptr<void> data {nullptr};
};

--- rclcpp/any_executable.hpp より引用

ここまで読んだだけでは、まだスケジューリングメカニズムの実体のようなものは見受けられません。それを探すために、次はrclcpp::Executor::get_next_executable()を追っていきましょう。

rclcpp::Executor::get_next_executable()

bool
Executor::get_next_executable(AnyExecutable & any_executable, std::chrono::nanoseconds timeout)
{
  bool success = false;
  // Check to see if there are any subscriptions or timers needing service
  // TODO(wjwwood): improve run to run efficiency of this function
  success = get_next_ready_executable(any_executable);    // <== 準備完了したエンティティを取り出す
  // If there are none
  if (!success) {
    // Wait for subscriptions or timers to work on
    wait_for_work(timeout);                               // <== 何らかのエンティティが準備完了するまで待つ
    if (!spinning.load()) {
      return false;
    }
    // Try again
    success = get_next_ready_executable(any_executable);  // <== 準備完了したエンティティを取り出す
  }
  return success;
}

--- rclcpp/executor.cpp より引用。日本語コメントは筆者追記。

上記がget_next_executable()の実装です。内部では以下の処理を行います。

準備完了エンティティの取得試行  (get_next_ready_executable())
      ↓
取り出しが成功した → Yes → 取り出したエンティティを返す
      ↓ No
1つ以上のエンティティが準備完了まで待機する  (wait_for_work())
      ↓
準備完了エンティティの取得試行 (get_next_ready_executable()) → 成功 → 取り出したエンティティを返す 
      ↓ 失敗
取り出しが失敗した事を通知

一連の処理において、一番最初に準備完了エンティティを取り出しているのは、1回のループにおいて1つのエンティティしか実行しないためです。そのため、例えば2.の処理で複数個が同時に準備完了した場合には、get_next_ready_executable()を複数回呼んでそれらを全て実行しなくてはいけません。全て実行する前にwait_for_work()を呼んでしまうと、まだ実行されていないエンティティを上書きしてしまうので、1.で取り出しが成功した場合には2.以降の処理が行われないようになっています。

以降の2項では、get_next_executable()wait_for_work()について解説していきます。本項の1.2.3.の処理の流れを頭に入れて以降の説明を読んで頂くと理解しやすいかと思います。

まず、エンティティの準備完了までの待機を行うrclcpp::Executor::wait_for_work()メソッドから見ていきます。

rclcpp::Executor::wait_for_work()

void
Executor::wait_for_work(std::chrono::nanoseconds timeout)
{
  TRACETOOLS_TRACEPOINT(rclcpp_executor_wait_for_work, timeout.count());

  // Clear any previous wait result
  this->wait_result_.reset();

  // we need to make sure that callback groups don't get out of scope
  // during the wait. As in jazzy, they are not covered by the DynamicStorage,
  // we explicitly hold them here as a bugfix
  std::vector<rclcpp::CallbackGroup::SharedPtr> cbgs;

  {
    std::lock_guard<std::mutex> guard(mutex_);

    if (this->entities_need_rebuild_.exchange(false) || current_collection_.empty()) {   // <---- ①の処理
      this->collect_entities();
    }

    auto callback_groups = this->collector_.get_all_callback_groups();
    cbgs.resize(callback_groups.size());
    for(const auto & w_ptr : callback_groups) {
      auto shr_ptr = w_ptr.lock();
      if(shr_ptr) {
        cbgs.push_back(std::move(shr_ptr));
      }
    }
  }

  this->wait_result_.emplace(wait_set_.wait(timeout));      //  <--- ②の処理

  // drop references to the callback groups, before trying to execute anything
  cbgs.clear();

  if (!this->wait_result_ || this->wait_result_->kind() == WaitResultKind::Empty) {
    RCUTILS_LOG_WARN_NAMED(
      "rclcpp",
      "empty wait set received in wait(). This should never happen.");
  } else {
    if (this->wait_result_->kind() == WaitResultKind::Ready && current_notify_waitable_) {  // <--- ③の処理
      auto & rcl_wait_set = this->wait_result_->get_wait_set().get_rcl_wait_set();
      if (current_notify_waitable_->is_ready(rcl_wait_set)) {
        current_notify_waitable_->execute(current_notify_waitable_->take_data());
      }
    }
  }
}

--- rclcpp/executor.cpp より引用 日本語コメントは筆者追記

以前WaitSetについて紹介した際にも登場したこのメソッドは主に以下3つの処理を行う。

  • ① 条件を満たしている場合、ExecutorEntitiesCollectionを再構築する
    • std::atomic_boolであるentities_need_rebuild_を確認し、条件を満たしている場合はcollect_entities()を呼ぶ
  • ② 1つ以上のエンティティが準備完了するのを待ち、その結果をrclcpp::Executor::wait_set_に保存する
    • 待機処理は前回紹介したWaitSet::waitを呼ぶことで実行される
  • ExecutorEntitiesCollectionの再構築が必要であるとの通知を受け取った場合、次のループで再構築するように設定する
    • rclcpp::executors::ExecutorNotifyWaitableというクラスを通じて再構築の通知があるかどうかを確認。このクラスについては後述。
    • 再構築の通知は主に、コールバックグループとノードの追加・削除が発生した場合に行われる。

②の行を見ると分かりますが、WaitSetで待った結果はstd::optional<rclcpp::WaitResult<rclcpp::WaitSet>>型のメンバ変数であるwait_result_に直接構築されます。このメンバ変数は、WaitSetで待った結果を保存しておくためのものです。結果の保存が副作用になっている事で少々見通しが悪くなっていますが、このwait_result_spin()関数の範囲では基本的にget_next_ready_executable()でのみ利用されます。(※それ以外の所では、他のspin系関数(spin_onceなど)で利用されます。)ちなみに、リセットはwait_for_work()の一番最初以外にもcollect_entities()でも行われます。

さて、wait_for_work()を見たことで、エンティティの準備完了の待機とその結果がwait_result_に保存される事が分かりました。次は、get_next_ready_executable()を見ていきましょう。

rclcpp::Executor::get_next_ready_executable()

get_next_ready_executable()は、準備完了したエンティティのリストの中から次に実行する1つのエンティティを取り出す関数です。これはエグゼキューターにおいて実質的にスケジューラの役割を果たします

以下が、get_next_ready_executable()の実装です。少し長いですが、大半がボイラープレートなので安心して下さい。

bool
Executor::get_next_ready_executable(AnyExecutable & any_executable)
{
  TRACETOOLS_TRACEPOINT(rclcpp_executor_get_next_ready);

  bool valid_executable = false;

  //  wait_result_が有効かを確認する。結果がもしもReadyで無かった場合などはすぐに返る。
  if (!wait_result_.has_value() || wait_result_->kind() != rclcpp::WaitResultKind::Ready) {
    return false;
  }

  if (!valid_executable) {
    size_t current_timer_index = 0;
    while (true) {
      auto [timer, timer_index] = wait_result_->peek_next_ready_timer(current_timer_index);
      if (nullptr == timer) {
        break;
      }
      current_timer_index = timer_index;
      auto entity_iter = current_collection_.timers.find(timer->get_timer_handle().get());
      if (entity_iter != current_collection_.timers.end()) {
        auto callback_group = entity_iter->second.callback_group.lock();
        if (!callback_group || !callback_group->can_be_taken_from()) {
          current_timer_index++;
          continue;
        }
        // At this point the timer is either ready for execution or was perhaps
        // it was canceled, based on the result of call(), but either way it
        // should not be checked again from peek_next_ready_timer(), so clear
        // it from the wait result.
        wait_result_->clear_timer_with_index(current_timer_index);
        // Check that the timer should be called still, i.e. it wasn't canceled.
        any_executable.data = timer->call();
        if (!any_executable.data) {
          current_timer_index++;
          continue;
        }
        any_executable.timer = timer;       // any_executableに結果を格納
        any_executable.callback_group = callback_group;
        valid_executable = true;           // valid_executableをtrueにして、以降の処理を行わないようにする。
        break;
      }
      current_timer_index++;
    }
  }

  if (!valid_executable) {
    while (auto subscription = wait_result_->next_ready_subscription()) {
      auto entity_iter = current_collection_.subscriptions.find(
        subscription->get_subscription_handle().get());
      if (entity_iter != current_collection_.subscriptions.end()) {
        auto callback_group = entity_iter->second.callback_group.lock();
        if (!callback_group || !callback_group->can_be_taken_from()) {
          continue;
        }
        any_executable.subscription = subscription;
        any_executable.callback_group = callback_group;
        valid_executable = true;
        break;
      }
    }
  }

  if (!valid_executable) {
    while (auto service = wait_result_->next_ready_service()) {
      auto entity_iter = current_collection_.services.find(service->get_service_handle().get());
      if (entity_iter != current_collection_.services.end()) {
        auto callback_group = entity_iter->second.callback_group.lock();
        if (!callback_group || !callback_group->can_be_taken_from()) {
          continue;
        }
        any_executable.service = service;
        any_executable.callback_group = callback_group;
        valid_executable = true;
        break;
      }
    }
  }

  if (!valid_executable) {
    while (auto client = wait_result_->next_ready_client()) {
      auto entity_iter = current_collection_.clients.find(client->get_client_handle().get());
      if (entity_iter != current_collection_.clients.end()) {
        auto callback_group = entity_iter->second.callback_group.lock();
        if (!callback_group || !callback_group->can_be_taken_from()) {
          continue;
        }
        any_executable.client = client;
        any_executable.callback_group = callback_group;
        valid_executable = true;
        break;
      }
    }
  }

  if (!valid_executable) {
    while (auto waitable = wait_result_->next_ready_waitable()) {
      auto entity_iter = current_collection_.waitables.find(waitable.get());
      if (entity_iter != current_collection_.waitables.end()) {
        auto callback_group = entity_iter->second.callback_group.lock();
        if (!callback_group || !callback_group->can_be_taken_from()) {
          continue;
        }
        any_executable.waitable = waitable;
        any_executable.callback_group = callback_group;
        any_executable.data = waitable->take_data();
        valid_executable = true;
        break;
      }
    }
  }

  if (any_executable.callback_group) {
    if (any_executable.callback_group->type() == CallbackGroupType::MutuallyExclusive) {
      assert(any_executable.callback_group->can_be_taken_from().load());
      any_executable.callback_group->can_be_taken_from().store(false);
    }
  }


  return valid_executable;
}

--- rclcpp/executor.cppより引用。日本語コメントは筆者追記

上記には、if(!valid_executable)で囲まれたボイラープレートコードが全てのエンティティの種類に対して書かれています。タイマーだけ処理が少し多いですが、これはタイマーの場合途中で待機が外部からキャンセルされる可能性があるからです。このタイマーのキャンセル考慮の処理を除けば、全てのエンティティが同じ手順で確認されています。確認手順は以下の通りです。

  1. 可能な場合、wait_result_から対象のエンティティを取り出す。
  2. 取り出した場合、ExecutorEntitiesCollectionを確認して、それがエグゼキューターの監視対象に入っているかを確認する
  3. エンティティが属するコールバックグループを取得して、それが現在取り出し可能なものかを確認する。
    • これは、コールバックグループの属性、MutuallyExclusiveに関わる項目です。後に解説します。
  4. 取り出したエンティティをany_executableに保存して、valid_executableをtrueにする。これがtrueになると以降のエンティティの取り出し処理は行われなくなる。

処理自体は素直で分かりやすいかと思います。ここで、valid_executableがtrueになると以降の処理が行われない、つまり最初に取り出せた1つのエンティティを返す事に着目して下さい。すると、この関数内の取り出し可能エンティティをチェックする順番がエンティティの優先度を意味するのが分かります。そして、これがスケジューリングメカニズムです。 つまり、エンティティは以下の順番で1つずつ取り出されて実行されます

優先度 エンティティ
1 Timer
2 Subscription
3 Service server
4 Service client
5 Waitable

例えば、Timerが2つ、Subscriptionが1つ、Waitableが2つ、同時に取り出された場合は以下のような順番で実行されます。

1つめのTimer
    ↓
2つめのTimer
    ↓
Subscription
    ↓
1つめのWaitable
    ↓
2つめのWaitable

これはスケジューリングメカニズムとは言うものの、Linuxのプロセスのスケジューリングのように大層なものではありません。一度のWaitSet::wait()の待機で準備完了したエンティティの実行順序を決めているに過ぎません。エグゼキューターが管理するエンティティの数が少なかったり、各エンティティの準備完了する周期が異なっていたりすると、WaitSet::wait()が返すエンティティの数は多くの場合1つかもしれません。そのような場合は、上記の優先順位は関係なく単に準備完了した順番に実行されることになります。

※ 確信はありませんが、一度のwait動作において同じ種類の複数のエンティティが準備完了になった場合、それらは構築したのが早いものから実行されると思われます。通信ミドルウェア側の実装によって違う動作になる可能性はありますが、rcl側で確認可能な範囲では恐らくこの動作だと思われます。

※ 実は、Timerよりも優先して実行されるものが1つだけあります。それが、wait_for_work()メソッドの一番最後に処理されている、current_notify_waitable_です。このWaitable派生クラスのメンバ変数は準備完了した場合、なんとwait_for_work()メソッドの中で実行されます。この変数の役割については後述します。

振り返り

ここまで、rclcpp::executors::SingleThreadedExecutor::spin()の内部処理について利用されている関数の処理などを追いながら紹介してきました。ここで、一旦今までの内容をまとめましょう。

void
SingleThreadedExecutor::spin()
{
  if (spinning.exchange(true)) {
    throw std::runtime_error("spin() called while already spinning");
  }
  RCPPUTILS_SCOPE_EXIT(wait_result_.reset();this->spinning.store(false););

  // Clear any previous result and rebuild the waitset
  this->wait_result_.reset();
  this->entities_need_rebuild_ = true;

  while (rclcpp::ok(this->context_) && spinning.load()) {
    rclcpp::AnyExecutable any_executable;
    if (get_next_executable(any_executable)) {
      execute_any_executable(any_executable);
    }
  }
}

SingleThreadedExecutorの根幹となる実装は上記のspin()メソッドです。これは基底クラスの実装をオーバーライドしており、このエグゼキューター固有の実装になっています。このクラスをエグゼキューターとして利用する場合は、基本的には上記メソッドを利用してノードを実行する事になります。 このメソッドは以下の処理を無限ループして、順番にコールバックの処理を行います。

  1. get_next_executable()関数は準備完了したエンティティを1つ取り出す。
    • 1 必要な場合、待機を行う前にcollect_entities()を呼び出してエンティティのリストを更新する。
    • 2 待機を行い、1つ以上のエンティティが準備完了になった時に待機を解除する。取り出したエンティティはwait_result_変数に保存される。
    • 3 get_next_ready_executable()が、wait_result_を確認して次に実行する準備完了したエンティティを1つ取り出す。取り出す時、以下の規則に従う。
      • 準備完了エンティティが1つだけの場合、それを取り出す。
      • 複数ある場合、以下の優先順位に従って1つを取り出す。
        • 1 Timer
        • 2 Subscription
        • 3 Service server
        • 4 Service client
        • 5 Waitable
  2. execute_any_executable()は、取り出したエンティティのコールバックを実行する。
  3. ノードが終了していない場合、1.に戻る。

SingleThreadedExecutorについては以上となります。次は、それのマルチスレッド版であるrclcpp::executors::MultiThreadedExecutorについてみていきます。

rclcpp::executors::MultiThreadedExecutor

rclcpp::executors::MultiThreadedExecutorはその名前の通りマルチスレッドでノードを実行するエグゼキューターです。こちらもSingleThreadedExecutorと同じくオーバーライドしているのはspin()関数のみですが、新たにrun()というメソッドが追加で実装されています。

spin()メソッドは以下のように実装されています。これはメインスレッド+追加スレッドを構築し、引数で指定されたスレッド数でrun()を実行しています。

void
MultiThreadedExecutor::spin()
{
  if (spinning.exchange(true)) {
    throw std::runtime_error("spin() called while already spinning");
  }
  RCPPUTILS_SCOPE_EXIT(wait_result_.reset();this->spinning.store(false););
  std::vector<std::thread> threads;
  size_t thread_id = 0;
  {
    std::lock_guard wait_lock{wait_mutex_};
    for (; thread_id < number_of_threads_ - 1; ++thread_id) {
      auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id);
      threads.emplace_back(func);
    }
  }

  run(thread_id);
  for (auto & thread : threads) {
    thread.join();
  }
}

ということで、rclcpp::executors::MultiThreadedExecutorではコールバックの処理はrun()メソッドに主に実装されています。

void
MultiThreadedExecutor::run(size_t this_thread_number)
{
  (void)this_thread_number;
  while (rclcpp::ok(this->context_) && spinning.load()) {
    rclcpp::AnyExecutable any_exec;
    {
      std::lock_guard wait_lock{wait_mutex_};
      if (!rclcpp::ok(this->context_) || !spinning.load()) {
        return;
      }
      if (!get_next_executable(any_exec, next_exec_timeout_)) {
        continue;
      }
    }
    if (yield_before_execute_) {
      std::this_thread::yield();
    }

    execute_any_executable(any_exec);

    if (any_exec.callback_group &&
      any_exec.callback_group->type() == CallbackGroupType::MutuallyExclusive)
    {
      try {
        interrupt_guard_condition_->trigger();
      } catch (const rclcpp::exceptions::RCLError & ex) {
        throw std::runtime_error(
                std::string(
                  "Failed to trigger guard condition on callback group change: ") + ex.what());
      }
    }

    // Clear the callback_group to prevent the AnyExecutable destructor from
    // resetting the callback group `can_be_taken_from`
    any_exec.callback_group.reset();
  }
}

マルチスレッドに対応する為にロックを取得している箇所があるものの、基本的なスケジューリングや実行の方法はSingleThreadedExecutor::spin()と同じです。何故なら、内部ではget_next_executable()execute_any_executable()メソッドを利用しており、これらはオーバーライドしておらずSingleThreadedExecutorで使っているものと全く同一のものだからです。 ですので、本項ではSingleThreadedExecutor::spin()と違う点について焦点を当てて解説をしていきます。

シングルスレッド版に対する、実装上の大きな違いは以下の3点です

  • エンティティ取得の排他制御
    • get_next_executable()は同時に1スレッドでしか実行されない
    • コールバック実行のみが並列化される
  • 相互排他コールバックグループへの対応
    • コールバックを実行した後、コールバックグループが相互排他(MutuallyExclusive)かどうかを確認し、その場合interrupt_guard_condition_をトリガーする

前者は簡単です。これはコールバックの準備完了の待機&取り出し部分は1つのスレッドでのみ行われ、コールバックの実行のみが並列処理されるという事です。これだと折角複数のスレッドを用意したのに、当該の処理はシーケンシャルに行われる事になるのでなんだか効率が悪いように思われます。しかし、基本的には取り出し処理はコールバックの実行に比べて短い時間で終了することが期待されるため、多くの場合は大きな問題にはなりません。*2

後者は、コールバックグループの属性である、CallbackGroupType::MutuallyExclusive(相互排他)を実現している部分です。 interrupt_guard_condition_というメンバ変数は基底クラスであるrclcpp::Executorが持っているクラスで、rclcpp::GuardConditionというクラスのインスタンスです。端的に紹介すると、このクラスはtrigger()メソッドを使って、wait()メソッドで待っているそれに紐づけられたエンティティ(基本はWaitable派生クラス)に準備完了の通知ができます。このクラスについての紹介は次回記事で行います。interrupt_guard_condition_は、rclcpp::Executor内部ではnotify_waitable_current_notify_waitable_に結びついており、trigger()が呼ばれるとこれら2つのWaitableクラスが準備完了状態になります。

rclcpp::executors::MultiThreadedExecutorにおけるMutuallyExclusiveの実現方法

まずは、理解しやすい相互排他の機能についてコードを見ながら動作を詳しく確認していきましょう。

void
Executor::execute_any_executable(AnyExecutable & any_exec)
{
  // ~~~ 省略 ~~~
  // Reset the callback_group, regardless of type
  any_exec.callback_group->can_be_taken_from().store(true);
}

bool
Executor::get_next_ready_executable(AnyExecutable & any_executable)
{
  TRACETOOLS_TRACEPOINT(rclcpp_executor_get_next_ready);

  bool valid_executable = false;

  if (!wait_result_.has_value() || wait_result_->kind() != rclcpp::WaitResultKind::Ready) {
    return false;
  }

  if (!valid_executable) {
    size_t current_timer_index = 0;
    while (true) {
      auto [timer, timer_index] = wait_result_->peek_next_ready_timer(current_timer_index);
      if (nullptr == timer) {
        break;
      }
      current_timer_index = timer_index;
      auto entity_iter = current_collection_.timers.find(timer->get_timer_handle().get());
      if (entity_iter != current_collection_.timers.end()) {
        auto callback_group = entity_iter->second.callback_group.lock();
        if (!callback_group || !callback_group->can_be_taken_from()) {  // <=== 注目①
          current_timer_index++;
          continue;
        }
        // At this point the timer is either ready for execution or was perhaps
        // it was canceled, based on the result of call(), but either way it
        // should not be checked again from peek_next_ready_timer(), so clear
        // it from the wait result.
        wait_result_->clear_timer_with_index(current_timer_index);
        // Check that the timer should be called still, i.e. it wasn't canceled.
        any_executable.data = timer->call();
        if (!any_executable.data) {
          current_timer_index++;
          continue;
        }
        any_executable.timer = timer;
        any_executable.callback_group = callback_group;
        valid_executable = true;
        break;
      }
      current_timer_index++;
    }
  }

  if (!valid_executable) {
    while (auto subscription = wait_result_->next_ready_subscription()) {
      auto entity_iter = current_collection_.subscriptions.find(
        subscription->get_subscription_handle().get());
      if (entity_iter != current_collection_.subscriptions.end()) {
        auto callback_group = entity_iter->second.callback_group.lock();
        if (!callback_group || !callback_group->can_be_taken_from()) {  // <=== 注目①
          continue;
        }
        any_executable.subscription = subscription;
        any_executable.callback_group = callback_group;
        valid_executable = true;
        break;
      }
    }
  }

  if (!valid_executable) {
    while (auto service = wait_result_->next_ready_service()) {
      auto entity_iter = current_collection_.services.find(service->get_service_handle().get());
      if (entity_iter != current_collection_.services.end()) {
        auto callback_group = entity_iter->second.callback_group.lock();
        if (!callback_group || !callback_group->can_be_taken_from()) {  // <=== 注目①
          continue;
        }
        any_executable.service = service;
        any_executable.callback_group = callback_group;
        valid_executable = true;
        break;
      }
    }
  }

  if (!valid_executable) {
    while (auto client = wait_result_->next_ready_client()) {
      auto entity_iter = current_collection_.clients.find(client->get_client_handle().get());
      if (entity_iter != current_collection_.clients.end()) {
        auto callback_group = entity_iter->second.callback_group.lock();
        if (!callback_group || !callback_group->can_be_taken_from()) {  // <=== 注目①
          continue;
        }
        any_executable.client = client;
        any_executable.callback_group = callback_group;
        valid_executable = true;
        break;
      }
    }
  }

  if (!valid_executable) {
    while (auto waitable = wait_result_->next_ready_waitable()) {
      auto entity_iter = current_collection_.waitables.find(waitable.get());
      if (entity_iter != current_collection_.waitables.end()) {
        auto callback_group = entity_iter->second.callback_group.lock();
        if (!callback_group || !callback_group->can_be_taken_from()) {  // <=== 注目①
          continue;
        }
        any_executable.waitable = waitable;
        any_executable.callback_group = callback_group;
        any_executable.data = waitable->take_data();
        valid_executable = true;
        break;
      }
    }
  }

  if (any_executable.callback_group) {
    if (any_executable.callback_group->type() == CallbackGroupType::MutuallyExclusive) {  // <=== 注目②
      assert(any_executable.callback_group->can_be_taken_from().load());
      any_executable.callback_group->can_be_taken_from().store(false);
    }
  }


  return valid_executable;
}

void
build_entities_collection(
  const std::vector<rclcpp::CallbackGroup::WeakPtr> & callback_groups,
  ExecutorEntitiesCollection & collection)
{
  collection.clear();

  for (auto weak_group_ptr : callback_groups) {
    auto group_ptr = weak_group_ptr.lock();
    if (!group_ptr) {
      continue;
    }

    if (group_ptr->can_be_taken_from().load()) {     // <=== そのグループが実行中の場合、そもそも監視対象にしない
      group_ptr->collect_all_ptrs(
        [&collection, weak_group_ptr](const rclcpp::SubscriptionBase::SharedPtr & subscription) {
          collection.subscriptions.insert(
            {
              subscription->get_subscription_handle().get(),
              {subscription, weak_group_ptr}
            });
        },
        // ~~~ 省略 上記の処理を他のエンティティにも行う ~~~
}

--- rclcpp/executor.cpp rclcpp/executor_entities_collection.cppより引用。日本語コメントは筆者。

上記に抜粋したのはexecute_any_executable()の最後の2行、build_entities_collection()の前半15行ほどとget_next_ready_executable()の全体です。 前者2つは切り抜いた部分、get_next_ready_executable()に関しては「注目」とコメントを書いた部分を注意して見て下さい。

まずは、get_next_ready_executable()execute_any_executable()について見ていきます。これらはrun()メソッドの中でget_next_ready_executable() -> execute_any_executable()の順番で実行される事を確認しましょう。そして、これらのメソッドはCallbackGroupクラスが持つcan_be_taken_from()というメソッドが返すstd::atomic_bool型の真理値を確認したり変更しています。この変数は、対象のコールバックグループに属するエンティティのコールバックを今実行しても大丈夫かを表しています。相互排他として設定されているコールバックグループに属するエンティティのコールバックは同時に2つ以上が実行されてはいけないため、それを実行する際に今他のスレッドで同じグループに属するコールバックが実行されていないかどうかを確認する必要があります。なので注目①でget_next_ready_executable()の中でエンティティを取り出す際に、can_be_taken_from()の値を確認しているのです。取り出しが成功すると、同時実行されてしまわないように注目②内でcan_be_take_from()をfalseに設定します。その後、取り出したエンティティをexecute_any_executable()にて実行完了したらtrueに設定して、get_next_ready_executable()でそのグループのエンティティを再び取り出しできるようにします。

しかし、なぜstd::atomic_bool1つだけで相互排他が実現できるのかを疑問に思う方もいらっしゃるかもしれません。この理由は、MultiThreadedExecutor::run()内でget_next_ready_executable()を実行する際に、rclcpp::executors::MultiThreadedExecutor::wait_mutex_をロックしているからです。これにより、get_next_ready_executable()は同時に2スレッド以上で実行される事はなくなるため、1つのstd::atomic_boolのみで相互排他が実現できています。

最後にbuild_entities_collection()について見てみましょう。これはExecutor::collect_entities()で呼ばれ、wait_set_で準備完了を確認する対象となるエンティティのリストを更新する役割を持つメソッドです。筆者コメントを記載した箇所を見ると、ここでもcan_be_taken_fromを確認しています。これは、無駄なエンティティをチェックするのを省くために、相互排他のグループのエンティティが今実行中の場合、そもそもwait_set_で確認する対象にそのグループを入れないようになっています。結局get_next_ready_executable()で実行可能かどうかが確認されるのでこの処理が無くても動作はしますが、効率化のために入っていると思われます。

rclcpp::executors::MultiThreadedExecutorにおけるinterrupt_guard_condition_の役割

関連するコードを以下にまとめて掲載します。

Executor::Executor(const rclcpp::ExecutorOptions & options)
: spinning(false),
  interrupt_guard_condition_(std::make_shared<rclcpp::GuardCondition>(options.context)),
  shutdown_guard_condition_(std::make_shared<rclcpp::GuardCondition>(options.context)),
  context_(options.context),
  notify_waitable_(std::make_shared<rclcpp::executors::ExecutorNotifyWaitable>(
      [this]() {
        this->entities_need_rebuild_.store(true);     // <=== notify_waitableのコールバック
      })),
  entities_need_rebuild_(true),
  collector_(notify_waitable_),
  wait_set_({}, {}, {}, {}, {}, {}, options.context),
  current_notify_waitable_(notify_waitable_),
  impl_(std::make_unique<rclcpp::ExecutorImplementation>())
{
  // ~~~ 省略 ~~~

  notify_waitable_->add_guard_condition(interrupt_guard_condition_);  // <=== interrupt_guard_condition_の紐づけ
  notify_waitable_->add_guard_condition(shutdown_guard_condition_);

  // we need to initially rebuild the collection,
  // so that the notify_waitable_ is added
  collect_entities();
}

void
Executor::wait_for_work(std::chrono::nanoseconds timeout)
{
  // ~~~ 省略 ~~~

    if (this->entities_need_rebuild_.exchange(false) || current_collection_.empty()) {  // <=== entities_need_rebuild_の確認 
      this->collect_entities();
    }

  // ~~~ 省略 ~~~

  this->wait_result_.emplace(wait_set_.wait(timeout));
  
  // ~~~ 省略 ~~~

  if (!this->wait_result_ || this->wait_result_->kind() == WaitResultKind::Empty) {
    RCUTILS_LOG_WARN_NAMED(
      "rclcpp",
      "empty wait set received in wait(). This should never happen.");
  } else {
    if (this->wait_result_->kind() == WaitResultKind::Ready && current_notify_waitable_) {  // <=== ここが重要
      auto & rcl_wait_set = this->wait_result_->get_wait_set().get_rcl_wait_set();
      if (current_notify_waitable_->is_ready(rcl_wait_set)) {
        current_notify_waitable_->execute(current_notify_waitable_->take_data());    // <=== コールバックの実行
      }
    }
  }
}

void
MultiThreadedExecutor::run(size_t this_thread_number)
{
  while (rclcpp::ok(this->context_) && spinning.load()) {
    // ~~~ 省略 ~~~
      if (!get_next_executable(any_exec, next_exec_timeout_)) {
        continue;
      }
    // ~~~ 省略 ~~~

    execute_any_executable(any_exec);

    if (any_exec.callback_group &&
      any_exec.callback_group->type() == CallbackGroupType::MutuallyExclusive)
    {
      try {
        interrupt_guard_condition_->trigger();           // <=== トリガーしている
      } catch (const rclcpp::exceptions::RCLError & ex) {
        throw std::runtime_error(
                std::string(
                  "Failed to trigger guard condition on callback group change: ") + ex.what());
      }
    }

    // Clear the callback_group to prevent the AnyExecutable destructor from
    // resetting the callback group `can_be_taken_from`
    any_exec.callback_group.reset();
  }
}

--- rclcpp/executor.cppとrclcpp/multi_threaded_executor.cppより引用。日本語コメントは筆者

改めて紹介すると、interrupt_guard_condition_rclcpp::Executor内部ではWaitableであるnotify_waitable_に結びついており、trigger()を呼ぶとこのWaitableクラスを準備完了状態にする効果があります。notify_waitable_にはrclcpp::Executorのコンストラクタでコールバックが設定されており、そのコールバックはentities_need_rebuild_をtrueにします。そして、この変数がtrueになっていると、wait_for_work()メソッド内でエンティティの待機(WaitSet::wait())を実行する前にcollect_entities()が実行されるようになります。 つまり、上記をまとめるとinterrupt_guard_condition_->trigger()を呼ぶと、最終的にcollect_entities()を実行する効果があるということになります。

なお、wait_for_work()の最後では、notify_waitable_ではなくcurrent_notify_waitable_という変数を利用していますが、これは単に実装上の都合で使っているnotify_waitableをコピーした変数です。ここでコードを読む上では、current_notify_waitable_notify_waitable_と同じものだと思って頂いて問題ありません。

さて、ある程度動作を理解した所で実際のコードの流れを追いましょう。 まず、run()の最後では対象のコールバックグループが相互排他だった場合にinterrupt_guard_condition_->trigger()を呼び、current_notify_waitable_を準備完了状態にさせます。すると、次に呼ばれたwait_for_work()の最後でcurrent_notify_waitable_のコールバックが実行されてentities_need_rebuild_がtrueになります。そして、その次に呼ばれたwait_for_work()メソッドにてWaitSet::wait()による待機を行う前にcollect_entities()を呼び出してエンティティのリストを更新します。

trigger()が呼ばれてからcollect_entitiesが呼ばれるまでの流れをまとめると、以下のようになります。

  1. | @ run()の最後 | コールバックグループが相互排他だった場合、interrupt_guard_condition_->trigger()する
  2. | @ 次に呼ばれた wait_for_work()の最後 | current_notify_waitable_のコールバックが実行され、entities_need_rebuild_がtrueになる
  3. | @ 次の次に呼ばれた wait_for_work()の最初 | collect_entities()が実行される

上記から、相互排他のコールバックグループに含まれるコールバックを実行した場合、その次の次のwait_for_work()呼び出しにてエンティティのリストが再構築されることが分かります。

ここでリストを再構築する理由には、前項で紹介したbuild_entities_collection()が関わってきます。例えば、実行時間が長い相互排他のグループに属するコールバックを実行している間に、他のスレッドにてcollect_entities()が呼ばれた場合、そのグループのcan_be_taken_from()はfalseですから、build_entities_collection()にて効率化のためにwait_set_の監視対象からそのグループが外されます。そのため、コールバックの実行が終わったらまたそのグループを監視するように再度設定し直す必要があります。

相互排他のグループに属するコールバックの実行終了後にinterrupt_guard_condition_->trigger()が呼ばれる理由は、そのグループがエンティティの監視リストから外れている可能性があるため、collect_entities()を実行してそれを監視リストに戻す必要があるからです。

ちなみに、前に書いたようにエンティティのリストが再構築されるのはコールバックの実行が終わってから2回目のwait_for_work()呼び出しの際です。つまり、次のwait_for_work()呼び出しでは監視リストから除かれてしまった相互排他のグループは変わらず実行されない事になります。ここで運悪く、そのwait_for_work()呼び出しで取り出されたコールバック達の実行時間が長かった場合、除かれたグループが戻されるのには時間が掛かる事になります。特定の相互排他のグループのコールバックが中々実行されず、資源飢餓のような状態になっている場合、このような仕様が原因の可能性もあります。

振り返り

ここまで、MultiThreadedExecutorについて紹介してきた内容をまとめます。

  • スケジューリングメカニズムはSingleThreadedExecutorと同様
  • 動作はSingleThreadedExecutorの実行メカニズムが複数スレッドで実行されるイメージ
    • ただし、rclcpp::Executor::get_next_executable()によるエンティティの取り出しは排他処理され、シーケンシャルに実行される
    • 並列に実行されるのはコールバックだけ
  • 相互排他のグループに属するコールバックが実行される時、以下のことが発生する
    • それに属するコールバック同士が並列実行されないよう、get_next_executable()での取り出しに制限が掛かる
    • それに属するコールバックが実行されている間、効率化のためにそのグループはwait_set_の監視リストから除かれる場合がある
      • 👆に対応するため、相互排他グループに属するコールバックの実行終了後にエンティティリストの再構築が発生する

おわりに

本記事では、rclcpp::Executorの基礎的な機能の紹介と、その派生クラスであるSingleThreadedExecutorとMultiThreadedExecutorの実行メカニズムについて紹介しました。 前回記事と本記事で、エグゼキューターに関する基礎的な内容はほとんどカバーできたのではないかと思います。次回は、ここまで紹介出来なかったエグゼキューターに関連する細かい仕様や実装についていくつか紹介して、本連載の最後の記事としようと思います。

参考文献

*1:この説明は若干正しくありません。エグゼキューターを利用しないでも実行する方法はあります。例えば、rclcpp/wait_for_message.hppのようにrclcpp::WaitSetを直接利用するなどすれば可能です。

*2:もちろん、これは最高に効率が良い実装とは言えませんし、状況によっては資源飢餓のような状況も発生してしまいます。また、interrupt_guard_condition_の項で紹介しているように、エンティティの再構築の頻度が高いのもあまり良くありません。現在、試験的ですがEventExecutorというクラスが実装されており、こちらの方がより効率の良い動作をすると報告されています。