Asio (Boost.Asio) C++ライブラリ入門 3 -コアとなる概念-

執筆者:井上 叡
監修者:稲葉 貴昭・高橋 浩和

※ 「Asio (Boost.Asio) C++ライブラリ入門」連載記事一覧はこちら


はじめに

本記事では、非同期I/Oを扱うC++ライブラリであるAsio(またはBoost.Asio)ライブラリを紹介・解説します。Asio(またはBoost.Asio)ライブラリは、非同期I/Oを行う際に利用候補に挙がることがありますが、初めて使うと意外と仕組みが理解しづらく少し苦労したりします。そこで、本連載では、Asio(またはBoost.Asio)ライブラリが採用する非同期モデルやコア機能など、ライブラリの基礎となる部分を中心に解説していきます。解説の際にはなるべくサンプルコードを掲載し、具体例と共に理解していけたらと思います。

本記事で扱うライブラリのバージョンは、Boost 1.85とAsio1.30.2です。
本章以降で「Asio」と記載した際、その記述は「Boost.Asio」と「Asio」の両方について述べているものとします。

3記事目となる本記事では、Asioライブラリのコアとなる概念やいくつかの用語について紹介していきます。

Asio(またはBoost.Asio)における非同期モデルの設計

ここからは、Asioにおける非同期処理を実現するためのライブラリの設計について、より詳しく紹介していきます。前回までは、Asioの設計の大枠としてのProactor・Reactorの話でしたが、今回はどちらかというと、具体的な内部設計としての細かい話について紹介します。あくまで、Proactorモデルが設計の中心にあるという前提で、より詳しい内部設計の話として認識してください。

Asioにおける非同期操作(Asynchronous operation)

Asioでは「非同期操作(Asynchronous operation)」は、バックグラウンドで起動されて実行される作業を表します。そして、その作業を開始したユーザーは他の作業を続ける事ができます。また、非同期操作は「開始関数(Initiating function)」と「完了ハンドラ(Completion handler)」の2つからなるものともしています。

開始関数と完了ハンドラについては以下の通りです。

  • 開始関数 (Initiating function)
    • ユーザーによって呼ばれる、非同期操作を開始するための関数
  • 完了ハンドラ (Completion handler)
    • ユーザー定義のムーブオンリーオブジェクトで、非同期操作の結果と共に最大1回呼び出される。完了ハンドラの呼び出しは、非同期で行われる何らかの操作が完了してその操作の副作用(accept()やバッファへの書き込み等)が確立された事をユーザーに伝える。

非同期操作と開始関数・完了ハンドラの関係を図示すると以下のようになります。

非同期操作

図からも分かるように、非同期操作は開始関数によって始められ、操作が終わると完了ハンドラを呼び出すものとされています。開始関数はasync_accept等の関数、完了ハンドラはそれらの関数に設定されるlambda等が具体例に該当します。(※上記はProactorに出てくる各操作を細かく名付けしたイメージです)

また、非同期操作のライフサイクルを図示すると、以下のようになります。

非同期操作のライフサイクル

非同期操作が対象とする操作、例えば開始関数がasync_accept()の場合は、イベント1と2の間にaccept()が実行されます。。その後完了ハンドラはイベント3で実行されます。また、「非同期操作」はイベント1から3全てを含んだものを指します。

Asioにおける非同期操作の特徴

Asioの非同期操作の重要な特徴について2つ紹介します。

  1. 非同期操作が一時的なリソース(メモリ、ファイルディスクリプタ等)を必要とする場合、それらのリソースは完了ハンドラが呼ばれる前に解放される。
  2. 非同期操作がジェネリックな場合、完了ハンドラの引数の型と順序は、開始関数とその引数によって決定論的に導かれる。

1つめは重要で、完了ハンドラが内部で開始関数を呼ぶことで連鎖的に非同期操作を行うような場合に、リソースの重複等を気にする必要が無くなります。これによってユーザーは連鎖的に受信を続ける処理などを簡単に書く事が出来ます。

非同期エージェント(Asynchronous agent)

次は非同期エージェント(Asynchronous agent)について紹介します。この言葉は、Asioが扱う一連の「非同期操作」とそれが持つ特性などを総合して表すための言葉です。 先ほど紹介した「非同期操作」とは違い、ライブラリが持つ具体的な一連の操作などを指すわけではありません。(※ 例えば、ライブラリが「非同期エージェント」という操作や機能を持っているのではありません)

非同期エージェント

中層の、「非同期操作」と「関連特性(次章で解説)」はAsioが持つ概念的な機能です。
下層の、「開始関数」、「完了ハンドラ」、「エグゼキューター」、「アロケーター」、「キャンセルスロット」は、実際にAsioにその名前のクラスや関数等が存在する機能です。

この概念においては、いくつかの連鎖した非同期操作をまとめて1つの非同期エージェントとして扱われる事があります。例えば、TCP通信においてasync_accept()の完了ハンドラから、別の開始関数であるasync_read()が呼ばれるような連鎖的な処理の場合、それらをまとめて1つの非同期エージェントとして扱ったりします。ここでは、2つの非同期操作を含む非同期エージェントとなります。

非同期操作が連鎖する様子

また、一連の連鎖する操作の内容によっては、いくつかのエージェントが親エージェント子エージェントという関係になることもあります。

例えば、親エージェントが一連のTCPのデータ受信処理、子エージェントが受け取ったデータの非同期の処理だとします。以下の図のように、まず親エージェントがaccept->readを連鎖的に行います。そして、親エージェントのreadの完了ハンドラから、受け取ったデータの非同期処理を開始する一連の子エージェントの開始関数を呼ぶ、と言ったイメージです。また、子エージェントの連鎖の最後となる完了ハンドラから親エージェントのacceptを呼ぶようにする事で、データを受信して処理するという流れを繰り返す事ができます。

親エージェントと子エージェントの例

非同期操作の連鎖のサンプルコード

非同期操作を連鎖させるには、1つめの非同期操作の完了ハンドラ内で次の非同期操作の開始関数を呼べばよいです。 以下は、非同期操作を連鎖させるコード断片です。

void start_async_task()
{
    // ~~~ 何らかの非同期処理をする ~~~
    asio::post(io_cotext,start_async_task_handler); //ハンドラを投稿
}

void start_async_task_handler(asio::error_code err)
{
    if(!err)
    {
        acceptor.start_accept(socket,async_accept_handler);    //acceptorでaccept開始。次の非同期操作を始める。
    }
} 

void async_accept_handler(asio::error_code err)
{
    if(!err)
    {
        socket.async_receive(     //acceptが成功したので非同期receiveの開始関数を呼ぶ。
            asio::buffer(asio::buffer(buff)),
            [this](const error_code_t &error, const size_t bytes_transferred)
            {
                async_receive_handler(error, bytes_transferred);
            });
    }
}

void async_receive_handler(asio::error_code err,size_t bytes_transferred)
{
    if(!err)
    {
        start_async_task();     //一番最初の開始関数を呼び出し、最初に戻る。
    }
}

かなり省かれていますが、重要なのはハンドラの中での動きです。一番最初にstart_async_task()という開始関数を呼び、その完了ハンドラでstart_accept()を呼びます。 その後も完了ハンドラの中で次の開始関数を呼び、最後のasync_receive_handler()では、一番最初の開始関数start_async_task()を呼んでいます。 これによって、この非同期エージェントは連鎖的に一連の処理をし続けます。このように非同期エージェントの中で非同期操作の連鎖を行うのは、一般的な実装方法です。この方法だと、receiveを行う前にコネクションが確立されているかを確認をする必要がないなどの利点があります。

※ この書き方の場合はどこかで不具合があると処理が終了するので、実際にはif(!err)に該当しない場合の処理も書く必要があります。

関連特性(Associated Characteristics)

非同期エージェントは、それに含まれる非同期操作がどのようにふるまうべきかを指定する、関連特性(Associated Characteristics)を持っています。関連特性としては具体的に以下の3つがあります。

  • アロケーター(Allocator)
    • エージェントが持つ非同期操作がどうやってメモリリソースを獲得するか決める
  • キャンセルスロット(Cancellation slot)
    • エージェントが持つ非同期操作がどうやって操作のキャンセルをサポートするか決める
  • エグゼキューター(Executor)
    • エージェントが持つ完了ハンドラがどうやってキューイングされ、実行されるか決める

非同期エージェントが持つ非同期操作は、実行の際にこれらの関連特性に従うようになっています。

エグゼキューター(Executor)

全ての非同期エージェントは、関連付けられたエグゼキューター(Executor)を持ちます。エグゼキューターは、エージェントが持つ完了ハンドラがどうやってキューイングされて、どうやって実行されるかを決定します。これは、スケジューラーに近いような役割を果たし、エージェントの実行方法、場所、タイミングに関するポリシーを表します。例えば、前回までのサンプルコードに出てきたasio::io_context(リファレンス)というクラスはエグゼキューターの一例です。

非同期操作は、エージェントが持つエグゼキューターを利用して以下のことを行います。

  • 非同期操作が行う何らかの作業(※acceptなど)が未完了の間、その作業がまだ存在しているか追跡し続ける
  • 操作完了時に呼び出すために、完了ハンドラをキューに登録する
  • 完了ハンドラがリエントラントに実行されると意図しない再帰等の問題が発生する場合、完了ハンドラがリエントラントに実行されないように保証する

エグゼキューターは用途によって様々な動作をするものが考えられますが、一例としては、以下のような役割をするものが考えられます。

  • 何らかの共有資源の保護のため、完了ハンドラが同時に2つ以上実行されないようにする*1
  • 全ての完了ハンドラを必ずGUIアプリケーションスレッドで実行されるようにキューイングする
  • エージェントがNICなどに近い指定された実行リソース(CPUコアなど)上で実行されるようにする
  • 非同期エージェントと完了ハンドラの実行優先度を指定する
  • 完了ハンドラの実行前後などに、ロギングなどの処理を実行するようにする

また、asio::io_context(リファレンス)は以下のような特性を持っています。

  • 完了ハンドラはio_context::run()に相当する*2メソッド内からのみ呼び出される。つまり、run()を1つのスレッドからのみ呼び出せば、ハンドラは1つずつ順番に実行される。逆に複数スレッドから呼び出せば複数のスレッドでいくつかのハンドラが同時に実行される可能性がある。
  • ハンドラから投げられた例外は、io_context::run()に相当するメソッドを実行しているスレッドに伝播される。

asio::io_contextとは違うエグゼキューターの例としてはasio::thread_poolがあり、こちらはエージェントをスレッドプールの中で実行することができます。

エグゼキューターのサンプルコード

以下はasio::io_contextの動作サンプルです。

// #define USE_BOOST_ASIO_LIB //Boost版を利用する場合はコメント解除orコンパイル時にマクロ定義

#ifdef USE_BOOST_ASIO_LIB
#include "boost/asio.hpp"
using namespace boost;
using error_code_t = boost::system::error_code;
#else
#include "asio.hpp"
using error_code_t = asio::error_code;
#endif // USE_BOOST_ASIO_LIB

#include <iostream>
#include <future>

int main(int argc, const char **argv)
{
    asio::io_context ctx;

    auto first_handler = []()
    {
        auto th_id = std::this_thread::get_id();
        std::cout << "Call first handler from => " << th_id << std::endl; 
    };
    asio::post(ctx,first_handler);              // first_handlerをio_contextの実行キューに追加
    ctx.run();

    auto second_handler = []()
    {
        auto th_id = std::this_thread::get_id();
        std::cout << "Call second handler from => " << th_id << std::endl;
    };
    asio::post(ctx,second_handler);             // second_handlerをio_contextの実行キューに追加
    auto fut = std::async(std::launch::async,[&ctx](){
        ctx.restart();
        ctx.run();
    });
    fut.get();
    return 0;
}

asio::postは、対象のハンドラをio_contextの実行キューに登録する関数です。 先ほど紹介したように、キューに登録されたハンドラを実行するのはio_context::run()です。 上記を実行すると、例として以下のような出力が出てきます。

Call first handler from => 29156
Call second handler from => 29392

2つのスレッドIDが違うことが分かります。これは、io_contextのrun()を呼んだスレッドでハンドラを実行するという特性を示しています。

キャンセルスロット(Cancellation slot)

Asioでは、ソケットやタイマーなどの多くのオブジェクトが、デフォルトでclose()またはcancel()メンバー関数を介して未処理の非同期操作全体のキャンセルができます。例えば、accpetとそれに続くreadなどの一連の操作を行うエージェントにおいて、相手からのconnect要求を待っている間にキャンセルすると、その"待ち"と続きのaccept()やread()などの全てをキャンセルできます。ただし、特定の非同期操作では、個別の対象を絞ったキャンセルも可能です。この個別の操作ごとのキャンセルは、非同期エージェントのキャンセルスロット(Cancellation slot)を使うことで実現できます。

キャンセルスロットは、キャンセルシグナルに接続され、シグナルを受け取った際には関数オブジェクトであるキャンセルハンドラを呼び出します。このハンドラは事前にエージェントのスロットに設定しておきます。キャンセルスロットは1つのエージェントが1つ持ちます。また、スロットは最大1つのハンドラしか同時に保持できません。新しいハンドラを設定すると以前のものは上書きされます。つまり、1つのエージェントが複数の非同期操作を有している時は、同じスロットが後続の非同期操作でも再利用されます。

キャンセルは、エージェントが複数の子エージェントを持っている場合に便利です。たとえば、1つの子エージェントが完了して、他方の副作用が不要になった場合にキャンセルするなどです。

キャンセルのサンプルコード

以下は、全体のキャンセルの例です。

// #define USE_BOOST_ASIO_LIB //Boost版を利用する場合はコメント解除orコンパイル時にマクロ定義

#ifdef USE_BOOST_ASIO_LIB
#include "boost/asio.hpp"
using namespace boost;
using error_code_t = boost::system::error_code;
#else
#include "asio.hpp"
using error_code_t = asio::error_code;
#endif // USE_BOOST_ASIO_LIB

#include <iostream>
#include <future>

using asio::ip::tcp;

// acceptが完了したら行う別の非同期操作の開始関数(という設定)
void async_another_task()
{
    std::cout << "Do another asynchronous task..." << std::endl;
    // 何らかの非同期操作を開始...。
}

// accept完了ハンドラ
void accept_handler(const error_code_t &error)
{
    if (!error)
    {
        std::cout << "Accept and connection established !!!" << std::endl;
        async_another_task(); // 他の非同期操作を連鎖的に開始する。
    }
    else
    {
        std::cout << "Failed to establish connection..." << std::endl;
    }
}

int main(int argc, const char **argv)
{
    const int port = 7650;
    asio::io_context io_context;
    asio::steady_timer steady_timer(io_context);
    steady_timer.expires_after(std::chrono::seconds(3));  // 3秒後に切れるタイマーを設定
    tcp::acceptor acceptor(io_context, tcp::endpoint(tcp::v4(), port));
    tcp::socket socket(io_context);
    std::cout << "Start async_accept" << std::endl;
    acceptor.async_accept(                      // 7650番で待機するが、相手はいない。
        socket,                                 // このままだと無限に待つ。
        [](const error_code_t &error)
        {
            accept_handler(error);
        });

    auto guard = asio::make_work_guard(io_context);
    steady_timer.async_wait([&guard,&acceptor](const auto& err){ 
                    acceptor.cancel();          // タイマーが切れたらasync_acceptをキャンセルする
                    guard.reset();});
    auto th = std::async(std::launch::async, [&io_context](){ io_context.run(); });
    th.get();
    return 0;
}

ここでは、正常に進めばasync_acceptがacceptを開始し、それが成功したら連鎖的にasync_another_taskという別の非同期操作の開始関数を呼びます。 しかし、この状況では相手が存在しないため、async_acceptで停止します。そのため、3秒後に発動するタイマーを設定し、そのタイマーのハンドラでacceptorcancel()を呼んでいます。 実行すると、以下のように出力があります。

Start async_accept
Failed to establish connection...

1行目と2行目の間には3秒の待ち時間があります。ここでは、async_acceptをキャンセルする事で、続く開始関数であるasync_another_taskは実行されなくなります。 そのため、このタイミングでのacceptorのキャンセルは非同期エージェントの一連の非同期操作を全てキャンセルすることになります。

アロケーター(Allocator)

全ての非同期エージェントは、関連付けられたアロケータ(Allocator)を持ちます。このアロケータは、エージェントが持つ各非同期操作が操作毎にメモリリソースを取得するために利用されます。ここで取得したリソースは、各操作の存続期間中のみ保持され、操作が終了する時(= 完了ハンドラが呼び出される直前)には解放されるため、操作中はその場所でメモリが確実に使用可能です。

このアロケータはC++標準ライブラリのコンテナ類と同じように、特に指定しなければデフォルトのものが設定されます。当然、ユーザーがカスタマイズしたい場合にはユーザー定義のものを設定する事も可能です。

アロケーターのサンプルコード

例として、asio::ip::tcp::socket::async_read_some()に対してアロケーターは以下のように渡すことができます。

socket_.async_read_some(asio::buffer(data_),
                        asio::bind_allocator(
                            custom_allocator<int>(handler_memory_),
                            [this, self](std::error_code ec, std::size_t length)
                            {
                                if (!ec)
                                {
                                    do_write(length);
                                }
                            }));

ここで、2引数目の今まで完了ハンドラとしてlambda等を渡していた場所で、asio::bind_allocatorにユーザーのカスタムアロケーターと完了ハンドラであるlambdaを渡しています。

asio::ip::tcp::socket::async_read_some()のシグネチャを見てみると、以下のようになっています。 (※戻り値の型は省略)

  template <typename MutableBufferSequence,
      ASIO_COMPLETION_TOKEN_FOR(void (asio::error_code,
        std::size_t)) ReadToken = default_completion_token_t<executor_type>>
  auto async_read_some(const MutableBufferSequence& buffers,
      ReadToken&& token = default_completion_token_t<executor_type>())

2引数目がRreadToken = default_completion_token_t<executor_type>という型のユニバーサル参照になっています。これは、AsioではCompletionTokenと呼ばれる型のことです。詳細は次記事以降で紹介しますが、今までlambdaや関数の形で渡してきた完了ハンドラは、実はCompletionTokenと呼ばれる型として扱われており、かなり自由な形式で受け入れられます。例えば、asio::use_futureというオブジェクトをCompletionTokenとして渡すと、完了ハンドラの呼び出しの代わりに結果をstd::futureでセットしてくれます。このasio::bind_allocatorという型は、CompletionTokenにアロケーターを紐づける事ができます。 また、似たような役割を果たす型としてasio::bind_cancellation_slotasio::bind_executorという型もあります。

ハンドラのトラッキング

本記事ではここまで様々なAsioの用語や概念について紹介してきましたが、ここでデバッグやプログラムの動作の理解に役立つ機能を紹介します。それは、ハンドラのトラッキング機能です。この機能は、コンパイル時にASIO_ENABLE_HANDLER_TRACKINGというマクロを設定する事で有効になります。このトラッキング機能は、デバッグに役立つのは勿論、理解しにくい非同期操作の流れを把握するのにも役立ちます。ぜひ利用してみてください。

例として、前回の記事に出てきたTCPのサンプルコードをこのマクロを設定して実行してみようと思います。

以下が実行時の出力です。

@asio|1741860086.877100|0*1|socket@0x7ffc5f834c50.async_accept
@asio|1741860086.877149|.1|non_blocking_accept,ec=asio.system:11
@asio|1741860086.877168|0*2|socket@0x7ffc5f834bd0.async_connect
@asio|1741860086.877252|0*3|signal_set@0x7ffc5f834b68.async_wait
@asio|1741860086.877340|.1|non_blocking_accept,ec=asio.system:0
@asio|1741860086.877372|>1|ec=system:0
Accept and connection established !!!
@asio|1741860086.877415|1*4|socket@0x7ffc5f834ca8.async_receive
@asio|1741860086.877420|.4|non_blocking_recv,ec=asio.system:11,bytes_transferred=0
@asio|1741860086.877422|<1|
@asio|1741860086.877425|.2|non_blocking_connect,ec=system:0
@asio|1741860086.877427|>2|ec=system:0
Connect succeeded!!!
Sending data...
@asio|1741860086.877448|2*5|socket@0x7ffc5f834bd0.async_send
@asio|1741860086.877464|.5|non_blocking_send,ec=system:0,bytes_transferred=10
@asio|1741860086.877476|<2|
@asio|1741860086.877479|>5|ec=system:0,bytes_transferred=10
Send succeeded!!! Send 10bytes
@asio|1741860086.877516|<5|
@asio|1741860086.877520|.4|non_blocking_recv,ec=asio.system:0,bytes_transferred=10
@asio|1741860086.877522|>4|ec=asio.system:0,bytes_transferred=10
Receive succeeded!!! Read 10 bytes.
Recv data ===> Data dayo.
@asio|1741860086.877537|<4|
^C@asio|1741860110.665237|>3|ec=system:0,signal_number=2
@asio|1741860110.665259|<3|
@asio|1741860110.665321|0|socket@0x7ffc5f834bd0.close
@asio|1741860110.665368|0|socket@0x7ffc5f834ca8.close
@asio|1741860110.665418|0|socket@0x7ffc5f834c50.close
@asio|1741860110.665436|0|signal_set@0x7ffc5f834b68.cancel

リファレンスに詳細がありますが、以下で簡単に紹介します。

フォーマットは以下です。

<tag>|<timestamp>|<action>|<description>

<action>の所の数字は以下を意味します。

  • >n
    • プログラムがハンドラ番号nに入った。<description>はハンドラの引数を示す。
  • <n
    • プログラムはハンドラ番号nから出た。
  • !n
    • プログラムは例外によってハンドラ番号nから出た。
  • ~n
    • ハンドラ番号nが呼び出されずに破棄された。これは通常、io_contextが破棄された際に残っている未完了の非同期操作が該当する。
  • n^m
    • ハンドラ番号nは、完了ハンドラ番号mで新しい非同期操作を作成しようとしている。<description>には、プログラムのどこで非同期操作が開始されるかを特定するのに役立つソース位置情報が含まれる。
  • n*m
    • ハンドラ番号nは、完了ハンドラ番号mの新しい非同期オペレーションを作成した。<description>は、どのような非同期オペレーションが開始されたかを示している。
  • n
    • ハンドラ番号nは他の処理を実行した。<description>は、どの関数が呼ばれたかを示す。現状では、close() と cancel() の操作だけが該当する。理由はこの2つの関数は、未完了のハンドラを破棄したりする副作用を起こす可能性があるから。
  • .n
    • ライブラリが、ハンドラ番号nが完了ハンドラである非同期操作の一部として、システムコールを発行した。<description>は、呼び出された関数とその結果を示す。これらの追跡イベントは、リアクターベースの実装を使用している場合にのみ発行される。

また、トラッキングの出力はasio付属のhandlerviz.plというツールを使うとgraphvizの形式として出力できます。上記出力をgraphvizで可視化した結果は以下の通りです。

graphvizで可視化した出力

また、ハンドラトラッキングではデバッグのために設定できるマクロが他にも色々あります。リファレンスをご覧ください。

おわりに

今回紹介した概念は、ライブラリのリファレンスを読んでいて頻出する概念です。今回までの内容を知ってから公式のリファレンスを読むと、理解できる部分が増えているかと思います。また、今回紹介したハンドラトラッキングを有効にして、公式が提供する様々なサンプルコードを実行してみると理解に役立ちます。 次回は、アロケーターの章で出てきたCompletionTokenについてや、io_context()の実装、Asioを使って非同期操作を実装する際のヘルパ関数であるasync_compose()などを具体的な実装を見ながら紹介していく予定です。

参考文献


VA Linux は、千葉工業大学 未来ロボティクス学科のロボカップチーム「CIT Brains」をスポンサーとして応援しており、インターン生も数名受け入れています。
本記事は、その一環としてインターン生の井上さんが記事を執筆し、弊社が監修を行いました。

*1:これは、Asioではasio::strandという名前のエグゼキューターになっています

*2:run()に相当するメソッドとして、run_one()、run_for()、run_until()、poll()、poll_one()があります。