詳解 ROS2 エグゼキューター (Executor) - 3 -

執筆者:千葉工業大学 先進工学研究科 未来ロボティクス専攻 井上 叡 (インターン生)
監修者:稲葉 貴昭・高橋 浩和

※ 「ROS2」連載記事一覧はこちら


はじめに

本連載では、ROS2のC++ APIを利用しているほとんどの方が利用しているにも関わらず、その仕組みや仕様についてあまり知られていない、エグゼキューター(Executor)を紹介します。rclcppライブラリに実装されたエグゼキューターについてソースコードなどを追いながら、その仕組みについて詳しく追っていければと思います。3回目となる今回は、前回までの記事で触れられなかったエグゼキューターに関する少し高度な概念や細かい内容について紹介していきます。なお、本記事の各章は独立して読めるように構成されています。

なお、本連載ではROS2 jazzyでの各APIについて解説していきます。各ライブラリのバージョンは、rcl 9.2.5rclcpp 28.1.9rmw 7.3.2rclcpputils 2.11.2です。 以下にそれぞれのリポジトリへのリンクを掲載します。ソースコードを参照する際はクローンすると便利です。 また、本連載で掲載する各ライブラリのコードは全てこれらのリポジトリより引用します。

Waitableの詳細

rclcpp::WaitSetクラスが扱うエンティティの1つとして、rclcpp::Waitableがあるというのは第一回の記事にて紹介したかと思います。本章では、このWaitableクラスについてもう少し詳しく紹介していきます。

まず、Waitableクラスがどのようなものだったか再度確認しましょう。

  • 何らかの満たすべき条件を設定することができ、現在その条件を達成しているか否かを外部から確認できる。
  • 条件を満たした時に、「実行されるべき一連の処理」をクラス内部に保持する。("一連の処理"はコールバックとして保持しても、クラスメソッドとして直接実装しても良い)
  • 「条件を満たすまでの待機」 -> 「処理の実行」の流れを一般化した基底クラス
  • このクラスを継承することで、rclcpp::Executorクラスによる待機・実行を行うことが可能になる

rclcpp::Waitableは上記のような特徴を持ったクラスです。このクラスを継承しているクラスの例としては、Actionのクライアントとサーバー、rclcpp::executors::ExecutorNotifyWaitableなどがあります。

次に、rclcpp::Waitableの定義を見てみましょう。

class Waitable
{
public:
  /// Add the Waitable to a wait set.
  /**
   * \param[in] wait_set A handle to the wait set to add the Waitable to.
   * \throws rclcpp::exceptions::RCLError from rcl_wait_set_add_*()
   */
  RCLCPP_PUBLIC
  virtual
  void
  add_to_wait_set(rcl_wait_set_t & wait_set);

  //~~~ 省略 ~~~

  /// Check if the Waitable is ready.
  /**
   * The input wait set should be the same that was used in a previously call to
   * `add_to_wait_set()`.
   * The wait set should also have been previously waited on with `rcl_wait()`.
   *
   * \param[in] wait_set A handle to the wait set the Waitable was previously added to
   *   and that has been waited on.
   * \return `true` if the Waitable is ready, `false` otherwise.
   */
  RCLCPP_PUBLIC
  virtual
  bool
  is_ready(const rcl_wait_set_t & wait_set);

  //~~~ 省略 ~~~

  /// Take the data so that it can be consumed with `execute`.
  /**
   * NOTE: take_data is a partial fix to a larger design issue with the
   * multithreaded executor. This method is likely to be removed when
   * a more permanent fix is implemented. A longterm fix is currently
   * being discussed here: https://github.com/ros2/rclcpp/pull/1276
   *
   * This method takes the data from the underlying data structure and
   * writes it to the void shared pointer `data` that is passed into the
   * method. The `data` can then be executed with the `execute` method.
   *
   * Before calling this method, the Waitable should be added to a wait set,
   * waited on, and then updated.
   *
   * Example usage:
   *
   * ```cpp
   * // ... create a wait set and a Waitable
   * // Add the Waitable to the wait set
   * waitable.add_to_wait_set(wait_set);
   * // Wait
   * rcl_ret_t wait_ret = rcl_wait(wait_set);
   * // ... error handling
   * // Update the Waitable
   * waitable.update(wait_set);
   * // Execute any entities of the Waitable that may be ready
   * std::shared_ptr<void> data = waitable.take_data();
   * ```
   */
  RCLCPP_PUBLIC
  virtual
  std::shared_ptr<void>
  take_data() = 0;

  //~~~ 省略 ~~~

  /// Execute data that is passed in.
  /**
   * Before calling this method, the Waitable should be added to a wait set,
   * waited on, and then updated - and the `take_data` method should be
   * called.
   *
   * Example usage:
   *
   * ```cpp
   * // ... create a wait set and a Waitable
   * // Add the Waitable to the wait set
   * waitable.add_to_wait_set(wait_set);
   * // Wait
   * rcl_ret_t wait_ret = rcl_wait(wait_set);
   * // ... error handling
   * // Update the Waitable
   * waitable.update(wait_set);
   * // Execute any entities of the Waitable that may be ready
   * std::shared_ptr<void> data = waitable.take_data();
   * waitable.execute(data);
   * ```
   */
  RCLCPP_PUBLIC
  virtual
  void
  execute(const std::shared_ptr<void> & data);

  //~~~ 省略 ~~~
  private:
  std::atomic<bool> in_use_by_wait_set_{false}; // <=== メンバ変数はこれだけ
}

---rclcpp/waitable.hppより引用 日本語コメントは筆者

上記はrclcpp::Waitableのクラス定義からいくつか重要なメソッドのみを取り出したものです。 上記のメソッドは、それぞれ以下のような用途があります。

  1. Waitable::is_ready()
  2. Waitableが準備完了したかどうかを返す
  3. rclcpp::WaitResultがこのメソッドを実行して対象のWaitableが準備完了したかどうかを確認する
  4. Waitable::execute()
  5. Waitableに設定された準備完了後の処理を実行する
  6. このクラスはオーバーライドされることが前提
  7. オーバーライド先で、execute()に直接処理を実装するか、設定したコールバックを実行するような実装にするかなどを決定する
  8. Waitable::take_data()
  9. execute()を実行する際に渡す必要があるデータを取り出す効果がある
  10. このクラスは基本的に準備完了 -> 処理の実行までに時間的な隔たりがあるので、準備完了時に得たデータは状態としてクラスに持たせる必要がある。それを取り出す役割
  11. Waitable::add_to_wait_set()
  12. rcl_wait_set_tに対して、rcl_wait()の待機を解除するためのエンティティを設定するメソッド

1のメソッドは、rclcpp::WaitReult::next_ready_waitableで準備完了したWaitableを取り出す際に利用されます。

template<class WaitSetT>
class WaitResult final{
  // ~~~ 省略 ~~~
  std::shared_ptr<rclcpp::Waitable>
  next_ready_waitable()
  {
    check_wait_result_dirty();
    auto waitable = std::shared_ptr<rclcpp::Waitable>{nullptr};
    auto data = std::shared_ptr<void>{nullptr};

    if (this->kind() == WaitResultKind::Ready) {
      auto & wait_set = this->get_wait_set();
      auto & rcl_wait_set = wait_set.get_rcl_wait_set();
      while (next_waitable_index_ < wait_set.size_of_waitables()) {
        auto cur_waitable = wait_set.waitables(next_waitable_index_++);
        if (cur_waitable != nullptr && cur_waitable->is_ready(rcl_wait_set)) { // <=== ここで利用
          waitable = cur_waitable;
          break;
        }
      }
    }

    return waitable;
  }
  // ~~~ 省略 ~~~
}

--- rclcpp/wait_result.hppより引用

2.3.のメソッドは、エグゼキューター内でWaitableに設定された処理を実行する際に利用されます。以下の通りです。

// ~~~ 省略 ~~~
void
Executor::execute_any_executable(AnyExecutable & any_exec)
{
// ~~~ 省略 ~~~
  if (any_exec.waitable) {
    const std::shared_ptr<void> & const_data = any_exec.data;
    any_exec.waitable->execute(const_data);  // <=== 実行
  }
// ~~~ 省略 ~~~
}
// ~~~ 省略 ~~~
bool
Executor::get_next_ready_executable(AnyExecutable & any_executable)
{
// ~~~ 省略 ~~~
  if (!valid_executable) {
    while (auto waitable = wait_result_->next_ready_waitable()) {
      auto entity_iter = current_collection_.waitables.find(waitable.get());
      if (entity_iter != current_collection_.waitables.end()) {
        auto callback_group = entity_iter->second.callback_group.lock();
        if (!callback_group || !callback_group->can_be_taken_from()) {
          continue;
        }
        any_executable.waitable = waitable;
        any_executable.callback_group = callback_group;
        any_executable.data = waitable->take_data();   // <=== データの取り出し
        valid_executable = true;
        break;
      }
    }
  }
// ~~~ 省略 ~~~
}
// ~~~ 省略 ~~~

---rclcpp/executor.cppより引用

4.については少し追加の説明が必要です。そもそも、rclcpp::Waitableis_ready()メソッドでrclcpp側だけで準備完了の判定ができるのに、なぜわざわざrcl_wait_set_tにエンティティを追加しなくてはいけないのでしょうか?

それは、これがないとrcl_wait()のブロッキングを解除できないからです

エグゼキューターによる待機処理で利用されている、rclのAPI rcl_wait()には以下の2つの役割があります。

  1. 何らかのエンティティが準備完了するまで、そこで処理をブロックする
  2. SubscriptionやServiceなど、通信機能(rmw側の機能)に関連する操作が準備完了の基準のエンティティに対して、それが準備完了したと通知する

rcl_wait()において、1.によるブロッキングの解除と2.の準備完了通知はほぼ同義です。そのため、SubscriptionやServiceが準備完了した時、rcl_wait()のブロックも同時に解除されます。しかしながら、Waitableに関してはrmw側を通さず準備完了をis_ready()を使ってrclcpp側で判断してしまうので、このままだとrcl_wait()のブロッキングを解除できません。そうなると、折角準備完了しているのにも関わらず、ずっとrcl_wait()がブロックされ続けてしまいます。

ですので、rclcpp::Waitableには必ずrcl_wait()を起床させるためのエンティティを持たせる必要があります

4.のメソッドは、Waitableが持つエンティティをrcl_wait_set_tに追加する役割を担います。 ここからは、rclcpp::Waitablercl_wait()を起床させるために利用可能な以下2つのエンティティを紹介します。

  • rcl_guard_condition_t
    • rclcpp側のラッパークラス rclcpp::GuardConditionが存在
  • rcl_event_t

※ もちろん、上記意外にSubscriptionやServiceなどを起床に利用することも可能です。WaitableであるActionは、内部でSubscriptionやServiceを利用しています。

rcl_guard_condition_t (rclcpp::GuardCondition)

GuardConditionは、rcl側ではrcl_guard_condition_t、rclcppではそれのラッパークラスrclcpp::GuardConditionというクラスとして実装されています。

この機能は何らかの条件を設定し、それを満たした際に待機機能に対して準備完了を通知する役割があります。 具体的にはこのクラスを利用すると、好きなタイミングでrmw_wait()のブロッキング(= rcl_wait()のブロッキング)を解除することが出来ます。

ちなみに、全く同じ名前の機能がCycloneDDSとfastDDSにはあります。これらを意識して実装されているのだと思います。

rclcpp::GuardConditionで重要なメソッドは以下のtrigger()です。

  // ~~~ 省略 ~~~
  /// Signal that the condition has been met, notifying both the wait set and listeners, if any.
  /**
   * This function is thread-safe, and may be called concurrently with waiting
   * on this guard condition in a wait set.
   *
   * \throws rclcpp::exceptions::RCLError based exceptions when underlying
   *   rcl functions fail.
   */
  RCLCPP_PUBLIC
  void
  trigger();
  // ~~~ 省略 ~~~

このメソッドを呼ぶことで、GuardConditionはトリガーされ、rcl_wait()もといrclcpp::WaitSet::wait()はブロック解除されます。 このtrigger()はあくまでブロック解除を通知するだけです。Waitable側のis_ready()がtrueを返すかの設定はWaitable側で行う必要があります。しかし、基本的には状況としてはtrigger() = 準備完了である場合が多いので、rclcpp::Waitable::is_ready()はGuardConditionの状態を見て準備完了を判断する実装にしてもそこまで問題無いかと思います。

例えば、前記事のMultiThreadedExecutorの章で出てきたrclcpp::executors::ExecutorNotifyWaitableクラスのis_ready()は以下のようにGuardConditionがトリガーされたかを確認する実装になっています。

bool
ExecutorNotifyWaitable::is_ready(const rcl_wait_set_t & wait_set)
{
  std::lock_guard<std::mutex> lock(guard_condition_mutex_);

  bool any_ready = false;
  for (size_t ii = 0; ii < wait_set.size_of_guard_conditions; ++ii) {
    const auto * rcl_guard_condition = wait_set.guard_conditions[ii];

    if (nullptr == rcl_guard_condition) {
      continue;
    }
    for (const auto & weak_guard_condition : this->notify_guard_conditions_) {
      auto guard_condition = weak_guard_condition.lock();
      if (guard_condition && &guard_condition->get_rcl_guard_condition() == rcl_guard_condition) {
        any_ready = true;
        break;
      }
    }
  }
  return any_ready;
}

--- rclcpp/executor_notify_waitable.cppより引用

rcl_event_t

rcl_event_tは通信機能側(rmw側)から、上位層であるrclやrclcppに対して通信機能に関する出来事の通知を行う機能です。この機能はGuardConditionに比べると利用頻度が低く、rclcppではWaitableであるrclcpp::EventHandlerBaseとそれを継承したrclcpp::EventHandlerでのみ直接的に利用されています。これら2つのWaitableは、PublisherとSubscriberが利用します。それらのWaitableにはrclcpp/event_handler.hppに定義された、以下のような種類のイベントとそれに対して実行するコールバックが設定されます。

// ~~~ 省略 ~~~
// イベントの種類
using QOSDeadlineRequestedInfo = rmw_requested_deadline_missed_status_t;
using QOSDeadlineOfferedInfo = rmw_offered_deadline_missed_status_t;
using QOSLivelinessChangedInfo = rmw_liveliness_changed_status_t;
using QOSLivelinessLostInfo = rmw_liveliness_lost_status_t;
using QOSMessageLostInfo = rmw_message_lost_status_t;
using QOSOfferedIncompatibleQoSInfo = rmw_offered_qos_incompatible_event_status_t;
using QOSRequestedIncompatibleQoSInfo = rmw_requested_qos_incompatible_event_status_t;

using IncompatibleTypeInfo = rmw_incompatible_type_status_t;
using MatchedInfo = rmw_matched_status_t;

// ~~~ 省略 ~~~

// コールバックの種類
/// Contains callbacks for various types of events a Publisher can receive from the middleware.
struct PublisherEventCallbacks
{
  QOSDeadlineOfferedCallbackType deadline_callback;
  QOSLivelinessLostCallbackType liveliness_callback;
  QOSOfferedIncompatibleQoSCallbackType incompatible_qos_callback;
  IncompatibleTypeCallbackType incompatible_type_callback;
  PublisherMatchedCallbackType matched_callback;
};

/// Contains callbacks for non-message events that a Subscription can receive from the middleware.
struct SubscriptionEventCallbacks
{
  QOSDeadlineRequestedCallbackType deadline_callback;
  QOSLivelinessChangedCallbackType liveliness_callback;
  QOSRequestedIncompatibleQoSCallbackType incompatible_qos_callback;
  QOSMessageLostCallbackType message_lost_callback;
  IncompatibleTypeCallbackType incompatible_type_callback;
  SubscriptionMatchedCallbackType matched_callback;
};
// ~~~ 省略 ~~~

--- rclcpp/event_handler.hppより引用 日本語コメントは筆者

また、is_ready()は以下のように実装されています。イベントが発生しているかどうかを確認しています。

/// Check if the Waitable is ready.
bool
EventHandlerBase::is_ready(const rcl_wait_set_t & wait_set)
{
  return wait_set.events[wait_set_event_index_] == &event_handle_;
}

--- rclcpp/event_handler.cppより引用

rcl_event_tは基本的には、ユーザーがWaitableを実装する際にはあまり利用しないかと思います。

まとめ

以上が、Waitableの実装に関連する事項です。基本的にはユーザーがWaitableを実装する際はrclcpp::GuardConditionを利用するのが良いかと思われます。 それ以外のSubscription等のエンティティを利用したWaitableの実装については、rclcpp_action::Clientrclcpp_action::Serverが参考になるかと思います。

rclcpp::Executorが持つNotifyWaitableクラス

本章では、rclcpp::Executorが直接所持するWaitableクラスrclcpp::executors::ExecutorNotifyWaitableについて、その役割や実装について紹介していきます。このクラスについては前記事のMultiThreadedExecutorの項目で少し紹介しましたが、改めて基礎的な部分から紹介します。

rclcpp::executors::ExecutorNotifyWaitableは、その名前の通りrclcpp::Waitableを継承したクラスです。このクラスは、Waitableを比較的素朴に実装したものです。1つ以上のrclcpp::GuardConditionクラスを内部に保持し、それらのうちどれかがトリガーされた場合に設定されたコールバックの実行準備が完了するようになっています。

以下に、上記の説明に関連するコードを抜粋します。

宣言部分

  // ~~~ 省略 ~~~

class ExecutorNotifyWaitable : public rclcpp::Waitable
{
public:
  // ~~~ 省略 ~~~

  RCLCPP_PUBLIC
  explicit ExecutorNotifyWaitable(std::function<void(void)> on_execute_callback = {});

// ~~~ 省略 ~~~

  RCLCPP_PUBLIC
  bool
  is_ready(const rcl_wait_set_t & wait_set) override;

  RCLCPP_PUBLIC
  void
  execute(const std::shared_ptr<void> & data) override;

  RCLCPP_PUBLIC
  std::shared_ptr<void>
  take_data() override;

// ~~~ 省略 ~~~

private:
  std::function<void(void)> execute_callback_;

  std::mutex guard_condition_mutex_;

  std::function<void(size_t)> on_ready_callback_;

  std::set<rclcpp::GuardCondition::WeakPtr,
    std::owner_less<rclcpp::GuardCondition::WeakPtr>> notify_guard_conditions_;
};

--- rclcpp/executor_notify_waitable.hppより引用 コメント省略

実装部分

// ~~~ 省略 ~~~

ExecutorNotifyWaitable::ExecutorNotifyWaitable(std::function<void(void)> on_execute_callback)
: execute_callback_(on_execute_callback)
{
}

// ~~~ 省略 ~~~

bool
ExecutorNotifyWaitable::is_ready(const rcl_wait_set_t & wait_set)
{
  std::lock_guard<std::mutex> lock(guard_condition_mutex_);

  bool any_ready = false;
  for (size_t ii = 0; ii < wait_set.size_of_guard_conditions; ++ii) {
    const auto * rcl_guard_condition = wait_set.guard_conditions[ii];

    if (nullptr == rcl_guard_condition) {
      continue;
    }
    for (const auto & weak_guard_condition : this->notify_guard_conditions_) {
      auto guard_condition = weak_guard_condition.lock();
      if (guard_condition && &guard_condition->get_rcl_guard_condition() == rcl_guard_condition) {
        any_ready = true;
        break;
      }
    }
  }
  return any_ready;
}

// ~~~ 省略 ~~~

void
ExecutorNotifyWaitable::execute(const std::shared_ptr<void> & data)
{
  (void) data;
  this->execute_callback_();
}

std::shared_ptr<void>
ExecutorNotifyWaitable::take_data()
{
  return nullptr;
}

// ~~~ 省略 ~~~

--- rclcpp/executor_notify_waitable.cppより引用

上記コードのポイントについてまとめます。

  • コンストラクタで準備完了時に実行するコールバックを設定
  • execute()で👆で設定したコールバックを実行
  • take_data()では何もデータを取り出さない、コールバック実行時も引数は渡さない
  • is_ready()では、内部にもつGuardConditionを全て確認し、どれか1つ以上がトリガーされていたらtrueを返す

このクラスの構造について理解した所で、次はrclcpp::Executorの中での実際の使われ方について見ていきましょう。

// ~~~ 省略 ~~~

Executor::Executor(const rclcpp::ExecutorOptions & options)
: spinning(false),
  interrupt_guard_condition_(std::make_shared<rclcpp::GuardCondition>(options.context)),
  shutdown_guard_condition_(std::make_shared<rclcpp::GuardCondition>(options.context)),
  context_(options.context),
  notify_waitable_(std::make_shared<rclcpp::executors::ExecutorNotifyWaitable>(
      [this]() {
        this->entities_need_rebuild_.store(true);    // <=== コールバック設定
      })),
  entities_need_rebuild_(true),
  collector_(notify_waitable_),
  wait_set_({}, {}, {}, {}, {}, {}, options.context),
  current_notify_waitable_(notify_waitable_),
  impl_(std::make_unique<rclcpp::ExecutorImplementation>())
{
  shutdown_callback_handle_ = context_->add_on_shutdown_callback(
    [weak_gc = std::weak_ptr<rclcpp::GuardCondition>{shutdown_guard_condition_}]() {
      auto strong_gc = weak_gc.lock();
      if (strong_gc) {
        strong_gc->trigger();
      }
    });

  notify_waitable_->add_guard_condition(interrupt_guard_condition_);  // <=== GuardConditionの設定
  notify_waitable_->add_guard_condition(shutdown_guard_condition_);   // <=== GuardConditionの設定

  // we need to initially rebuild the collection,
  // so that the notify_waitable_ is added
  collect_entities();
}

// ~~~ 省略 ~~~

void
Executor::collect_entities()
{
// ~~~ 省略 ~~~
 // notify_waitable_をコピーして、実際に利用するcurrent_notify_waitable_を構築する
 if (notify_waitable_) {
    current_notify_waitable_ = std::make_shared<rclcpp::executors::ExecutorNotifyWaitable>(
      *notify_waitable_);
    auto notify_waitable = std::static_pointer_cast<rclcpp::Waitable>(current_notify_waitable_);
    collection.waitables.insert({notify_waitable.get(), {notify_waitable, {}}});
  }
// ~~~ 省略 ~~~
}

// ~~~ 省略 ~~~

void
Executor::wait_for_work(std::chrono::nanoseconds timeout)
{
// ~~~ 省略 ~~~
    // entities_need_rebuild_がtrueの場合、collect_entitiesが呼ばれる
    if (this->entities_need_rebuild_.exchange(false) || current_collection_.empty()) {
      this->collect_entities();
    }

// ~~~ 省略 ~~~

  if (!this->wait_result_ || this->wait_result_->kind() == WaitResultKind::Empty) {
    RCUTILS_LOG_WARN_NAMED(
      "rclcpp",
      "empty wait set received in wait(). This should never happen.");
  } else {    // 準備完了の確認とコールバックの実行
    if (this->wait_result_->kind() == WaitResultKind::Ready && current_notify_waitable_) {
      auto & rcl_wait_set = this->wait_result_->get_wait_set().get_rcl_wait_set();
      if (current_notify_waitable_->is_ready(rcl_wait_set)) {
        current_notify_waitable_->execute(current_notify_waitable_->take_data());
      }
    }
  }
}

---rclcpp/executor.cppより引用 日本語コメントは筆者

前記事MultiThreadedExecutorの項目で紹介したように、rclcpp::Executornotify_waitable_current_notify_waitable_という2つをメンバ変数として持っています。これらのうち、前者はオリジナルとしてずっと保持され、後者は毎回collect_entities()の最初の方で前者をコピーして再構築されます。実際にwait_set_に追加されて待たれているのは後者です。そして、notify_waitablerclcpp::Executor::Executor()内で、利用するGuardConditionとコールバックが設定されます。いずれかのGuardConditionがトリガーされた場合、wait_for_work()の最後でコールバックが実行されます。このコールバックが実行されると、wait_for_work()の最初にあるcollect_entities()が呼ばれるようになります。

つまり、このnotify_waitableは準備完了するとcollect_entities()を実行する効果があります

ここまでで、ExecutorNotifyWaitableの利用方法について理解できたかと思います。次は、notify_waitable_に設定された2つのGuardConditionについて紹介します。

interrupt_guard_condition

このGuardConditionがトリガーされるのは、以下の場面です。ちなみに、名前にinterruptと入っていますが、ユーザー割り込み等を扱う訳では無いので注意して下さい。

  1. MultiThreadedExecutorがMutuallyExclusiveなグループのコールバックを実行した後
  2. Executor::cancel()が実行された時
  3. add_node()add_callback_groupremove_noderemove_callback_groupnotify = trueで実行された時
  4. notify_trueであるかに関わらず、collect_entities()は必ず呼ばれてエンティティの監視リストは必ず更新される。

ちなみに、上記の状況でトリガーする理由は、それぞれ以下の通りだと思われます。

  1. 監視から外されている可能性があるMutuallyExclusiveなグループを監視下に戻す
  2. 今行っているrcl_wait()をなるべく早く解除し、処理の終了をする
  3. 今行っているrcl_wait()をなるべく早く解除し、エグゼキューターの監視リストにエンティティを追加する

shutdown_guard_condition

このGuardConditionがトリガーされるのは、ユーザー割り込みをハンドルしてノードのシャットダウンを行う時です。

rclcpp::Executor::Executor()内で、以下のようにシャットダウン発生時のコールバックが設定されています。そのコールバックにてトリガーされています。

Executor::Executor(const rclcpp::ExecutorOptions & options)
// ~~~ 省略 ~~~
{
  shutdown_callback_handle_ = context_->add_on_shutdown_callback(
    [weak_gc = std::weak_ptr<rclcpp::GuardCondition>{shutdown_guard_condition_}]() {
      auto strong_gc = weak_gc.lock();
      if (strong_gc) {
        strong_gc->trigger();
      }
    });
// ~~~ 省略 ~~~
}

余談 ユーザー割り込みの処理方法

上記を紹介するにあたって、ノードのユーザー割り込みの処理とシャットダウンの方法について紹介します。

以下に、rclcpp::init()の宣言・実装と関連するコードを掲載します。

/// Option to indicate which signal handlers rclcpp should install.
enum class SignalHandlerOptions
{
  /// Install both sigint and sigterm, this is the default behavior.
  All,
  /// Install only a sigint handler.
  SigInt,
  /// Install only a sigterm handler.
  SigTerm,
  /// Do not install any signal handler.
  None,
};

RCLCPP_PUBLIC
void
init(
  int argc,
  char const * const * argv,
  const InitOptions & init_options = InitOptions(),
  SignalHandlerOptions signal_handler_options = SignalHandlerOptions::All);

void
init(
  int argc,
  char const * const * argv,
  const InitOptions & init_options,
  SignalHandlerOptions signal_handler_options)
{
  using rclcpp::contexts::get_global_default_context;
  get_global_default_context()->init(argc, argv, init_options);
  // Install the signal handlers.
  install_signal_handlers(signal_handler_options);
}

bool
install_signal_handlers(SignalHandlerOptions signal_handler_options)
{
  return SignalHandler::get_global_signal_handler().install(signal_handler_options);
}

---rclcpp/utilities.cppとutilities.hppより引用

rclcpp::init()を呼ぶと、内部ではまずグローバルデフォルトコンテキストが初期化されます。これは、特に設定をしない場合に自動で利用されるシングルトンのコンテキストです。また、ユーザー割り込みを扱うために、グローバルなシグナルハンドラが内部に存在します。これはシングルトンになっており、SignalHandler::get_global_signal_handler()等で直接アクセスできます。 ここでは、グローバルシグナルハンドラに対してどの割り込みをハンドルするかの設定をしています。

install_signal_handlers()の中で呼ばれるSignalHandler::install()では、sigactionstd::signalを利用して、実際にシグナルハンドラが設定されるほか、シグナルをキャッチした際の処理のためのスレッドも用意されます。そのスレッドでは、シグナルをキャッチするとコンテキストのshutdown()が呼ばれるようになっています。(以下コード)

void
SignalHandler::deferred_signal_handler()
{
  while (true) {
    if (signal_received_.exchange(false)) {
      RCLCPP_DEBUG(get_logger(), "deferred_signal_handler(): shutting down");
      for (auto context_ptr : rclcpp::get_contexts()) {
        if (context_ptr->get_init_options().shutdown_on_signal) {
          RCLCPP_DEBUG(
            get_logger(),
            "deferred_signal_handler(): "
            "shutting down rclcpp::Context @ %p, because it had shutdown_on_signal == true",
            static_cast<void *>(context_ptr.get()));
          context_ptr->shutdown("signal handler");   // <=== ここでshutdown()
        }
      }
    }
    // ~~~ 省略 ~~~
  }
}

---rclcpp/signal_handler.cppより引用

上記の処理でshutdown()が実行された際に、エグゼキューターのコンストラクタ内でコンテキストに設定していた、シャットダウン時のコールバックが実行されます。つまり、shutdown_guard_condition_が呼ばれるのは、ユーザー割り込みのキャッチないしはrclcpp::shutdown()呼び出し等によってContext::shutdown()が実行された時です。 そして、ここでのshutdown_guard_condition_は、エグゼキューターが行っているwait()のブロッキングを解除し、エグゼキューターの処理を終了させる役割があります。ちなみに、このGuardConditionが無いとノードのシャットダウンをしているにも関わらず内部でwait()がずっと待ち続けてしまう事になります。

ブロッキングが解除された後は、各エグゼキューターのspin()の内部に存在するwhile()等が終了します。これは、コンテキストのシャットダウンが行われるとrclcpp::ok()がfalseを返すようになるからです。

SingleThrededExecutorの例

void
SingleThreadedExecutor::spin()
{
  // ~~~ 省略 ~~~
  while (rclcpp::ok(this->context_) && spinning.load()) { // <=== ここ
    rclcpp::AnyExecutable any_executable;
    if (get_next_executable(any_executable)) {
      execute_any_executable(any_executable);
    }
  }
}

---rclcpp/single_threaded_executor.cppより引用

Timerはどうやってwait()を起床させる?

前回記事などで、Waitableはrclcpp側で主に準備完了判定を行うと紹介しました。実はTimerもその状況に少し似ていて、必要機能のほとんどがrcl層に実装されています。これは、Timerの機能が通信機能とはの関わりが薄く、rmw側への問い合わせ等をせずに準備完了の判定が可能なためです。ただ、Waitableではrclcpp上の処理だけではrmw_waitでの待機が解除できないため、GuardConditonを使うなどして間接的にrmw側に働きかける工夫が必要でした。これはTimerでも同様で、rcl層内の機能だけではwait()を起床させられないので工夫が必要です。本章では、「Timerがどのようにwait()を起床させるのか」と、「Timerの待機完了判定はどのように行われているのか」の2点について紹介します。

実は、これら2つの処理に関する最も重要な処理はrcl_wait()関数の内部に集約されています。(もちろん他の様々な箇所にも処理はありますが) 以下にrcl_wait()関数内の該当部分を掲載します。細切れになると全体の流れがつかみにくいため、ここでは一連の流れをまとめて掲載します。各処理の説明をする際には対象のブロックを切り出して掲載するので、以下の全体のコードは個別説明を読んでから見て頂いても構いません(むしろそちらの方が理解しやすいかもしれません)。

次に説明するコードの全体

rcl_ret_t
rcl_wait(rcl_wait_set_t * wait_set, int64_t timeout)
{
  // ~~~ 省略 ~~~

  // Calculate the timeout argument.
  // By default, set the timer to block indefinitely if none of the below conditions are met.
  rmw_time_t * timeout_argument = NULL;
  rmw_time_t temporary_timeout_storage;
  bool is_non_blocking = timeout == 0;

  // ~~~ 🔥ブロック 1 🔥  ~~~
  for (uint64_t t_idx = 0; t_idx < wait_set->impl->timer_index; ++t_idx) {
    if (!wait_set->timers[t_idx]) {
      continue;  // Skip NULL timers.
    }
    rmw_guard_conditions_t * rmw_gcs = &(wait_set->impl->rmw_guard_conditions);
    size_t gc_idx = wait_set->size_of_guard_conditions + t_idx;
    if (NULL != rmw_gcs->guard_conditions[gc_idx]) {
      // This timer has a guard condition, so move it to make a legal wait set.
      rmw_gcs->guard_conditions[rmw_gcs->guard_condition_count] =
        rmw_gcs->guard_conditions[gc_idx];
      ++(rmw_gcs->guard_condition_count);
    }
  }

  // ~~~ 🔥ブロック 2 🔥  ~~~

  int64_t min_next_call_time[RCL_STEADY_TIME + 1];
  rcl_clock_t * clocks[RCL_STEADY_TIME + 1] = {NULL, NULL, NULL, NULL};

  // ~~~ 省略 ~~~

  min_next_call_time[RCL_ROS_TIME] = INT64_MAX;
  min_next_call_time[RCL_SYSTEM_TIME] = INT64_MAX;
  min_next_call_time[RCL_STEADY_TIME] = INT64_MAX;

  if (!is_non_blocking) {
    for (size_t t_idx = 0; t_idx < wait_set->impl->timer_index; ++t_idx) {
      if (!wait_set->timers[t_idx]) {
        continue;  // Skip NULL timers.
      }

      rcl_clock_t * clock;
      if (rcl_timer_clock(wait_set->timers[t_idx], &clock) != RCL_RET_OK) {
        // should never happen
        return RCL_RET_ERROR;
      }

      // ~~~ 省略 ~~~

      // get the time of the next call to the timer
      int64_t next_call_time = INT64_MAX;
      // 対象のタイマーが次に待機完了になる時間を取得する
      rcl_ret_t ret = rcl_timer_get_next_call_time(wait_set->timers[t_idx], &next_call_time);
      if (ret == RCL_RET_TIMER_CANCELED) {
        wait_set->timers[t_idx] = NULL;
        continue;
      }
      if (ret != RCL_RET_OK) {
        return ret;  // The rcl error state should already be set.
      }
      // 監視対象となっているタイマーの中で一番早く時間切れになる時間を保存する
      if (next_call_time < min_next_call_time[clock->type]) {
        clocks[clock->type] = clock;
        min_next_call_time[clock->type] = next_call_time;
      }
    }
  }

  // ~~~ 🔥ブロック 3 🔥  ~~~

  if (is_non_blocking) {
    temporary_timeout_storage.sec = 0;
    temporary_timeout_storage.nsec = 0;
    timeout_argument = &temporary_timeout_storage;
  } else {
    bool has_valid_timeout = timeout > 0;
    int64_t min_timeout = has_valid_timeout ? timeout : INT64_MAX;

    // determine the min timeout of all clocks
    for (size_t i = RCL_ROS_TIME; i <= RCL_STEADY_TIME; i++) {
      if (clocks[i] == NULL) {
        continue;
      }

      int64_t cur_time;
      rmw_ret_t ret = rcl_clock_get_now(clocks[i], &cur_time);
      if (ret != RCL_RET_OK) {
        return ret;  // The rcl error state should already be set.
      }

      int64_t timer_timeout = min_next_call_time[i] - cur_time;

      if (timer_timeout <= min_timeout) {
        has_valid_timeout = true;
        min_timeout = timer_timeout;
      }
    }

    // If min_timeout was negative, we need to wake up immediately.
    if (min_timeout < 0) {
      min_timeout = 0;
    }
    if (has_valid_timeout) {
      temporary_timeout_storage.sec = RCL_NS_TO_S(min_timeout);
      temporary_timeout_storage.nsec = min_timeout % 1000000000;
      timeout_argument = &temporary_timeout_storage;
    }
  }

// ~~~ 🔥ブロック 4 🔥  ~~~

  // Wait.
  rmw_ret_t ret = rmw_wait(
    &wait_set->impl->rmw_subscriptions,
    &wait_set->impl->rmw_guard_conditions,
    &wait_set->impl->rmw_services,
    &wait_set->impl->rmw_clients,
    &wait_set->impl->rmw_events,
    wait_set->impl->rmw_wait_set,
    timeout_argument);

// ~~~ 🔥ブロック 5 🔥  ~~~

  // Items that are not ready will have been set to NULL by rmw_wait.
  // We now update our handles accordingly.

  bool any_timer_is_ready = false;

  // Check for ready timers
  // and set not ready timers (which includes canceled timers) to NULL.
  size_t i;
  for (i = 0; i < wait_set->impl->timer_index; ++i) {
    if (!wait_set->timers[i]) {
      continue;
    }

    bool current_timer_is_ready = false;
    rcl_ret_t ret = rcl_timer_is_ready(wait_set->timers[i], &current_timer_is_ready);
    if (ret != RCL_RET_OK) {
      return ret;  // The rcl error state should already be set.
    }
    if (!current_timer_is_ready) {
      wait_set->timers[i] = NULL;
    } else {
      any_timer_is_ready = true;
    }
  }

  // ~~~ 省略 ~~~
}

Timerの待機完了判定の方法

まずはTimerの待機完了判定を行っている箇所を見てみましょう。 Timerが待機完了しているかどうかについては、様々な箇所でチェックが行われますが、最も重要な処理はrcl_wait()関数内の次のブロックで行われているものです。

// ~~~ 🔥ブロック 4 🔥  ~~~

  // Wait.
  rmw_ret_t ret = rmw_wait(
    &wait_set->impl->rmw_subscriptions,
    &wait_set->impl->rmw_guard_conditions,
    &wait_set->impl->rmw_services,
    &wait_set->impl->rmw_clients,
    &wait_set->impl->rmw_events,
    wait_set->impl->rmw_wait_set,
    timeout_argument);

// ~~~ 🔥ブロック 5 🔥  ~~~

  // Items that are not ready will have been set to NULL by rmw_wait.
  // We now update our handles accordingly.

  bool any_timer_is_ready = false;

  // Check for ready timers
  // and set not ready timers (which includes canceled timers) to NULL.
  size_t i;
  for (i = 0; i < wait_set->impl->timer_index; ++i) {
    if (!wait_set->timers[i]) {
      continue;
    }

    bool current_timer_is_ready = false;
    // タイマーが準備完了したか確認する
    rcl_ret_t ret = rcl_timer_is_ready(wait_set->timers[i], &current_timer_is_ready);
    if (ret != RCL_RET_OK) {
      return ret;  // The rcl error state should already be set.
    }
    if (!current_timer_is_ready) {
      wait_set->timers[i] = NULL;
    } else {
      any_timer_is_ready = true;
    }
  }

コード中「🔥ブロック 4 🔥」となっている箇所は、以前の記事でも何度か登場したrmw_wait()関数です。 これによって待機が実行されたあと、次の「🔥ブロック 5 🔥」内のrcl_timer_is_ready()によってタイマーが準備完了したかを判定します。この関数は、以下のように実装されています。

rcl_ret_t
rcl_timer_is_ready(const rcl_timer_t * timer, bool * is_ready)
{
  RCL_CHECK_ARGUMENT_FOR_NULL(timer, RCL_RET_INVALID_ARGUMENT);
  RCL_CHECK_ARGUMENT_FOR_NULL(timer->impl, RCL_RET_TIMER_INVALID);
  RCL_CHECK_ARGUMENT_FOR_NULL(is_ready, RCL_RET_INVALID_ARGUMENT);
  int64_t time_until_next_call;
  rcl_ret_t ret = rcl_timer_get_time_until_next_call(timer, &time_until_next_call);
  if (ret == RCL_RET_TIMER_CANCELED) {
    *is_ready = false;
    return RCL_RET_OK;
  } else if (ret != RCL_RET_OK) {
    return ret;  // rcl error state should already be set.
  }
  *is_ready = (time_until_next_call <= 0);
  return RCL_RET_OK;
}

これは非常に単純で、タイマーが次に準備完了となるまでの相対時間が、0以下であるかを確認しているだけです。これは、要するに現在時刻 > 時間切れ時刻となっているかを確認するという処理です。ROS2のタイマー機能は基本的に「設定した相対時間が経過する」(もしくは、「指定した絶対時間に達する」)以外の要素を準備完了条件に加える事はできないため、この処理だけで準備完了が判定できます。(当然、rmw層から何らかのフィードバックを貰う必要もありません。)

Timerにおけるwait()の起床方法

では、次にTimerにおけるwait()関数の起床方法について紹介します。まず、最初に結論から述べると、以下の2種類の方法によって起床を行います。

  1. rmw_wait()での待機を行う際に、wait_setが持つ中で最も期限切れが近いタイマーの相対待機時間に合わせて待機のタイムアウトを設定する
  2. Timerが持つGuardConditionをトリガーする

ブロッキングを伴う処理では定番ですが、rmw_wait()でも待機のタイムアウトを設定することが可能です。また、こちらも定番ですがタイムアウトに"-1"を設定することで無限に待機することもできます。wait_setが持つエンティティにタイマーが含まれていない場合、rmw_wait()のタイムアウトは無限に設定されます。しかし、タイマーがエンティティとして含まれている場合、rmw_wait()のタイムアウトは含まれるタイマーの中で最も期限切れに近いタイマーの相対待機時間に合わせてタイムアウトが設定されます。これはつまり、当該の待機の間に通信ミドルウェアの側で準備完了になったエンティティが1つもない場合でも、その時間に合わせてrmw_wait()が起床するということです。Timerの普段の起床はこの1.のタイムアウト設定によって実現されています。 では、2.のGuardConditionは何のために利用するのでしょうか?これは、ユーザーによるタイマーのキャンセルに対応して一度rmw_wait()を起床させるために存在します。この場合、rcl_timer_is_ready()は恐らくfalseを返すので、キャンセルしたタイマーのコールバック等は実行されないでしょう。他に準備完了したエンティティが無ければ、当該ループではコールバック等の実行は無いかもしれません。しかし、タイマーのキャンセルに合わせてwait_setの更新等が必要になるため、それらの処理を行うためにwait()を起床させる必要があります。

それでは、実際のコードを見ていきましょう。 以下のコードは、rcl_wait()内でTimerの処理を行う4つの主要なブロックから構成されています。順を追って各ブロックの役割を説明します。 なお、コードの全体は本章の最初に折りたたまれた形で掲載されています。

ブロック1 Timerが持つGuardConditionのwait_setへの追加

  // ~~~ 🔥ブロック 1 🔥  ~~~
  for (uint64_t t_idx = 0; t_idx < wait_set->impl->timer_index; ++t_idx) {
    if (!wait_set->timers[t_idx]) {
      continue;  // Skip NULL timers.
    }
    rmw_guard_conditions_t * rmw_gcs = &(wait_set->impl->rmw_guard_conditions);
    size_t gc_idx = wait_set->size_of_guard_conditions + t_idx;
    if (NULL != rmw_gcs->guard_conditions[gc_idx]) {
      // This timer has a guard condition, so move it to make a legal wait set.
      rmw_gcs->guard_conditions[rmw_gcs->guard_condition_count] =
        rmw_gcs->guard_conditions[gc_idx];
      ++(rmw_gcs->guard_condition_count);
    }
  }

ここでは、TimerがGuardConditionを持っていた場合、それをwait_setへ追加しています。このGuardConditionの利用方法については、前に説明した通りです。

ブロック2 タイマーの中で最も近い待機完了時間の探索

  // ~~~ 🔥ブロック 2 🔥  ~~~

  int64_t min_next_call_time[RCL_STEADY_TIME + 1];
  rcl_clock_t * clocks[RCL_STEADY_TIME + 1] = {NULL, NULL, NULL, NULL};

  // ~~~ 省略 ~~~

  min_next_call_time[RCL_ROS_TIME] = INT64_MAX;
  min_next_call_time[RCL_SYSTEM_TIME] = INT64_MAX;
  min_next_call_time[RCL_STEADY_TIME] = INT64_MAX;

  if (!is_non_blocking) {
    for (size_t t_idx = 0; t_idx < wait_set->impl->timer_index; ++t_idx) {
      if (!wait_set->timers[t_idx]) {
        continue;  // Skip NULL timers.
      }

      rcl_clock_t * clock;
      if (rcl_timer_clock(wait_set->timers[t_idx], &clock) != RCL_RET_OK) {
        // should never happen
        return RCL_RET_ERROR;
      }

      // ~~~ 省略 ~~~

      // get the time of the next call to the timer
      int64_t next_call_time = INT64_MAX;
      // 対象のタイマーが次に待機完了になる時間を取得する
      rcl_ret_t ret = rcl_timer_get_next_call_time(wait_set->timers[t_idx], &next_call_time);
      if (ret == RCL_RET_TIMER_CANCELED) {
        wait_set->timers[t_idx] = NULL;
        continue;
      }
      if (ret != RCL_RET_OK) {
        return ret;  // The rcl error state should already be set.
      }
      // 監視対象となっているタイマーの中で一番早く時間切れになる時間を保存する
      if (next_call_time < min_next_call_time[clock->type]) {
        clocks[clock->type] = clock;
        min_next_call_time[clock->type] = next_call_time;
      }
    }
  }

ここでは、対象のTimerの中で最も近い待機完了時間を探索しています。この処理内における、next_call_timeは絶対時間だと思われます。

ブロック3・4 最も短い相対待機時間の計算とタイムアウトの設定

  // ~~~ 🔥ブロック 3 🔥  ~~~

  if (is_non_blocking) {
    temporary_timeout_storage.sec = 0;
    temporary_timeout_storage.nsec = 0;
    timeout_argument = &temporary_timeout_storage;
  } else {
    bool has_valid_timeout = timeout > 0;
    int64_t min_timeout = has_valid_timeout ? timeout : INT64_MAX;

    // determine the min timeout of all clocks
    for (size_t i = RCL_ROS_TIME; i <= RCL_STEADY_TIME; i++) {
      if (clocks[i] == NULL) {
        continue;
      }

      int64_t cur_time;
      rmw_ret_t ret = rcl_clock_get_now(clocks[i], &cur_time);
      if (ret != RCL_RET_OK) {
        return ret;  // The rcl error state should already be set.
      }

      int64_t timer_timeout = min_next_call_time[i] - cur_time;

      if (timer_timeout <= min_timeout) {
        has_valid_timeout = true;
        min_timeout = timer_timeout;
      }
    }

    // If min_timeout was negative, we need to wake up immediately.
    if (min_timeout < 0) {
      min_timeout = 0;
    }
    if (has_valid_timeout) {
      temporary_timeout_storage.sec = RCL_NS_TO_S(min_timeout);
      temporary_timeout_storage.nsec = min_timeout % 1000000000;
      timeout_argument = &temporary_timeout_storage;
    }
  }
  // ~~~ 🔥ブロック 4 🔥  ~~~

  // Wait.
  rmw_ret_t ret = rmw_wait(
    &wait_set->impl->rmw_subscriptions,
    &wait_set->impl->rmw_guard_conditions,
    &wait_set->impl->rmw_services,
    &wait_set->impl->rmw_clients,
    &wait_set->impl->rmw_events,
    wait_set->impl->rmw_wait_set,
    timeout_argument);

本ブロックでは、ブロック3で取得した最も近い絶対待機時間と今取得した現在時刻を比較し、rmw_wait()に設定するべきタイムアウト時間を計算しています。処理を見ると分かりますが、もしも計算したタイムアウトがマイナスの値になった場合は0として設定され、rmw_wait()はノンブロッキングで実行されます。(待機せず即座に戻るということです)

まとめ

Timerの、「wait()の起床方法」「準備完了判定の方法」についての説明は以上です。rclcpp側のTimerの実装などは今回は紹介しませんでしたが、重要なメカニズムについては理解できたのでは無いかと思います。ちなみに、rclcpp側のTimer実装は基本的にはラッパーであって、そこまで重要な処理が実装されている訳ではないので、比較的読みやすいかと思います。

おわりに

今回は、今までの記事で紹介できなかった、Waitableの詳細やTimerによるwaitの起床方法についてなどを紹介しました。次回の記事では、Waitableのより応用的な実装であるActionの実装について紹介します。 ActionはGuardConditionを利用しない設計になっており、そちらも読んで頂くとよりWaitableに関する理解が深まるかと思います。 次回はいよいよ本連載最後の記事となります。Actionの実装と、EventsExecutorについて紹介する予定です。