詳解 ROS2 エグゼキューター (Executor) - 4 -

執筆者:千葉工業大学 先進工学研究科 未来ロボティクス専攻 井上 叡 (インターン生)
監修者:稲葉 貴昭・高橋 浩和

※ 「ROS2」連載記事一覧はこちら


はじめに

本連載では、ROS2のC++ APIを利用しているほとんどの方が利用しているにも関わらず、その仕組みや仕様についてあまり知られていない、エグゼキューター(Executor)を紹介します。rclcppライブラリに実装されたエグゼキューターについてソースコードなどを追いながら、その仕組みについて詳しく追っていければと思います。 4回目となる今回も、1、2回目の記事で触れられなかったエグゼキューターに関する少し高度な概念や細かい内容について引き続き紹介していきます。本記事は前回記事とは違い、EventsExecutorの章の内容がActionの章の内容と関わりがありますので、順番に読んで頂く方が良いかもしれません。

なお、本連載ではROS2 jazzyでの各APIについて解説していきます。各ライブラリのバージョンは、rcl 9.2.5rclcpp 28.1.9rmw 7.3.2rclcpputils 2.11.2です。 以下にそれぞれのリポジトリへのリンクを掲載します。ソースコードを参照する際はクローンすると便利です。 また、本連載で掲載する各ライブラリのコードは全てこれらのリポジトリより引用します。

EventExecutor

本連載最初の記事で、エグゼキューターにはMultiThreadedExecutorとSingleThreadedExecutorの2つ以外にも実験的機能として実装されているものが1つあると紹介しました。今回紹介するのが、そのエグゼキューター rclcpp::experimental::executors::EventsExecutorです。

このEventsExecutorは、既存の2種に比べて、実行中の監視リストの更新処理等をなるべく減らして実行効率を向上させることを目標に実装されています。ですので、rclcpp::experimental::executors::EventsExecutorクラスは他と同じくrclcpp::Executorを継承してはいるものの、コールバックの実行方法とスケジューリング方法はかなり変更されています。

既存のエグゼキューター機能を振り返る

既存の2種のエグゼキューターでは、以下の2つがコアとなる機能でした。

  • wait_setを利用したリアクターによる待機機構
  • ExecutorEntitiesCollectionを利用した監視対象エンティティの管理とwait_set構築

処理の大枠の流れとしては、次のようなものでした。

  1. ExecutorEntitiesCollectionを利用してWaitSetを構築
  2. そのWaitSetでエンティティの準備完了を待機
  3. 準備完了したエンティティを取り出し、そのリストが全て無くなるまで実行

しかし、1.でのWaitSetを構築する際に、ExecutorEntitiesCollectionを再構築する処理が比較的頻繁に(特にMultiThreadedで顕著)行われてしまいますし、3.の処理においてもget_next_ready_executable()で毎回WaitResultを全て走査していたりと、所々で無駄な処理が目立つ実装になってしまっていました。

EventsExecutorのコア機能

それに対して、EventsExecutorでは、主に以下の3つの機能が中心となって処理を行います。

  • イベントキュー (Events queue)
  • タイマーマネージャ (Timer manager)
  • エンティティコレクション (ExecutorEntitiesCollection)

各機能について紹介をする前にいきなりですが、EventsExecutor::spin()の実装を見てみましょう。

void
EventsExecutor::spin()
{
  if (spinning.exchange(true)) {
    throw std::runtime_error("spin() called while already spinning");
  }
  RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );

  timers_manager_->start();
  RCPPUTILS_SCOPE_EXIT(timers_manager_->stop(); );

  while (rclcpp::ok(context_) && spinning.load()) {
    // Wait until we get an event
    ExecutorEvent event;
    bool has_event = events_queue_->dequeue(event);
    if (has_event) {
      this->execute_event(event);
    }
  }
}

---rclcpp/experimental/executors/events_executor.cppより引用

処理のループで行っている事は非常に単純で、イベントキューからデキューしたイベントを実行しているだけです。このコード断片だけで構造の大枠を掴める方もいらっしゃるかと思いますが、説明します。

このイベントキューから取り出されている「イベント」というのは、「エンティティの準備完了」の事を指します。つまり、TimerやSubscription、Serviceなどが準備完了状態になった事を示しています。

(※ ちなみに、以前にも紹介したようにProactorパターンでは、「何らかの非同期処理の準備完了」をイベントとします。ですので、この「エンティティの準備完了」はまさしくProactorパターンにおけるイベントそのものです。)

このイベントの種類は、以下の通りです。

enum ExecutorEventType
{
  CLIENT_EVENT,
  SUBSCRIPTION_EVENT,
  SERVICE_EVENT,
  TIMER_EVENT,
  WAITABLE_EVENT
};

---rclcpp/experimental/executors/events_executor_event_types.hppより引用 (※ もうお分かりかとは思いますが、ActionはWAITABLE_EVENTに含まれています)

つまり、EventsExecutorのメインスレッドはイベントキューからdequeueした準備完了エンティティのコールバックを順番に実行します

そして、イベントのエンキューは以下の場所で行われます。

  1. Subscription、Service client/server
  2. rmw側のスレッド (利用するミドルウェアの実装による)
  3. Waitable
  4. 実装依存。
    • GuardConditionによる実装の場合、それをトリガーしたスレッド
    • Action等の場合、つまりSubscriptionやServiceを使った実装の場合、1.と同じ
  5. Timer
  6. Timer managerが持つスレッド

1.に関しては、rmw側の実装依存になるため、実際にどのスレッドで実行されるかは利用するミドルウェアによります。しかし、基本的にはミドルウェア側で複数動いているスレッドのどれかで実行されるだろうと思います。

Timer managerはタイマーの準備完了を監視して、完了イベントをエンキューするために利用されます。これは独立したスレッドで動作し、内部ではrcl_wait()とほぼ同様の方法でタイマーをチェックします。

また、上記を見ると分かるように、キューへの操作は様々なスレッドから同時に行われる可能性があるため、enqueue/dequeue共にスレッドセーフな実装が必要です。そのような機能をもつキューのシンプルな実装として、現状ではrclcpp/experimental/executors/simple_events_queue.hppが提供されています。これはstd::queueに対してロックを取得してのエンキュー操作、条件変数による待機を実装したデキュー操作が実装されているものです。もちろん、ユーザーが独自に実装したキューを利用することも可能です。

エンティティコレクションが更新されるタイミング EventsExecutorでは、コレクションが更新されるタイミングは基本的に以下のタイミングです。

  • add/remove - node/callback_group された時
  • interrupt_guard_condition_shutdown_guard_condition_のどちらかがトリガーされた時
    • EventsExecutor内部からはinterrupt_guard_condition_をトリガーするタイミングは現状では無し

既存のエグゼキューターでは、コレクションを更新するためにinterrupt_guard_condition_をトリガーしますが、EventsExecutorでは直接更新関数を呼びだすことで、若干の効率化と実装の簡素化が行われています。 ちなみに、エンティティコレクションの更新操作は基底クラスで行っている方法とほぼ同じなので、詳細は省略します。

また、このエグゼキューターはエグゼキューター内部にはスケジューリングメカニズムは特に持っていません。しかし、スケジューリングはキューの側で実装する事が可能です。上で紹介したsimple_events_queue.hppをキューとして利用した場合は、単にイベントが完了した順番に実行されるだけですが、何らかの指標による優先度付キューなどを実装してそれをキューとして利用させれば、スケジューリングさせることが可能です。スケジューリングメカニズムを外部から注入できるようになっているのが面白い点です

特徴のまとめ

これまでに紹介してきたEventsExecutorの特徴をまとめると以下のようになります。

  • エンティティの準備完了をイベントとして扱う
  • イベントが発生する毎にキューに入れられ、メインスレッドではそのイベントを1つずつ取り出して実行している
  • イベントのエンキューは、それぞれのイベントが発生したスレッドで行われる。既存のエグゼキューターのようにリアクターでまとめて取り出される形ではない
  • rcl_wait()のようなリアクターは利用しない
  • タイマー監視・管理を専用のスレッドで行うTimer managerクラスを持つ
  • スケジューリングメカニズムはキューの実装に左右される。優先度付きキューなどを使えば、何らかのスケジューリングを実装できる。

では、基本機能について把握した所で、実装を紹介します。

エンティティのエンキュー

本項ではエンティティのエンキュー方法について紹介します。

// ~~~ 省略 ~~~

EventsExecutor::EventsExecutor(
  rclcpp::experimental::executors::EventsQueue::UniquePtr events_queue,
  bool execute_timers_separate_thread,
  const rclcpp::ExecutorOptions & options)
: rclcpp::Executor(options)
{
// ~~~ 省略 ~~~

if (!execute_timers_separate_thread) {
    timer_on_ready_cb =
      [this](const rclcpp::TimerBase * timer_id, const std::shared_ptr<void> & data) {
        ExecutorEvent event = {timer_id, data, -1, ExecutorEventType::TIMER_EVENT, 1};
        this->events_queue_->enqueue(event);
      };
  }
// ~~~ 省略 ~~~

}
// ~~~ 省略 ~~~

// Timer以外のエンティティのエンキュー設定s
void
EventsExecutor::refresh_current_collection(
  const rclcpp::executors::ExecutorEntitiesCollection & new_collection)
{
  // Acquire lock before modifying the current collection
  std::lock_guard<std::recursive_mutex> lock(collection_mutex_);

  current_entities_collection_->timers.update(
    new_collection.timers,
    [this](rclcpp::TimerBase::SharedPtr timer) {timers_manager_->add_timer(timer);},
    [this](rclcpp::TimerBase::SharedPtr timer) {timers_manager_->remove_timer(timer);});

  current_entities_collection_->subscriptions.update(
    new_collection.subscriptions,
    [this](auto subscription) {
      subscription->set_on_new_message_callback(
        this->create_entity_callback(
          subscription->get_subscription_handle().get(), ExecutorEventType::SUBSCRIPTION_EVENT));
    },
    [](auto subscription) {subscription->clear_on_new_message_callback();});

  current_entities_collection_->clients.update(
    new_collection.clients,
    [this](auto client) {
      client->set_on_new_response_callback(
        this->create_entity_callback(
          client->get_client_handle().get(), ExecutorEventType::CLIENT_EVENT));
    },
    [](auto client) {client->clear_on_new_response_callback();});

  current_entities_collection_->services.update(
    new_collection.services,
    [this](auto service) {
      service->set_on_new_request_callback(
        this->create_entity_callback(
          service->get_service_handle().get(), ExecutorEventType::SERVICE_EVENT));
    },
    [](auto service) {service->clear_on_new_request_callback();});

  // DO WE NEED THIS? WE ARE NOT DOING ANYTHING WITH GUARD CONDITIONS
  /*
  current_entities_collection_->guard_conditions.update(new_collection.guard_conditions,
    [](auto guard_condition) {(void)guard_condition;},
    [](auto guard_condition) {guard_condition->set_on_trigger_callback(nullptr);});
  */

  current_entities_collection_->waitables.update(
    new_collection.waitables,
    [this](auto waitable) {
      waitable->set_on_ready_callback(
        this->create_waitable_callback(waitable.get()));
    },
    [](auto waitable) {waitable->clear_on_ready_callback();});
}

std::function<void(size_t)>
EventsExecutor::create_entity_callback(
  void * entity_key, ExecutorEventType event_type)
{
  std::function<void(size_t)>
  callback = [this, entity_key, event_type](size_t num_events) {
      ExecutorEvent event = {entity_key, nullptr, -1, event_type, num_events};
      this->events_queue_->enqueue(event);
    };
  return callback;
}

std::function<void(size_t, int)>
EventsExecutor::create_waitable_callback(const rclcpp::Waitable * entity_key)
{
  std::function<void(size_t, int)>
  callback = [this, entity_key](size_t num_events, int waitable_data) {
      ExecutorEvent event =
      {entity_key, nullptr, waitable_data, ExecutorEventType::WAITABLE_EVENT, num_events};
      this->events_queue_->enqueue(event);
    };
  return callback;
}

// ~~~ 省略 ~~~

---rclcpp/experimental/executors/events_executor.cppより引用

エンキューについては、Timer用の処理がコンストラクタ内で、それ以外はEventsExecutor::refresh_current_collection()で行われています。ここで、重要になるのが以下の関数です。

  • rclcpp::Timer::set_on_ready_callback()
  • rclcpp::Waitable::set_on_ready_callback()
  • rclcpp::ServiceBase::::set_on_new_request_callback()
  • rclcpp::ClientBase::::set_on_new_response_callback()
  • rclcpp::SubscriptionBase::::set_on_new_message_callback()

今までの記事ではあまり利用されている箇所が無く紹介してこなかったこれらのメソッドは、各エンティティの準備完了時に即座に呼び出されるコールバックを設定するものです。これは、普段エンティティに設定しているコールバックと異なり、準備完了したスレッドで直ちに実行される点が特徴です。

逆に、既存のエグゼキューターもこのエグゼキューターも、エンティティに設定した通常のコールバックは 準備完了になる ➡ メインのコールバック処理ループの中で実行 という順番で実行され、そこには少しの時間的隔たりがあります。また、実行されるスレッドもコールバック実行用のもので、準備完了したスレッドではありません。

上のメソッド群を使ってエンキューを行うコールバック関数を設定することで、イベント発生時のエンキューを実現しています。(上記コード内のcreate_entity_callback()を見ると分かりやすいです。)

コールバックの実行

void
EventsExecutor::execute_event(const ExecutorEvent & event)
{
  switch (event.type) {
    case ExecutorEventType::CLIENT_EVENT:
      {
        rclcpp::ClientBase::SharedPtr client;
        {
          std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
          client = this->retrieve_entity(
            static_cast<const rcl_client_t *>(event.entity_key),
            current_entities_collection_->clients);
        }
        if (client) {
          for (size_t i = 0; i < event.num_events; i++) {
            execute_client(client);
          }
        }

        break;
      }
    case ExecutorEventType::SUBSCRIPTION_EVENT:
      {
        rclcpp::SubscriptionBase::SharedPtr subscription;
        {
          std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
          subscription = this->retrieve_entity(
            static_cast<const rcl_subscription_t *>(event.entity_key),
            current_entities_collection_->subscriptions);
        }
        if (subscription) {
          for (size_t i = 0; i < event.num_events; i++) {
            execute_subscription(subscription);
          }
        }
        break;
      }
    case ExecutorEventType::SERVICE_EVENT:
      {
        rclcpp::ServiceBase::SharedPtr service;
        {
          std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
          service = this->retrieve_entity(
            static_cast<const rcl_service_t *>(event.entity_key),
            current_entities_collection_->services);
        }
        if (service) {
          for (size_t i = 0; i < event.num_events; i++) {
            execute_service(service);
          }
        }

        break;
      }
    case ExecutorEventType::TIMER_EVENT:
      {
        timers_manager_->execute_ready_timer(
          static_cast<const rclcpp::TimerBase *>(event.entity_key), event.data);
        break;
      }
    case ExecutorEventType::WAITABLE_EVENT:
      {
        rclcpp::Waitable::SharedPtr waitable;
        {
          std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
          waitable = this->retrieve_entity(
            static_cast<const rclcpp::Waitable *>(event.entity_key),
            current_entities_collection_->waitables);
        }
        if (waitable) {
          for (size_t i = 0; i < event.num_events; i++) {
            const auto data = waitable->take_data_by_entity_id(event.waitable_data);
            waitable->execute(data);
          }
        }
        break;
      }
  }
}

---rclcpp/experimental/executors/events_executor.cppより引用

コールバックの実行は、execute_event()というメソッドで実行します。実行は、既存の2種のエグゼキューターと同じですが、少し違う点もあります。まず、イベント毎にswitch caseで書かれているため、get_next_ready_executable()のように全てを走査して実行可能なものを探す必要が無くなっています。また、Timer以外で利用されてmいるretrieve_entity()std::mapから要素を取り出す操作で、以前のように線形探索ではなく対数オーダーで探索するため、こちらも改善されています。 ちなみに、このretrieve_entity()では、エンティティコレクションから直接要素を取り出すような形になっています。既存のエグゼキューターでは、エンティティコレクションを使ってWaitSetを構築、そしてその結果を線形探索してエンティティを取り出すといった形でいくつものインスタンスを経由していました。これらのインスタンス構築が無くなった点も、EventsExecutorの特徴の1つです。

ちなみに、Timerは設定によってはTimer managerが持つスレッドでコールバックを実行させることも可能です。その設定は、EventsExecutorのコンストラクタ引数のexecute_timers_separate_threadを通じて行います。その場合は、こちらのexecute_eventの中では実行されなくなります。

Timer managerの実装

Timer managerについても簡単に紹介します。この機能は、rclcpp::experimental::executors::TimersManagerクラスとして実装されています。このクラスは、メンバ変数として以下を持っています。

class TimersManager
{

  // ~~~ 省略 ~~~

  // Callback to be called when timer is ready
  std::function<void(const rclcpp::TimerBase *,
    const std::shared_ptr<void> &)> on_ready_callback_ = nullptr;

  // Thread used to run the timers execution task
  std::thread timers_thread_;
  // Protects access to timers
  std::mutex timers_mutex_;
  // Protects access to stop()
  std::mutex stop_mutex_;
  // Notifies the timers thread whenever timers are added/removed
  std::condition_variable timers_cv_;
  // Flag used as predicate by timers_cv_ that denotes one or more timers being added/removed
  bool timers_updated_ {false};
  // Indicates whether the timers thread is currently running or not
  std::atomic<bool> running_ {false};
  // Parent context used to understand if ROS is still active
  std::shared_ptr<rclcpp::Context> context_;
  // Timers heap storage with weak ownership
  WeakTimersHeap weak_timers_heap_;
};

---rclcpp/experimental/executors/timers_manager.hppより引用

重要なメンバ変数は以下の通りです。

  • on_ready_callback_
    • Timerが準備完了した時に呼び出されるコールバック。EventsExectorにエンキューするために利用。
    • EventsExecutorの設定で、execute_timers_separate_threadがtrueの場合は、これは設定されない。
  • timers_thread_
    • Timerの準備完了や変更などを監視するためのスレッド
  • timers_cv_
  • Timerの準備完了待機のために利用する条件変数
  • weak_timers_heap_
    • Timerの管理を行うためのヒープのコンテナ。監視対象のTimerへの弱参照を保持する。
    • このヒープは、期限切れが短い順番に二分木の形でTimerへの弱参照を保持します。

基本的に行うのは、Timerの準備完了を監視して、準備完了したTimerがあればon_ready_callback_を呼び出す or その場でコールバック実行という処理です。この一連の処理は、TimersManager::run_timers()に実装され、timers_thread_で実行されます。

void TimersManager::run_timers()
{
  // Make sure the running flag is set to false when we exit from this function
  // to allow restarting the timers thread.
  RCPPUTILS_SCOPE_EXIT(this->running_.store(false); );

  while (rclcpp::ok(context_) && running_) {
    // Lock mutex
    std::unique_lock<std::mutex> lock(timers_mutex_);

    std::optional<std::chrono::nanoseconds> time_to_sleep = get_head_timeout_unsafe();

    // If head timer was cancelled, try to reheap and get a new head.
    // This avoids an edge condition where head timer is cancelled, but other
    // valid timers remain in the heap.
    if (!time_to_sleep.has_value()) {
      // Re-heap to (possibly) move cancelled timer from head of heap. If
      // entire heap is cancelled, this will still result in a nullopt.
      TimersHeap locked_heap = weak_timers_heap_.validate_and_lock();
      locked_heap.heapify();
      weak_timers_heap_.store(locked_heap);
      time_to_sleep = get_head_timeout_unsafe();
    }

    // If no timers, or all timers cancelled, wait for an update.
    if (!time_to_sleep.has_value() || (time_to_sleep.value() == std::chrono::nanoseconds::max()) ) {
      // Wait until notification that timers have been updated
      timers_cv_.wait(lock, [this]() {return timers_updated_;});

      // Re-heap in case ordering changed due to a cancelled timer
      // re-activating.
      TimersHeap locked_heap = weak_timers_heap_.validate_and_lock();
      locked_heap.heapify();
      weak_timers_heap_.store(locked_heap);
    } else if (time_to_sleep.value() != std::chrono::nanoseconds::zero()) {
      // If time_to_sleep is zero, we immediately execute. Otherwise, wait
      // until timeout or notification that timers have been updated
      timers_cv_.wait_for(lock, time_to_sleep.value(), [this]() {return timers_updated_;});
    }

    // Reset timers updated flag
    timers_updated_ = false;

    // Execute timers
    this->execute_ready_timers_unsafe();
  }
}

---rclcpp/experimental/executors/timers_manager.cppより引用

ここで行っている事は、rcl_wait()内部でのTimerの確認処理とほぼ同一です。まず、get_head_timeout_unsafe()で、ヒープの先頭にあるTimerの相対待機時間、つまり最も早く期限切れになる時間を取得します。次に、その時間をタイムアウト時間として条件変数timers_cv_で待機を行います。ここでは、タイムアウトまでの待機のためにrmw_wait()の代わりに条件変数を利用している形です。また、監視対象のタイマーが全てキャンセルされていたり、そもそもタイマーが無い場合は無限の待機を行います。 条件変数での待機が終わったら、this->execute_ready_timers_unsafe()を実行して、タイマーが準備完了になっているかの確認と、完了していた場合にはエグゼキューターに通知するか、直接コールバックを実行します。

Timer managerの処理は基本的には上記が全てです。より詳しくは、実際のコードを見ていただければと思います。

まとめ

以上が、EventsExecutorの基本的な実装と機能の紹介です。これはjazzy時点では実験的な機能としての実装となっていますが、将来的には正式な機能になる可能性が高いです。特にSingleThreadedExecutorに対しては、こちらで代替可能でこちらの方が効率的なため、近いうちに置き換えられるのではないでしょうか。 ちなみに、EventsExecutorと既存エグゼキューターとのパフォーマンスの比較した調査があります。是非ご覧ください。

ActionはどのようにWaitableとして実装されている?

本項では、Actionがどのように実装されているのかについて紹介します。Actionの実装は複雑で、詳細な説明をしようとするとかなり長くなりそうです。そのため、本章ではどのようにレスポンスやリクエストを受け取り、それによってどのようにコールバックが実行されるか、の流れに絞って紹介します。

まず、Actionの通信がどのように行われているのかを少し振り返ってみましょう。

https://docs.ros.org/en/jazzy/_images/Action-SingleActionClient.gif https://docs.ros.org/en/jazzy/Tutorials/Beginner-CLI-Tools/Understanding-ROS2-Actions/Understanding-ROS2-Actions.html より引用

上記は、1つのActionについて、ServerとClientの通信を図に表したものです。ここで、内部的にServiceとPub/Sub通信が利用されているのが分かるでしょうか? ActionはWaitableのサブクラスとして実装されていて、内部的にはServiceやPub/Subを組み合わせて動作します。

Actionが内部で利用しているService、Pub/Sub通信はそれぞれ以下の機能を実現するために利用されます。

  • ゴールのリクエスト
    • Service通信
  • キャンセルのリクエスト
    • Service通信
  • 実行結果の通知
    • Service通信
  • 実行中のフィードバック
    • Pub/Sub通信
  • ステータスの発信
    • Pub/Sub通信
  • Expire Timer
    • Timer機能

Action Server/ClientはWaitableではあるものの、これまで紹介したいくつかのWaitableとは違い、GuardConditionは利用していません。Actionがrcl_wait等の待機を起床させる、もしくはEventsExecutorのイベントを発生させるには上記のどれかの機能が利用されます。

以降の項目では、これらの機能がServer/Clientの内部でどのように扱われ、エグゼキューターを通して実行されるのかを紹介していきます。

Action Server

まず、Action Serverから紹介していきます。

今まで見てきたエグゼキューターのコードとは少し毛色が違いますので、最初に基本的な情報をまとめておきます。

  • rclcpp_action配下にソースコードが存在する
    • それに伴い、namespaceはrclcpp_actionである
  • 実装はrclとrclcppの2つの層に渡って行われている
    • rclのソースはrcl_action配下に存在する
  • Action Serverのクラスであるrclcpp_action::Serverは以下のような継承関係を持つ
rclcpp::Waitable
      ↓
rclcpp_action::ServerBase
      ↓
rclcpp_action::Server
  • Waibtaleのサブクラスであるため、当然is_ready()execute()など動作に必要なメソッドはoverrideされている
  • Actionの機能の多くの部分はrcl層に実装されている
    • rclcpp層(rclcpp_action以下)に実装されているServerクラスはrclの機能のラッパーに近い

では、いよいよコードを見ていきましょう。 Action Serverは、Actionの機能を実現するためにそれぞれ以下を内部に持っています。

  • ゴールのリクエスト受信
    • Service Server
  • キャンセルのリクエスト受信
    • Service Server
  • 実行結果の返信
    • Service Server
  • 実行中のフィードバック
    • Publisher
  • ステータスの発信
    • Publisher
  • Expire Timer
    • Timer

これらは、全てrclcpp_action内で直接的に操作される事はありません。エグゼキューターは、rclcpp層に実装された機能であるため、それぞれのエンティティのrclcpp側に実装されたラッパークラス等をよく扱っていましたが、それとは異なります。Actionは基本的にrcl層に機能が実装されているため、ServerクラスとServerBaseクラスはrcl機能のラッパーとしての役割が強いです。そのため、上記のエンティティは直接的にはServerのどちらのクラスにも現れません。Serverのrcl側の実装rcl_action_server_t構造体のポインタを通してrclの関数によって操作されます。

Action Serverの構築

rclcpp_action::Serverクラスのコンストラクタから見ていきます。

  Server(
    rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base,
    rclcpp::node_interfaces::NodeClockInterface::SharedPtr node_clock,
    rclcpp::node_interfaces::NodeLoggingInterface::SharedPtr node_logging,
    const std::string & name,
    const rcl_action_server_options_t & options,
    GoalCallback handle_goal,
    CancelCallback handle_cancel,
    AcceptedCallback handle_accepted
  )
  : ServerBase(
      node_base,
      node_clock,
      node_logging,
      name,
      rosidl_typesupport_cpp::get_action_type_support_handle<ActionT>(),
      options),
    handle_goal_(handle_goal),
    handle_cancel_(handle_cancel),
    handle_accepted_(handle_accepted)
  {
  }

---rclcpp_action/server.hppより引用

Action構築時に渡す、Goal、Cancel、Acceptの3つのコールバックはそれぞれ以下のメンバ変数名でServerクラスが持つことが分かります。

  • handle_goal_
    • ゴール設定に対するコールバック
  • handle_cancel_
    • キャンセルに対するコールバック
  • handle_accepted_
    • アクセプトに対するコールバック

次に、基底クラス rclcpp_action::ServerBaseのコンストラクタを見ます。

ServerBase::ServerBase(
  rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base,
  rclcpp::node_interfaces::NodeClockInterface::SharedPtr node_clock,
  rclcpp::node_interfaces::NodeLoggingInterface::SharedPtr node_logging,
  const std::string & name,
  const rosidl_action_type_support_t * type_support,
  const rcl_action_server_options_t & options
)
: pimpl_(new ServerBaseImpl(
      node_clock->get_clock(), node_logging->get_logger().get_child("rclcpp_action")))
{

  // ~~~ 省略 ~~~

  *(pimpl_->action_server_) = rcl_action_get_zero_initialized_server();

  rcl_node_t * rcl_node = node_base->get_rcl_node_handle();
  rcl_clock_t * rcl_clock = pimpl_->clock_->get_clock_handle();

  // Action Serverの構築
  rcl_ret_t ret = rcl_action_server_init(
    pimpl_->action_server_.get(), rcl_node, rcl_clock, type_support, name.c_str(), &options);

  if (RCL_RET_OK != ret) {
    rclcpp::exceptions::throw_from_rcl_error(ret);
  }
  
  ret = rcl_action_server_wait_set_get_num_entities(
    pimpl_->action_server_.get(),
    &pimpl_->num_subscriptions_,
    &pimpl_->num_guard_conditions_,
    &pimpl_->num_timers_,
    &pimpl_->num_clients_,
    &pimpl_->num_services_);

  if (RCL_RET_OK != ret) {
    rclcpp::exceptions::throw_from_rcl_error(ret);
  }
}

---rclcpp_action/server.cppより引用

コンストラクタではrclのAPIを呼び出してAction Serverの構築処理を行っています。ここはほぼrclの関数呼び出しのみで構成されており、「rclcpp_actionはrclの機能のラッパーのようなもの」という主張が少し理解できるかと思います。

ここで重要なのが、rcl_action_server_init()です。この関数がAction Serverの初期化と構築を行います。

以下は、rcl_action_server_init()の処理の一部です。

rcl_ret_t
rcl_action_server_init(
  rcl_action_server_t * action_server,
  rcl_node_t * node,
  rcl_clock_t * clock,
  const rosidl_action_type_support_t * type_support,
  const char * action_name,
  const rcl_action_server_options_t * options)
{
  // ~~~ 省略 ~~~

  action_server->impl->goal_service = rcl_get_zero_initialized_service();
  action_server->impl->cancel_service = rcl_get_zero_initialized_service();
  action_server->impl->result_service = rcl_get_zero_initialized_service();
  action_server->impl->expire_timer = rcl_get_zero_initialized_timer();
  action_server->impl->feedback_publisher = rcl_get_zero_initialized_publisher();
  action_server->impl->status_publisher = rcl_get_zero_initialized_publisher();

  rcl_ret_t ret = RCL_RET_OK;
  // Initialize services
  SERVICE_INIT(goal);
  SERVICE_INIT(cancel);
  SERVICE_INIT(result);

  // Initialize publishers
  PUBLISHER_INIT(feedback);
  PUBLISHER_INIT(status);

  // Store reference to clock
  action_server->impl->clock = clock;

  // Initialize Timer
  ret = rcl_timer_init2(
    &action_server->impl->expire_timer, action_server->impl->clock, node->context,
    options->result_timeout.nanoseconds, NULL, allocator, true);
  if (RCL_RET_OK != ret) {
    goto fail;
  }

  // ~~~ 省略 ~~~
}

---rcl_action/action_server.cより引用

コード内、1つめの「省略」の下6行ほどで、implというメンバが持つエンティティの構造体をゼロ初期化しています。

このimplというメンバは以下のように定義される構造体です。

typedef struct rcl_action_server_impl_s
{
  rcl_service_t goal_service;
  rcl_service_t cancel_service;
  rcl_service_t result_service;
  rcl_publisher_t feedback_publisher;
  rcl_publisher_t status_publisher;
  rcl_timer_t expire_timer;
  char * action_name;
  rcl_action_server_options_t options;
  // Array of goal handles
  rcl_action_goal_handle_t ** goal_handles;
  size_t num_goal_handles;
  // Clock
  rcl_clock_t * clock;
  // Wait set records
  size_t wait_set_goal_service_index;
  size_t wait_set_cancel_service_index;
  size_t wait_set_result_service_index;
  size_t wait_set_expire_timer_index;
  rosidl_type_hash_t type_hash;
} rcl_action_server_impl_t;

--- rcl_action/action_server_impl.hより引用

Action Serverが持つエンティティがここに明記されています。これは、本項の最初に紹介したAction Serverが持つ6つのエンティティそのものです。

コンストラクタでは、これらをゼロ初期化したあと、SERVICE_INITPUBLISHER_INITの2つのマクロで構築を行います。

SERVICE_INITマクロ内部では、Action名から各Service名の構築が行われ、その名前と関連するmsg型、qos設定等からrcl_service_init()を使ってServiceが初期化されます。

PUBLISHER_INITマクロでも同様で、まずAction名からトピック名が構築され、その名前と関連するmsg型等からrcl_publisher_init()を使ってPublisherが構築されます。

トピック名やサービス名はrcl_action/names.hにて宣言されたrcl_action_get_○○_service_name()rcl_action_get_○○_topic_name()を使って取得します。ここで、○○は"goal"や"feedback"などそれぞれの操作名が入ります。

ここまで、ユーザーが渡したGoal,Cancel,Acceptのハンドラは最上層のServerクラスが持つこと、実際のAction Serverはrcl層で構築されることを見てきました。 次は、Action Serverがエグゼキューターを通してどのように実行されるかを見ていきます。

ExecutorによるAction Serverの実行

まず、エグゼキューターでの実行に当たってSingleThreadedExecutorとMultiThreadedExecutorの場合はWaitSetへの追加、EventsExecutorの場合はon_ready_callbackの設定が必要になります。

WaitSetへの追加には、Waitableが持つインターフェースであるServerBase::add_to_wait_set()を利用します。これは内部で、rcl_action_wait_set_add_action_server()を呼び出してServerが持つService3つとTimerをWaitSetに追加します。

on_ready_callbackの追加も、WaitableのインターフェースServerBase::set_on_ready_callback()を利用します。これを利用すると、内部の3つのServiceに対して、新しいリクエストを受け取った時に呼ばれるコールバックが設定できます。こちらの場合だけ、Timer監視専用のスレッドも立ち上がります。このスレッドは、Actionが内部に持つExpire Timerを監視して準備完了の際に通知する役割を果たします。これは恐らく、EventsExecutorのTimers Managerに今の所WaitableからTimerを上手く取り出す機能が無いからだと思われます。将来的には何らかの形で改善されるのではないでしょうか。

エグゼキューターの待機機構へのエンティティの追加方法が分かったため、次は準備完了の判定について確認しましょう。

Waitableは、以下のコードの通り、Single/MultiThreadedExecutorとEventsExecutor共にexecute()インターフェースの呼び出しによって実行されます。

void
Executor::execute_any_executable(AnyExecutable & any_exec)
{
  // ~~~ 省略 ~~~
  if (any_exec.waitable) {
    const std::shared_ptr<void> & const_data = any_exec.data;
    any_exec.waitable->execute(const_data);
  }
  // ~~~ 省略 ~~~
}

--- rclcpp/executor.cppより引用

void
EventsExecutor::execute_event(const ExecutorEvent & event)
{

  // ~~~ 省略 ~~~

    case ExecutorEventType::WAITABLE_EVENT:
      {
        rclcpp::Waitable::SharedPtr waitable;
        {
          std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
          waitable = this->retrieve_entity(
            static_cast<const rclcpp::Waitable *>(event.entity_key),
            current_entities_collection_->waitables);
        }
        if (waitable) {
          for (size_t i = 0; i < event.num_events; i++) {
            const auto data = waitable->take_data_by_entity_id(event.waitable_data);
            waitable->execute(data);
          }
        }
        break;
      }
  // ~~~ 省略 ~~~

}

--- rclcpp/experimental/executors/events_executor.cppより引用

ServerBase::execute()では、受け取ったデータの型によって以下3つの処理が実行されます。 これらの3つの処理では、基本的に受け取ったデータへの前処理を行った後にユーザーが設定したハンドラが実行されて、そのハンドラの結果を利用して返信(レスポンス)するという形になっています。詳しくは、それぞれの関数の実装を覗いてみて下さい。

void
ServerBase::execute(const std::shared_ptr<void> & data_in)
{
  if (!data_in) {
    throw std::runtime_error("ServerBase::execute: give data pointer was null");
  }

  std::shared_ptr<ServerBaseData> data_ptr = std::static_pointer_cast<ServerBaseData>(data_in);

  std::visit(
    [&](auto && data) -> void {
      using T = std::decay_t<decltype(data)>;
      if constexpr (std::is_same_v<T, ServerBaseData::GoalRequestData>) {
        execute_goal_request_received(
          std::get<0>(data), std::get<1>(data), std::get<2>(data),
          std::get<3>(data));
      }
      if constexpr (std::is_same_v<T, ServerBaseData::CancelRequestData>) {
        execute_cancel_request_received(std::get<0>(data), std::get<1>(data), std::get<2>(data));
      }
      if constexpr (std::is_same_v<T, ServerBaseData::ResultRequestData>) {
        execute_result_request_received(std::get<0>(data), std::get<1>(data), std::get<2>(data));
      }
      if constexpr (std::is_same_v<T, ServerBaseData::GoalExpiredData>) {
        execute_check_expired_goals();
      }
    },
    data_ptr->data);
}

上記のメソッドでは受け取ったデータ(msg)の種類によって実行する処理を変えている訳ですが、このデータの取得方法はWaitSetとon_ready_callbackのどちらで実行するかによって少し異なります。

Single/MultiThreadedExecutorの場合

こちらは、以下のような流れによって最終的にexecute()が実行されます。

  1. Executor::get_next_ready_executable()内部でのWaitResult::next_ready_waitable呼び出しにより、ServerBase::is_ready()が実行される
  2. ServerBase::is_ready()の実行により、wait_setからreadyになったエンティティの中に3種のサービスのどれかがあるかが確認される。確認された場合、Action内部のnext_ready_event変数にどのサービスだったかの情報が保存されてその関数はtrueを返す
  3. Executor::get_next_ready_executable()内で、ServerBase::take_data()が呼ばれる
  4. take_data()は、next_ready_eventを使ってServerBase::take_data_by_entity_idを実行し、関連するデータを返す
  5. 4.で取り出したデータを使ってexecute()を実行する

こちらの場合は、サービスが準備完了してからWaitSetでチェックされるまでに時間的隔たりがあります。なので、サービスと関連するデータを2.3.4の手順で再度関連付ける必要があります。

EventsExecutorの場合 上記に対して、こちらは単純です。こちらは、Eventにデータが直接的に紐づいていてそれを取り出すことが可能なため、以下のような手順になります。

  1. on_ready_callbackの実行によりキューにActionがWaitableイベントとして追加される
  2. Eventが持っているエンティティidを使って、直接ServerBase::take_data_by_entity_idを呼び出す。
  3. 2.で取り出してデータを使ってexecute()

上記に関連するAction側のコードを以下に掲載しますので、説明と見比べてみて下さい。

bool
ServerBase::is_ready(const rcl_wait_set_t & wait_set)
{
  bool goal_request_ready;
  bool cancel_request_ready;
  bool result_request_ready;
  bool goal_expired;
  rcl_ret_t ret;
  {
    std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
    ret = rcl_action_server_wait_set_get_entities_ready(
      &wait_set,
      pimpl_->action_server_.get(),
      &goal_request_ready,
      &cancel_request_ready,
      &result_request_ready,
      &goal_expired);
  }

  if (RCL_RET_OK != ret) {
    rclcpp::exceptions::throw_from_rcl_error(ret);
  }

  pimpl_->next_ready_event = std::numeric_limits<uint32_t>::max();

  if (goal_request_ready) {
    pimpl_->next_ready_event = static_cast<uint32_t>(EntityType::GoalService);
    return true;
  }

  if (cancel_request_ready) {
    pimpl_->next_ready_event = static_cast<uint32_t>(EntityType::CancelService);
    return true;
  }

  if (result_request_ready) {
    pimpl_->next_ready_event = static_cast<uint32_t>(EntityType::ResultService);
    return true;
  }

  if (goal_expired) {
    pimpl_->next_ready_event = static_cast<uint32_t>(EntityType::Expired);
    return true;
  }

  return false;
}

std::shared_ptr<void>
ServerBase::take_data()
{
  size_t next_ready_event = pimpl_->next_ready_event.exchange(std::numeric_limits<uint32_t>::max());

  if (next_ready_event == std::numeric_limits<uint32_t>::max()) {
    throw std::runtime_error("ServerBase::take_data() called but no data is ready");
  }

  return take_data_by_entity_id(next_ready_event);
}

std::shared_ptr<void>
ServerBase::take_data_by_entity_id(size_t id)
{
  std::shared_ptr<ServerBaseData> data_ptr;
  // Mark as ready the entity from which we want to take data
  switch (static_cast<EntityType>(id)) {
    case EntityType::GoalService:
      {
        rcl_ret_t ret;
        rcl_action_goal_info_t goal_info = rcl_action_get_zero_initialized_goal_info();
        rmw_request_id_t request_header;

        std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);

        std::shared_ptr<void> message = create_goal_request();
        ret = rcl_action_take_goal_request(
          pimpl_->action_server_.get(),
          &request_header,
          message.get());

        data_ptr = std::make_shared<ServerBaseData>(
          ServerBaseData::GoalRequestData(ret, goal_info, request_header, message));
      }
      break;
    case EntityType::ResultService:
      {
        rcl_ret_t ret;
        // Get the result request message
        rmw_request_id_t request_header;
        std::shared_ptr<void> result_request = create_result_request();
        std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
        ret = rcl_action_take_result_request(
          pimpl_->action_server_.get(), &request_header, result_request.get());

        data_ptr =
          std::make_shared<ServerBaseData>(
          ServerBaseData::ResultRequestData(ret, result_request, request_header));
      }
      break;
    case EntityType::CancelService:
      {
        rcl_ret_t ret;
        rmw_request_id_t request_header;

        // Initialize cancel request
        auto request = std::make_shared<action_msgs::srv::CancelGoal::Request>();

        std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
        ret = rcl_action_take_cancel_request(
          pimpl_->action_server_.get(),
          &request_header,
          request.get());

        data_ptr =
          std::make_shared<ServerBaseData>(
          ServerBaseData::CancelRequestData(ret, request, request_header));
      }
      break;
    case EntityType::Expired:
      {
        data_ptr =
          std::make_shared<ServerBaseData>(ServerBaseData::GoalExpiredData());
      }
      break;
  }

  return std::static_pointer_cast<void>(data_ptr);
}

---rclcpp_aciton/server.cppより引用

Timerに関しては、今まで紹介してきたTimerの実行のされ方とほぼ同じであるため解説は省略します。EventsExecutor用に専用スレッドを立てて実行する場合も、結局Timer managerの真似をするだけです。

以上がAction Serverの待機->実行までの流れです。内部に複数サービスを持っているせいでデータの扱いが少々面倒な事になっていましたが、基本的には普通のWaitableと似た流れで処理されます。

Action Client

Action ClientもServerと基本事項はほぼ変わりません。似た内容の箇所は簡潔に紹介します。 継承関係もほぼ同様です。

rclcpp::Waitable
      ↓
rclcpp_action::ClientBase
      ↓
rclcpp_action::Client

1つ違う点は、ユーザーから渡されるコールバックのタイミングです。Serverでは構築時に渡されていましたが、こちらはリクエストを送信する際に設定します。しかし、これはエグゼキューターによる実行にはあまり関わってこないので今回は省略します。

内部に持つエンティティは当然違います。こちらのimplは以下の通りです。

typedef struct rcl_action_client_impl_s
{
  rcl_client_t goal_client;
  rcl_client_t cancel_client;
  rcl_client_t result_client;
  rcl_subscription_t feedback_subscription;
  rcl_subscription_t status_subscription;
  rcl_action_client_options_t options;
  char * action_name;
  // Wait set records
  size_t wait_set_goal_client_index;
  size_t wait_set_cancel_client_index;
  size_t wait_set_result_client_index;
  size_t wait_set_feedback_subscription_index;
  size_t wait_set_status_subscription_index;
  rosidl_type_hash_t type_hash;
} rcl_action_client_impl_t;

---rcl_action/action_client_impl.hより引用

基本的にはServerと逆のエンティティを持ちます。

  • ゴールのリクエスト送信
    • Service Client
  • キャンセルのリクエスト送信
    • Service Client
  • 実行結果の受信
    • Service Client
  • 実行中のフィードバックの受信
    • Subscriber
  • ステータスの受信
    • Subscriber

ActionClientの構築

構築方法はほぼServerと同じです。ClientBaseのコンストラクタ内でrclのAPIを呼ぶことで構築します。rcl_action_client_init()を呼び、各エンティティを初期化します。

rcl_ret_t
rcl_action_client_init(
  rcl_action_client_t * action_client,
  rcl_node_t * node,
  const rosidl_action_type_support_t * type_support,
  const char * action_name,
  const rcl_action_client_options_t * options)
{
  // 省略
  // Initialize action service clients.
  CLIENT_INIT(goal);
  CLIENT_INIT(cancel);
  CLIENT_INIT(result);

  // Initialize action topic subscriptions.
  SUBSCRIPTION_INIT(feedback);
  SUBSCRIPTION_INIT(status);

  // 省略
}

---rclcpp/action_client.cより引用

こちらのマクロでも、内部でrcl_action/name.hにて宣言された関数によってサービス名を取得し、それとmsg型の情報を使って各ClientとSubscriptionを初期化しています。

ExecutorによるAction Clientの実行

Clientの場合もServerと同様にClientBase::add_to_wait_set()ClientBase::set_on_ready_callback()を利用することで待機機構へのエンティティの設定を行います。両者とも、内部的な処理がほぼ同一で少々関数名が違う程度であるため、解説は省略します。Serverの説明を読みながら、Clientのコードを読めば理解できるかと思われます。

実際の所、実行に関してもServerとほぼ変わりませんが、こちらはSubscriptonが2つあることで扱うエンティティの総数が増えているので少し見てみましょう。

void
ClientBase::execute(const std::shared_ptr<void> & data_in)
{
  if (!data_in) {
    throw std::invalid_argument("'data_in' is unexpectedly empty");
  }

  std::shared_ptr<ClientBaseData> data_ptr = std::static_pointer_cast<ClientBaseData>(data_in);

  std::visit(
    [&](auto && data) -> void {
      using T = std::decay_t<decltype(data)>;
      if constexpr (std::is_same_v<T, ClientBaseData::FeedbackReadyData>) {
        if (RCL_RET_OK == data.ret) {
          this->handle_feedback_message(data.feedback_message);
        } else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != data.ret) {
          rclcpp::exceptions::throw_from_rcl_error(data.ret, "error taking feedback");
        }
      }
      if constexpr (std::is_same_v<T, ClientBaseData::StatusReadyData>) {
        if (RCL_RET_OK == data.ret) {
          this->handle_status_message(data.status_message);
        } else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != data.ret) {
          rclcpp::exceptions::throw_from_rcl_error(data.ret, "error taking status");
        }
      }
      if constexpr (std::is_same_v<T, ClientBaseData::GoalResponseData>) {
        if (RCL_RET_OK == data.ret) {
          this->handle_goal_response(data.response_header, data.goal_response);
        } else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != data.ret) {
          rclcpp::exceptions::throw_from_rcl_error(data.ret, "error taking goal response");
        }
      }
      if constexpr (std::is_same_v<T, ClientBaseData::ResultResponseData>) {
        if (RCL_RET_OK == data.ret) {
          this->handle_result_response(data.response_header, data.result_response);
        } else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != data.ret) {
          rclcpp::exceptions::throw_from_rcl_error(data.ret, "error taking result response");
        }
      }
      if constexpr (std::is_same_v<T, ClientBaseData::CancelResponseData>) {
        if (RCL_RET_OK == data.ret) {
          this->handle_cancel_response(data.response_header, data.cancel_response);
        } else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != data.ret) {
          rclcpp::exceptions::throw_from_rcl_error(data.ret, "error taking cancel response");
        }
      }
    }, data_ptr->data);
}

---rclcpp_action/client.cppより引用

上2つのif constexpr文の処理はSubscriptionに紐づいたコールバックの実行です。この処理が増えているものの、全体的にはServerと同じです。

Single/MultiThreadedExecutorの処理の流れも見ていきましょう。処理の流れはほぼ同様ですので、コードのみ紹介します。

bool
ClientBase::is_ready(const rcl_wait_set_t & wait_set)
{
  bool is_feedback_ready{false};
  bool is_status_ready{false};
  bool is_goal_response_ready{false};
  bool is_cancel_response_ready{false};
  bool is_result_response_ready{false};

  rcl_ret_t ret;
  {
    std::lock_guard<std::recursive_mutex> lock(pimpl_->action_client_mutex_);
    ret = rcl_action_client_wait_set_get_entities_ready(
      &wait_set, pimpl_->client_handle.get(),
      &is_feedback_ready,
      &is_status_ready,
      &is_goal_response_ready,
      &is_cancel_response_ready,
      &is_result_response_ready);
    if (RCL_RET_OK != ret) {
      rclcpp::exceptions::throw_from_rcl_error(
        ret, "failed to check for any ready entities");
    }
  }

  pimpl_->next_ready_event = std::numeric_limits<size_t>::max();

  if (is_feedback_ready) {
    pimpl_->next_ready_event = static_cast<size_t>(EntityType::FeedbackSubscription);
    return true;
  }

  if (is_status_ready) {
    pimpl_->next_ready_event = static_cast<size_t>(EntityType::StatusSubscription);
    return true;
  }

  if (is_goal_response_ready) {
    pimpl_->next_ready_event = static_cast<size_t>(EntityType::GoalClient);
    return true;
  }

  if (is_result_response_ready) {
    pimpl_->next_ready_event = static_cast<size_t>(EntityType::ResultClient);
    return true;
  }

  if (is_cancel_response_ready) {
    pimpl_->next_ready_event = static_cast<size_t>(EntityType::CancelClient);
    return true;
  }

  return false;
}

std::shared_ptr<void>
ClientBase::take_data_by_entity_id(size_t id)
{
  std::shared_ptr<ClientBaseData> data_ptr;
  rcl_ret_t ret;

  // Mark as ready the entity from which we want to take data
  switch (static_cast<EntityType>(id)) {
    case EntityType::GoalClient:
      {
        rmw_request_id_t response_header;
        std::shared_ptr<void> goal_response;
        {
          std::lock_guard<std::recursive_mutex> lock(pimpl_->action_client_mutex_);

          goal_response = this->create_goal_response();
          ret = rcl_action_take_goal_response(
            pimpl_->client_handle.get(), &response_header, goal_response.get());
        }
        data_ptr = std::make_shared<ClientBaseData>(
          ClientBaseData::GoalResponseData(
            ret, response_header, goal_response));
      }
      break;
    case EntityType::ResultClient:
      {
        rmw_request_id_t response_header;
        std::shared_ptr<void> result_response;
        {
          std::lock_guard<std::recursive_mutex> lock(pimpl_->action_client_mutex_);
          result_response = this->create_result_response();
          ret = rcl_action_take_result_response(
            pimpl_->client_handle.get(), &response_header, result_response.get());
        }
        data_ptr =
          std::make_shared<ClientBaseData>(
          ClientBaseData::ResultResponseData(
            ret, response_header, result_response));
      }
      break;
    case EntityType::CancelClient:
      {
        rmw_request_id_t response_header;
        std::shared_ptr<void> cancel_response;
        {
          std::lock_guard<std::recursive_mutex> lock(pimpl_->action_client_mutex_);
          cancel_response = this->create_cancel_response();
          ret = rcl_action_take_cancel_response(
            pimpl_->client_handle.get(), &response_header, cancel_response.get());
        }
        data_ptr =
          std::make_shared<ClientBaseData>(
          ClientBaseData::CancelResponseData(
            ret, response_header, cancel_response));
      }
      break;
    case EntityType::FeedbackSubscription:
      {
        std::shared_ptr<void> feedback_message;
        {
          std::lock_guard<std::recursive_mutex> lock(pimpl_->action_client_mutex_);
          feedback_message = this->create_feedback_message();
          ret = rcl_action_take_feedback(
            pimpl_->client_handle.get(), feedback_message.get());
        }
        data_ptr =
          std::make_shared<ClientBaseData>(
          ClientBaseData::FeedbackReadyData(
            ret, feedback_message));
      }
      break;
    case EntityType::StatusSubscription:
      {
        std::shared_ptr<void> status_message;
        {
          std::lock_guard<std::recursive_mutex> lock(pimpl_->action_client_mutex_);
          status_message = this->create_status_message();
          ret = rcl_action_take_status(
            pimpl_->client_handle.get(), status_message.get());
        }
        data_ptr =
          std::make_shared<ClientBaseData>(
          ClientBaseData::StatusReadyData(
            ret, status_message));
      }
      break;
  }

  return std::static_pointer_cast<void>(data_ptr);
}

以上が、Action Clientによる待機->実行までの流れです。こちらはほぼServerと同じであったためかなり簡潔な説明となりました。

おわりに

今回は、今後重要になりそうなEventsExecutorと、ActionとExecutorの関係について紹介しました。普段使っているだけではほぼ意識することが無いActionの内部的な動作ですが、Executorでの扱われ方を通じて中身を見ると意外と理解できるかと思います。 またEventsExecutorに関しては、少しの書き換えだけでSingleThreadedExecutorから移行できますので、是非実際に利用してみて頂けると嬉しいです。 本連載は今回の記事で終了となります。本連載が、皆様のExecutorやそれに関連する機能への理解の助けになれば幸いです。