Asio (Boost.Asio) C++ライブラリ入門 4 - 完了トークン(CompletionToken)とasync_composeの利用法 -

執筆者:井上 叡
監修者:稲葉 貴昭・高橋 浩和

※ 「Asio (Boost.Asio) C++ライブラリ入門」連載記事一覧はこちら


はじめに

本記事では、非同期I/Oを扱うC++ライブラリであるAsio(またはBoost.Asio)ライブラリを紹介・解説します。Asio(またはBoost.Asio)ライブラリは、非同期I/Oを行う際に利用候補に挙がることがありますが、初めて使うと意外と仕組みが理解しづらく少し苦労したりします。そこで、本連載では、Asio(またはBoost.Asio)ライブラリが採用する非同期モデルやコア機能など、ライブラリの基礎となる部分を中心に解説していきます。解説の際にはなるべくサンプルコードを掲載し、具体例と共に理解していけたらと思います。最終的には公式リファレンスを読むことで、ある程度好きな機能を作れるようになることを目指します。

本記事で扱うライブラリのバージョンは、Boost 1.85とAsio1.30.2です。 C++20を標準としています。コンパイラはC++20に対応したものを利用してください。 本章以降で「Asio」と記載した際、その記述は「Boost.Asio」と「Asio」の両方について述べているものとします。また、Boost.Asioを利用している場合はasio::という名前空間はboost::asio::と読み替えて下さい。

4記事目となる本記事では、Asioライブラリを利用する上で重要なライブラリ機能、完了トークンとasync_composeについて紹介します。

完了トークン(Completion Tokens)と関連要素

本章では、完了トークン(Completion Tokens)とそれに関わりの深いいくつかの要素について紹介します。完了トークンはAsioに実装された非同期操作の開始関数(= async_○○)の最後の引数に設定されているものです。前記事までに登場したasync_acceptなどではこの引数にユーザー定義の完了ハンドラを渡していました。実は、完了トークンの役割はユーザーが直接完了ハンドラを設定するためだけにあるのではありません。

完了トークンを利用して、ユーザーは以下のように非同期操作の挙動を変更することができます。

  • 非同期操作にユーザー定義の完了ハンドラを設定する
  • 非同期エージェントに関連特性(エグゼキューター、アロケーター、キャンセルスロットなど)を設定する
  • ユーザー定義の完了ハンドラではなく、std::futurecoroutineなどのメカニズムを利用して操作結果を受け取るように設定する

非同期操作の開始関数と、他の要素との関わりを表した図を見てみましょう。

完了トークンと各要素の関わり

上図にて、開始関数から3本矢印が伸びているのが分かるでしょうか。例として以下のasio::ip::tcp::socket::async_read_some()のシグネチャ(※戻り値の型は省略)と図を比較しながら紹介していきます。

  template <typename MutableBufferSequence,
      ASIO_COMPLETION_TOKEN_FOR(void (asio::error_code,
        std::size_t)) ReadToken = default_completion_token_t<executor_type>>
  auto async_read_some(const MutableBufferSequence& buffers,
      ReadToken&& token = default_completion_token_t<executor_type>())

完了シグネチャ(Completion signature)

まず、一番左の矢印とその先、完了シグネチャ(Completion signature)から見ていきます。 完了シグネチャは、開始関数によって予め設定されています。これは、上記のコードだと以下の部分に当たります。

ASIO_COMPLETION_TOKEN_FOR(void (asio::error_code,std::size_t))

マクロの中の関数シグネチャの宣言のようなものを見てみると、void (asio::error_code,std::size_t)のようになっています。これは、ユーザーが完了ハンドラを自分で定義して結果を受け取る場合のシグネチャになります。このシグネチャを基本としてそれ以外の方法の結果型なども決定されます。

例えばstd::futureを使って結果を返すように設定した場合は、開始関数の戻り値がstd::future<size_t>、エラーが発生するとasio::error_codestd::system_errorになってfutureより伝播されるようになります。*1

完了トークン(Completion Tokens)

完了トークンは、開始関数から伸びた一番右の矢印です。上のasync_read_someの例では、ReadTokenという型になっている引数が完了トークンです。このシグネチャから分かるように、完了トークンは開始関数の引数として渡されることで設定されます。 前述したように、この関数の完了トークンとしてユーザー定義の完了ハンドラ以外にも色々なものを渡すことができます。以下に代表的な例を挙げます。

  1. 完了シグネチャによって示されたvoid (asio::error_code,std::size_t)の関数オブジェクトを渡す
  2. 以下のいずれかのようなオブジェクトを渡し、様々な方法で結果を受けとるように設定する
    1. asio::use_future std::futureで結果を受け取る
    2. asio::use_awaitable C++20のコルーチンを構成できるようになる。co_awaitで結果を待つなどできる。 (詳細)
    3. asio::yield_context Boost.Coroutineライブラリに基づいたスタックフルコルーチンを利用して実行する。C++20のコルーチンとは異なるので注意(詳細)
    4. asio::deffered 非同期操作を遅延実行するための関数オブジェクトを返す。この関数オブジェクトに完了トークンを渡すと対象の操作を実行できる
    5. asio::detached 非同期操作はデタッチされる。結果はユーザー側に返されない
    6. その他、完了トークンとして利用可能なオブジェクトを渡す
  3. 非同期エージェントに関連特性(エグゼキューター、アロケーター、キャンセルスロットなど)を適用するために、完了トークンに関連特性をバインドして渡す

上記はすべて開始関数の単一のオーバーロードによって提供されるため、ユーザーは完了トークンとして渡すものを変えるだけで色々な動作のカスタマイズが可能です。

これだけ沢山の動作が可能なのは素晴らしいですが、どうやって戻り値の形式を決めるのでしょうか? ここで図で中央にある矢印が指す、async_result traitが登場します。 async_read_someを例とすると、以下のように3つの要素をasync_result traitに渡す事で具体的な戻り値の型が決定されます。

  1. async_read_someの宣言に書かれた完了シグネチャ void(asio::error_code,std::size_t)  -
  2. async_read_someがもつ内部実装                                                   -
  3. ユーザーが実際に渡した具体的な完了トークン                                           -
                |
                | これらを渡す
                ↓
        async_result trait(実際にはヘルパ関数 async_initiate に渡す)
                |
                ↓
        戻り値の型が決定する

上記の例において3つの要素は、以下のようにasync_result traitに渡されます。 (※ 以下は説明用の実装です。実際の実装はこちら。)

template <
    completion_token_for<void(error_code, size_t)> 
      CompletionToken>
auto async_read_some(
    const mutable_buffer& b,CompletionToken&& token)
{
  auto init = // ~~ 非同期操作の実装 ~~ 
  return async_result< // ← これが async_result trait
      decay_t<CompletionToken>,
      void(error_code, size_t)
    >::initiate(
        init,   // 非同期操作を実際に行う関数オブジェクト(実装は省略)
        std::forward<CompletionToken>(token), //ユーザーから渡された完了トークン
        b       // 関数オブジェクト(init)に渡す引数
      );
}

ここでは、1.の完了シグネチャはasync_resultのテンプレート引数として、2.の内部実装はinitとして、3.のユーザーが渡した完了トークンはtokenとして渡されます。そして、async_resultinitiate関数によって戻されるオブジェクトによって、実際の戻り値の型が推論されます。ちなみに、ユーザーが完了トークンとして単にコールバックを渡した場合は、戻り値の型は常にvoidになります。

なお、実際の実装をご覧になった方は気付いたかもしれませんが、そちらでは直接async_resultに渡すのではなくヘルパ関数であるasync_initiate越しに引数を渡しています。これは実装上の都合です。

最後に完了トークンを使って非同期操作をカスタマイズする例をいくつか記載してこの章を終わります。

asio::use_future を渡す例

try
{
  std::future<size_t> f = socket.async_read_some(buffer, asio::use_future);
  size_t read_size = f.get();
}
catch(const std::exception& e)
{
  std::cerr << e.what();
}

asio::deferred を渡す例

auto ope = socket.async_read_some(buffer, asio::asio::deferred);
ope([](std::error_code ec, std::size_t size){ // (あまり意味は無いが)すぐに実行する
  if(ec){
    std::cerr << "Error: " << ec.message() << std::endl;
  }else{
    std::cout << "Read size: " << size << std::endl;
  }
});

asio::append を使って完了ハンドラに追加の引数を渡す例

socket.async_read_some(buffer, asio::append(
   [](std::error_code ec, std::size_t size,const int64_t tekitou_num){ // int型の引数を追加
     if(ec){
       std::cerr << "Error: " << ec.message() << std::endl;
     }else{
       std::cout << "Read size: " << size << std::endl;
       std::cout << "Tekitou Number : " << tekitou_num << std::endl; 
     }
   } 
  ,765'346'283'315));

async_compose

ここまで色々な要素を紹介してきましたが、前記事までの知識と前章で紹介した完了トークンを合わせればAsioの非同期操作はひとまず利用できるようになっているかと思います。 次に本章では、Asioに実装されているasync_read_someのような非同期操作を、自分で実装する際に便利なasync_compose(リファレンス)という関数について紹介していきます。

async_composeは、前述したように非同期操作をユーザーが実装する際のボイラープレートコードを減らすために提供されています。非同期操作は完了トークンの章で登場したasync_resultasync_initiateを直接利用しても実装できますが、ユーザーは基本的にこちらを利用する方が簡単です。

シグネチャは以下のようになっています。(※これは説明用に少々簡略化されたものですが、理解の上では問題ありません。実際のコードはこちら)

template<
    typename CompletionToken,    // 完了トークン
    typename Signature,          // 完了シグネチャ
    typename Implementation,     // 実装である関数オブジェクト
    typename... IoObjectsOrExecutors> // エグゼキューターやI/Oオブジェクト(ソケットなど)
DEDUCED async_compose(
    Implementation && implementation,
    CompletionToken & token,
    IoObjectsOrExecutors &&... io_objects_or_executors);

ユーザーが指定する必要があるテンプレートパラメータは、基本的に完了シグネチャを表す第二引数のSignatureだけです。関数の引数には、implementationに非同期操作の実装部分を関数オブジェクトとして渡します。tokenには完了トークンを渡しますが、これは大体の場合開始関数に渡された完了トークンをただそのまま渡すだけです。 最後に、io_objects_or_executorsにはエグゼキューターやソケット等のエグゼキューターを持つI/Oオブジェクトを渡します。 ちなみにここで戻り値の型に書かれているDEDUCEDは、Asioで説明用に使われているもので、文字通り戻り値が推論されるという意味です。実際のコードでは大体autoなどになっています。

以下に、async_composeを使って非同期操作を実装する例を示します。

#ifdef USE_BOOST_ASIO_LIB
#include "boost/asio.hpp"
using namespace boost;
using error_code_t = boost::system::error_code;
using except_t = boost::system::system_error;
#else
#include "asio.hpp"
using error_code_t = asio::error_code;
using except_t = std::system_error;
#endif // USE_BOOST_ASIO_LIB

#include <iostream>
#include <chrono>
#include <thread>
#include <vector>
#include <future>
#include <cxxabi.h>


// 1. 非同期操作の実装部分、当然lambda等で書くことも可能
struct async_heavy_output_impl
{
    using data_type = std::vector<uint8_t>;
    std::shared_ptr<data_type> data_ptr_;
    async_heavy_output_impl(data_type data)
        : data_ptr_(std::make_shared<data_type>(data)) {}

    template <typename Self>      // 2. ここのSelfは、asio::detail::composed_op<>が入る
    void operator()(Self& self)
    {
        auto op = [data_ptr = this->data_ptr_,slf = std::move(self)]() mutable
        {
            std::this_thread::sleep_for(std::chrono::seconds(2));
            // 3. ~~ ↑ ここで何らかの時間の掛かる送信処理をしているものとする ~~
            std::size_t n = data_ptr->size();
            error_code_t ec = {};
            slf.complete(ec, n); // 4. 結果と共にユーザーから渡されたハンドラを呼び出す。
        };
        std::cout << "Start operation" << std::endl;
        std::thread(std::move(op)).detach();
    }
};

// 5. async_composeを活用した非同期操作の開始関数
template <typename CompletionToken>
auto async_heavy_output(std::vector<uint8_t> data,
    CompletionToken&& token)
{
    return asio::async_compose<CompletionToken,void(error_code_t,std::size_t)>(
        async_heavy_output_impl {std::move(data)},
        token,
        asio::get_associated_executor(token));// 6. 完了トークンに関連付けられたエグゼキューターを取得
}

int main(int argc, char const *argv[])
{
    asio::io_context ioc;
    std::vector<uint8_t> data(1024,'a');
    auto guard = asio::make_work_guard(ioc);  // io_contextが自動終了しないための仕組み
    std::thread th([&ioc](){ioc.run();});

    std::cout << "(Normal) Start async_heavy_output" << std::endl;
    // 7. ユーザー定義のハンドラを完了トークンとして渡す
    async_heavy_output(data, asio::bind_executor(guard.get_executor(),// 8. 完了トークンに
        [](error_code_t e,size_t bytes_transferred){                  //    エグゼキューターを
            if(!e)                                                    //    バインド
            {
                std::cout << "(Normal) Transfer succeeded !!!\n"; 
                std::cout << "(Normal) Transferred bytes -> " 
                            << bytes_transferred << std::endl;
            }
            else
            {
                std::cout << "(Normal) Transfer failed..." << std::endl;
            }
        }));
    std::cout << "(use_future version) Start async_heavy_output " << std::endl;
    // 9. 完了トークンとしてuse_futureを渡すこともできる
    auto fut = async_heavy_output(data,  
        asio::bind_executor(
            guard.get_executor(),
            asio::use_future));  // 10. use_futureにもエグゼキューターをバインド
    auto n = fut.get();
    guard.reset();
    th.join();
    std::cout << "(use_future version) Transferred bytes -> " << n << std::endl;
    return 0;
}

上記は実行すると、以下のような出力が得られます。

(Normal) Start async_heavy_output
Start operation
(use_future version) Start async_heavy_output 
Start operation                                <---- (注釈)この下で約2秒待たされます
(Normal) Transfer succeeded !!!
(Normal) Transferred bytes -> 1024
(use_future version) Transferred bytes -> 1024

1.のコメントの位置にasync_composeimplementationに渡すための実装を関数オブジェクトとして書いています。コメントにもある通り、これは当然lambdaなどでも代用する事が可能です。引数として渡すオブジェクトの生存期間には気を付ける必要があります。

2.にあるSelfというテンプレートパラメータですが、ここにはcompose.hppに定義されているcomposed_opというテンプレートクラスが入ります。このクラスが持っているメソッドで関係してくるのは、おそらくvoid complete(Args... args)void operator()(T&&... t)executor_type get_executor()の3つです。void complete(Args... args)は完了シグネチャと同じシグネチャで呼び出します。これは非同期操作が完了した時に利用するもので、呼び出すとその引数が直接完了ハンドラに渡されます。今回はスレッドの中(4.の位置)で呼び出して完了を通知しています。 void operator()(T&&... t)async_composeに渡したimplementationを呼び出します。今回の場合は、async_heavy_output_impl::operator ()を呼び出すのに利用され、シグネチャから分かるようにこちらも引数は完全転送されます。このoperatorは今回の実装では利用していませんが、再帰的にimplementationを呼び出すのに利用できます。なので、関数オブジェクトに状態を持たせてcomposed_opを新たな非同期関数に完了トークンとして渡す連鎖をしたりすると、状態遷移を伴う非同期操作が可能になります。このような例は、公式のサンプルにあります。最後に、executor_type get_executor()ですが、これは単にcomposed_opに結びつけられたエグゼキューターを取得するものです。このコードでは利用されていませんが、今回の3.の位置などで完了ハンドラを他のエグゼキューターで実行するように設定したい場合などに利用する事ができます。この後示すコードにそのような利用例があります。

5.は実際の非同期関数の実装です。ここでは単にasync_composeをreturnしているだけとなります。ここで気を付ける必要がある点は、6.です。利用するエグゼキューターを知るために、ユーザーに完了トークンにバインドしてもらったエグゼキューターを取得してasync_composeに渡す必要があります。(async_composeio_objects_or_executorsにちゃんと渡す必要があるということです)完了トークンは単にasync_composeに完全転送すればよいだけです。

作成した非同期操作の開始関数の利用方法はAsioに実装されている開始関数群とほぼ同じです。唯一違うのは、8.のように忘れずに完了トークンにエグゼキューターをバインドすることです。完了トークンの章で紹介したカスタマイズの方法は全て利用できるはずです。例として、9.ではasio::use_futureを使ってstd::futureで結果を受け取っています。

上記では簡単に実装するために、threadを使って非同期操作を実行していましたが、毎回スレッドを立ち上げていては効率が良くありません。そこで、次にスレッドプールを使って、非同期操作をワーカースレッドで動かすように実装してみます。

// ~~~~~ 前回と差分が無いので省略 ~~~~~

// 1. 非同期操作をエグゼキューターにpostするようにしたバージョン。
// これにエグゼキューターとしてthread_poolを渡せば、ワーカースレッドで処理される。
struct async_heavy_output_thread_pool_impl
{
   // ~~~~~ 前回と差分が無いので省略 ~~~~~
    template <typename Self>
    void operator()(Self& self)
    {
        auto op = [data_ptr = this->data_ptr_,slf = std::move(self)]() mutable
        {
            // ~~~~~ 前回と差分が無いので省略 ~~~~~
        };
        std::cout << "Start operation" << std::endl;
        asio::post(self.get_executor(),std::move(op)); // 2. エグゼキューターにpostする
    }
};

// async_composeを活用した非同期操作の開始関数
template <typename CompletionToken>
auto async_heavy_output_thread_pool(std::vector<uint8_t> data,
    CompletionToken&& token)
{ /* ~~~~~ 前回と差分が無いので省略 ~~~~~ */}

int main(int argc, char const *argv[])
{
    asio::thread_pool pool(2);  // 3. 2つのワーカースレッドを持つスレッドプール
    std::vector<uint8_t> data(1024,'a');

    std::cout << "(Normal) Start async_heavy_output_thread_pool" << std::endl;
    // 4. 10回呼び出す
    for (int i = 0; i < 10; i++)
    {
        async_heavy_output_thread_pool(data, asio::bind_executor(guard.get_executor(),
        [](error_code_t e,size_t bytes_transferred){
            if(!e)
            {
                std::cout << "(Normal) Transfer succeeded !!!\n"; 
                std::cout << "(Normal) Transferred bytes -> " 
                            << bytes_transferred << std::endl;
            }
            else
            {
                std::cout << "(Normal) Transfer failed..." << std::endl;
            }
        }));
        std::cout << "(use_future version) Start async_heavy_output_thread_pool " << std::endl;
    }
    pool.join(); // thread_pool::joinでタスクの終了を待つ
    std::cout << "(use_future version) Transferred bytes -> " << n << std::endl;
    return 0;
}

上記のコードを実行すると、以下のような出力が得られます。

(Normal) Start async_heavy_output_thread_pool
Start operation
(use_future version) Start async_heavy_output_thread_pool 
Start operation
(use_future version) Start async_heavy_output_thread_pool 
Start operation
(use_future version) Start async_heavy_output_thread_pool 
Start operation
(use_future version) Start async_heavy_output_thread_pool 
Start operation
(use_future version) Start async_heavy_output_thread_pool 
Start operation
(use_future version) Start async_heavy_output_thread_pool 
Start operation
(use_future version) Start async_heavy_output_thread_pool 
Start operation
(use_future version) Start async_heavy_output_thread_pool 
Start operation
(use_future version) Start async_heavy_output_thread_pool 
Start operation
(use_future version) Start async_heavy_output_thread_pool 
Start operation                                <----- (注釈)ここまで一気に出てくる
(Normal) Transfer succeeded !!!
(Normal) Transferred bytes -> 1024
(Normal) Transfer succeeded !!!
(Normal) Transferred bytes -> 1024             <----- (注釈)2つ毎に2秒待つ
(Normal) Transfer succeeded !!!
(Normal) Transferred bytes -> 1024
(Normal) Transfer succeeded !!!
(Normal) Transferred bytes -> 1024
(Normal) Transfer succeeded !!!
(Normal) Transferred bytes -> 1024
(Normal) Transfer succeeded !!!
(Normal) Transferred bytes -> 1024
(Normal) Transfer succeeded !!!
(Normal) Transferred bytes -> 1024
(Normal) Transfer succeeded !!!
(Normal) Transferred bytes -> 1024
(Normal) Transfer succeeded !!!
(Normal) Transferred bytes -> 1024
(Normal) Transfer succeeded !!!
(Normal) Transferred bytes -> 1024
(use_future version) Transferred bytes -> 1024

出力中の注釈にもある通り、最初に10回開始した非同期操作が、2秒毎に2つずつ終了しています。これは、2つのワーカースレッドで非同期操作を処理している例です。

コードの差分は多くありません。1.の関数内の、2.にて前回はスレッドを起動していた所を、composed_op<>::get_executorを使って取り出したエグゼキューターに対してasio::postを使って非同期操作を実行するように要請するように変更しています。このasio::postは、エグゼキューターに対して関数オブジェクトを渡して、それを実行してもらうための関数です。今回は、エグゼキューターがスレッドプールであるため、スレッドプールのワーカースレッドで非同期操作が実行されます。

スレッドプールは、3.で宣言しています。asio::thread_poolはコンストラクタに数字を渡す事でワーカースレッドの数を指定できます。そのあと4.にて開始関数を10回呼んでいますが、2つのワーカースレッドで処理できている事が出力をみると分かります。

(余談) async_initiateとの比較

(※本稿は余談的な内容です。読まなくても問題ありません。)

async_composeを利用すると実装が楽だと前述しましたが、実際の所はasync_initiateを利用するパターンでも劇的に面倒という訳ではありません。公式のサンプルにはasync_initiateを使ったものもありますが、面倒な点はせいぜい型の宣言といった所でしょうか。これはC++20のautoのお陰で、非常に複雑な戻り値の型を単にautoで推論させられるようになったことがあるようです。なので、状況によってはasync_initiateを利用することも検討してみても良いかもしれません。

公式のサンプルの少々宣言が面倒な例

template <
  asio::completion_token_for<void(std::error_code)> CompletionToken>
auto async_write_message(tcp::socket& socket,
    const char* message, CompletionToken&& token)

ip::tcp::socket::async_read_someの宣言を見ると、後方互換性のためか煩雑な宣言になっています。C++20を使えない環境であればasync_composeの方が楽なのかもしれません。

  template <typename MutableBufferSequence,
      ASIO_COMPLETION_TOKEN_FOR(void (asio::error_code,
        std::size_t)) ReadToken = default_completion_token_t<executor_type>>
  auto async_read_some(const MutableBufferSequence& buffers,
      ReadToken&& token = default_completion_token_t<executor_type>())
    -> decltype(
      async_initiate<ReadToken,
        void (asio::error_code, std::size_t)>(
          declval<initiate_async_receive>(), token,
          buffers, socket_base::message_flags(0)))      // <-- C++20 autoが使えない状況を加味?
  {
    return async_initiate<ReadToken,
      void (asio::error_code, std::size_t)>(
        initiate_async_receive(this), token,
        buffers, socket_base::message_flags(0));
  }

おわりに

今回は、完了トークンとasync_composeについて紹介しました。async_composeを使えば、Asio以外の通信ライブラリなどをAsioと組み合わせて簡単に非同期操作にすることができます。MQTTのライブラリである、async_mqttはまさにそのような事をやっており、MQTTの通信機能を非同期操作として提供しています。Asioの利用方法としても参考になるのでご覧になってはいかがでしょうか。また本記事までの内容で、恐らくAsioの基本機能はほ多くが利用可能になったのではないかと思います。次回は最終回で、知識の紹介を中心としていたこれまでとは異なり、Asioの便利な関数等の紹介や、サンプルコード掲載など、実践的な内容とする予定です。

参考文献

*1:boost版の場合は、boost::system::system_errorになります。