Asio (Boost.Asio) C++ライブラリ入門 2 -Proactor-

執筆者:井上 叡
監修者:稲葉 貴昭・高橋 浩和

※ 「Asio (Boost.Asio) C++ライブラリ入門」連載記事一覧はこちら


はじめに

本記事では、非同期I/Oを扱うC++ライブラリであるAsio(またはBoost.Asio)ライブラリを紹介・解説します。Asio(またはBoost.Asio)ライブラリは、非同期I/Oを行う際に利用候補に挙がることがありますが、初めて使うと意外と仕組みが理解しづらく少し苦労したりします。そこで、本連載では、Asio(またはBoost.Asio)ライブラリが採用する非同期モデルやコア機能など、ライブラリの基礎となる部分を中心に解説していきます。解説の際にはなるべくサンプルコードを掲載し、具体例と共に理解していけたらと思います。

2記事目となる本記事では、ライブラリで採用されている重要なデザインパターンであるProactorについてと、それに関連するいくつかの簡単なサンプルコードを紹介します。

非同期デザインパターン ProactorとReactor <2>

Asioでは、ProactorとReactorという2つのデザインパターンが重要な役割を果たしています。これらは、どちらもイベント処理を行うためのもので、I/Oに関連するイベントの取り扱いに役立ちます。前回記事ではReactorについて紹介しました。今回は、いよいよProactorについて紹介していきます。

Proactor

Proactorは、Reactorと同じく、非同期イベント処理のための設計パターンで、非同期I/Oの操作を効率良く管理するために利用されます。Proactorも非同期I/Oの操作をイベントとして取り扱いますが、Reactorとは扱う操作の種類が違います。 まずは、両者に何が共通していて何が違うのか、について紹介します。

まず、2つのパターンの最も大きな違いは、Proactorは非同期操作の完了通知をイベントとして扱い、それに基づいて処理を行うということです。つまり、Proactorを使うと、操作開始までの待機から待機完了後の実際の操作まで勝手にやってくれる事になります。
以下は、Reactorの項で示したロギングサーバーの例です。

サーバーの動作 (Reactorの項より)

  1. クライアントからのコネクション要求を待機する
  2. コネクション要求を受け取り、コネクションの確立を試みる
  3. クライアント側のログデータのsend実行を待つ
  4. sendされたデータを受信する

Reactorでは1.と3.を非同期に行っていました。しかし、Proactorの場合は、1.と2.の両方、または3.と4.の両方をまとめて、非同期で行うことができます。例えば、1.と2.をユーザーが行いたい場合は、ユーザーはコネクション要求の待機(= 1.)を開始する事を命じて、後は待っているだけでコネクションの確立(= 2.)までをProactorに行わせる事ができます。 これは、ユーザーはそれらの一連の操作(= 1.と2.の組や3と.4.の組)完了後に渡されるデータ等を扱うハンドラを書くだけでよいということです。これに対しReactorは、待機処理の完了を通知してくれる仕組みだったので、その後の実際の操作(= 2.と4.)を行うハンドラをユーザーが書く必要がありました。
また、Proactorは操作の部分まで非同期で実行してくれるため、シングルスレッドで操作も非同期に同時実行することができます。操作まで非同期で実行できるので、複数のクライアントからの要求を同時に処理しなくてはいけない場合などにも活用できます。逆にReactorは、シングルスレッドの場合は操作は同期的に、つまり1つずつ順番に実行することになります。

2つのモデルの違いについてまとめると、以下のようになります。

  • ProactorとReactorの違い
    • 取り扱うイベント
      • Reactor -> I/O等の操作準備完了をイベントとする。それによってハンドラが実行される。
      • Proactor -> 非同期操作の完了をイベントとする。それによってハンドラが実行される。
    • ハンドラの典型動作 *1
      • Reactor -> 準備が完了した何らかの操作を実行する
      • Proactor -> 実行した非同期操作の結果を扱う
    • 対象とする操作の同期性
      • Reactor -> 対象の準備完了した操作は同期的に実行される
      • Proactor -> 対象の操作は非同期に実行される。複数の操作が(可能なら)同時に実行される場合もある。
    • メモリ使用量
      • Reactor -> 操作を実行している間はその結果を保存するバッファが必要となるのでメモリが占有される可能性がある。
      • Proactor -> バッファ等は操作終了後すぐに破棄する事ができる。

Proactorの利用例

次はProactorを利用したサンプルコードを見ていきましょう。今回は、Asioライブラリを活用したコードです。 以下は、AsioライブラリのProactorの機能を利用したTCP通信のコードです。

// #define USE_BOOST_ASIO_LIB //Boost版を利用する場合はコメント解除orコンパイル時にマクロ定義

#ifdef USE_BOOST_ASIO_LIB
#include "boost/asio.hpp"
using namespace boost;
using error_code_t = boost::system::error_code;
#else
#include "asio.hpp"
using error_code_t = asio::error_code;
#endif // USE_BOOST_ASIO_LIB

#include <iostream>
#include <future>
#include <string>

using asio::ip::tcp;

class Client
{
    asio::io_context &io_context_;
    tcp::socket send_socket_;
    tcp::endpoint endpoint_;

public:
    Client(asio::io_context &ioc, const short port)
        : io_context_(ioc), endpoint_(asio::ip::make_address_v4("127.0.0.1"), port)
            , send_socket_(io_context_) {}

    // connect開始
    void start_connect()
    {
        send_socket_.async_connect(endpoint_, [this](const error_code_t &err)
                                   { connect_handler(err); });
    }

    // send開始
    void start_send()
    {
        std::cout << "Sending data..." << std::endl;
        const std::string send_data("Data dayo.");
        send_socket_.async_send(asio::buffer(send_data),
                                [this](const error_code_t &err, const size_t bytes_transferred)
                                {
                                    send_handler(err, bytes_transferred);
                                });
    }

    // connect完了ハンドラ
    void connect_handler(const error_code_t &err)
    {
        if (!err)
        {
            std::cout << "Connect succeeded!!!" << std::endl;
            start_send(); // 接続が完了したので、次は送信を開始する
        }
        else
        {
            std::cout << "Connect failed..." << std::endl;
        }
    }

    // send完了ハンドラ
    void send_handler(const error_code_t &err, const size_t bytes_transferred)
    {
        if (!err)
        {
            std::cout << "Send succeeded!!! Send " << bytes_transferred << "bytes" << std::endl;
        }
        else
        {
            std::cout << "Send failed..." << std::endl;
        }
    }
};

class Server
{
    asio::io_context &io_context_;
    tcp::acceptor acceptor_;
    tcp::socket socket_;
    std::string buff_;

public:
    Server(asio::io_context &io_context, const short port)
        : io_context_(io_context),
          acceptor_(io_context, tcp::endpoint(tcp::v4(), port)),
          socket_(io_context), buff_(){}

    // accept開始
    void start_accept()
    {
        acceptor_.async_accept(
            socket_,
            [this](const error_code_t &error)
            {
                accept_handler(error);
            });
    }

private:
    // accept完了ハンドラ
    void accept_handler(const error_code_t &error)
    {
        if (!error)
        {
            std::cout << "Accept and connection established !!!" << std::endl;
            start_receive();        // acceptが完了したので、recvを開始する。
        }
        else
        {
            std::cout << "Failed to establish connection..." << std::endl;
        }
    }

    // メッセージのrecv開始
    void start_receive()
    {
        buff_.clear();
        buff_.resize(1024);
        socket_.async_receive(
            asio::buffer(buff_.data(), buff_.size()),
            [this](const error_code_t &error, const size_t bytes_transferred)
            {
                receive_handler(error, bytes_transferred);
            });
    }

    // recv完了ハンドラ
    void receive_handler(const error_code_t &error, const size_t bytes_transferred)
    {
        if (!error)
        {
            std::cout << "Receive succeeded!!! Read " << bytes_transferred << " bytes." << std::endl;
            std::cout << "Recv data ===> " << buff_ << std::endl;
        }
        else
        {
            std::cout << "Receive failed..." << std::endl;
        }
    }
};

int main(int argc, const char **argv)
{
    if (argc < 2)
    {
        std::cout << "usage: ./tcp_sample port_number" << std::endl;
        return -1;
    }
    const int port = std::stoi(argv[1]);
    asio::io_context io_context;
    asio::signal_set sig(io_context,SIGINT);  //SIGINTで終了するため

    Server server(io_context, port);
    server.start_accept(); // accept開始

    Client client(io_context, port);
    client.start_connect(); // connect開始

    auto guard = asio::make_work_guard(io_context);   //io_contextの追加設定
    sig.async_wait([&guard](auto err,auto num){guard.reset();});
    auto th = std::async(std::launch::async, [&io_context]()
                         { io_context.run(); });

    th.get();
    return 0;
}

samplecode_tcp

少々長いコードですが、順番に見ていきます。 まず、ServerとClientの両方に、start_○○というメソッドがあるのに気付くでしょうか。これは各操作の開始メソッドです。これらのメソッドは呼び出された後すぐに返ります。そして、各startメソッドの中のasync_○○メソッドには、○○_handlerが渡されています。これは前項で紹介した完了ハンドラとなります。各startメソッドによって開始されたtcpの各操作が完了すると、それが非同期操作の完了イベントとなり、設定された完了ハンドラが呼び出されるという流れです。ここで、asio::io_contextは、Proactorとして働き、ハンドラの呼び出し等を制御します。 以上の流れを整理すると、以下のようになります。

Serverの処理の流れ

  1. start_accept()を呼び、connect要求に対する待機と要求に応じたacceptを非同期で実行開始
  2. 1.のacceptが完了、完了通知をio_contextに送信
  3. asio::io_contextが完了通知を元に、accept_handler()を呼び出し
  4. 3.によって、start_receive()が呼ばれ、クライアントからのsendの待機と送られたデータに対するrecvを非同期で実行開始
  5. 4.のrecvが完了、完了通知をio_contextに送信
  6. asio::io_contextが完了通知を元に、receive_handler()を呼び出し

上記のコードを見ると、各startメソッドは単にAsioライブラリの機能を呼んでいるだけで、ユーザーはtcpに関連する待機やその他の操作を直接実行していません。前項の説明通り、ユーザーが責任を持つのは完了ハンドラでの操作結果の扱いのみとなっています。

前項で紹介していない要素としては、connect_handler()内でのstart_send()の呼び出し、accept_handler()内でのstart_receive()の呼び出しです。これは少々実装テクニックに近い話ではありますが、このように完了ハンドラの中で連鎖させるように次の操作をスタートさせると、acceptが完了したかどうかなどの情報を"状態"として保持するような事をせずに、順序立てて操作を実行することができます。処理を永続的にループさせるなども、receive_handler()の中でstart_accept()を呼ぶなどすることで実現できます。

また、本コードは簡単のために最小限にしているので、このままでは複数のコネクションを同時に処理するような事はできません。しかし、上記のコードを少し書き換えれば同時に扱えるようになります。asioのサンプルコード(またはBoost.Asioのサンプルコード)に、上記に近い形で同時処理が可能なtcp通信のコードがあります。

Asioにおける"非同期"操作の実装について

前項のサンプルコードや説明中で、「ある操作が"非同期"になる」とか、「"非同期"に待機する」などAsioを使うと非同期で操作が実行できると紹介してきたものの、それがどのように実現されているのかについては述べてきませんでした。詳細については次回以降の記事で紹介しますが、ここで簡単に紹介します。
Asioにおける非同期操作は、実行したい対象の操作によって違いますが、ここでは一旦前項のサンプルコード内にもある、asio::ip::tcp::acceptor::async_accept()を例にします。このメソッドはacceptを実行するメソッドですので、主に以下の2つの処理をします。

  1. 相手からconnectを求められるまでの待機
  2. 要求が来てからの実際のaccept

これらの操作は、プラットフォームによって実装が変わります。以下のコード片を見て下さい。

#if defined(ASIO_WINDOWS_RUNTIME)
  detail::io_object_impl<
    detail::null_socket_service<Protocol>, Executor> impl_;
#elif defined(ASIO_HAS_IOCP)
  detail::io_object_impl<
    detail::win_iocp_socket_service<Protocol>, Executor> impl_;
#elif defined(ASIO_HAS_IO_URING_AS_DEFAULT)
  detail::io_object_impl<
    detail::io_uring_socket_service<Protocol>, Executor> impl_;
#else
  detail::io_object_impl<
    detail::reactive_socket_service<Protocol>, Executor> impl_;
#endif

これは、basic_socket_acceptorクラスの中のifdefです。ややこしいテンプレートは一旦置いておいて、io_object_implというテンプレートの1引数目に渡されているクラスが重要です。 ここにある4種類は以下のようになっています。

  • null_socket_service
    • 対象の操作に対応していないプラットフォーム
  • win_iocp_socket_service
    • iocpに対応しているプラットフォーム (windows)
  • io_uring_socket_service
    • io_uringに対応しているプラットフォーム
  • reactive_socket_service
    • 上記に該当せず、そのプラットフォームにあるReactorを利用するべきプラットフォーム

最後のreactive_socket_serviceは使うReactorによって以下のように更に細分化されます。 (reactor.hppより)

#if defined(ASIO_HAS_IOCP) || defined(ASIO_WINDOWS_RUNTIME)
typedef null_reactor reactor;
#elif defined(ASIO_HAS_IO_URING_AS_DEFAULT)
typedef null_reactor reactor;
#elif defined(ASIO_HAS_EPOLL)
typedef epoll_reactor reactor;
#elif defined(ASIO_HAS_KQUEUE)
typedef kqueue_reactor reactor;
#elif defined(ASIO_HAS_DEV_POLL)
typedef dev_poll_reactor reactor;
#else
typedef select_reactor reactor;
#endif

上記を見ると、プラットフォームによってepoll、kqueue、/dev/poll、selectのどれかを使った実装になることが分かります。 ここで、reactive_socket_serviceかつselectを使うプラットフォームだとすると、1.の待機処理はselectによる監視が行われます。 そして、その待機処理と待機終了後のacceptはio_contextの実行メソッド、例えばio_cotext::run()等が呼ばれているスレッドで実行されます。 本記事のtcpの例だと、std::async()で作られるであろうスレッドにて実行されているはずです。

Asio(Boost.Asio)ライブラリのProactorパターンを活用したサンプルコード

前記事と合わせて、Asioで採用されている主な2つの非同期処理に関するデザインパターン Proactor・Reactorを見てきました。本章では一旦AsioライブラリのProactorパターンの機能の利用例をいくつか紹介します。先ほどの説明とあわせて、理解の一助になるかと思います。ちなみに、以下のコードは全てProactorパターンを利用していますが、AsioはReactorパターンを使った処理も可能です。

公式のドキュメントにも豊富なサンプルがあります。

また興味がある方は、以下の公式リファレンスもご覧下さい。

サンプルは全てAsioとBoost.Asioのどちらでも実行可能になっています。Boost版を利用したい型は、サンプル中のコメントを解除するか、コンパイル時にマクロを定義してBoost向けのソースを有効にしてください。

以降のサンプルコードは全て以下の環境で実行可能なことを確認しています。

  • OS: Ubuntu 22.04.5 LTS
  • Kernel: 5.15.123.1-microsoft-standard-WSL2
  • Asio 1.30.2
  • Boost.Asio 1.85

ソケットを使ったtcp通信

省略 Proactorの項のサンプルコードを参照

シグナルハンドリング

省略 (前記事)Asio(Boost.Asio)ライブラリのごく簡単なサンプルコードを参照

※ 余談ですが、多くの場合、例えばsignal.hやsignal関数を使ってシグナルハンドラを設定する場合、ハンドラ内でstd::printfやstd::coutなどのasync-signal-safeではない関数を呼んではいけないなど、様々な注意事項があります。ですので、筆者はasio::signal_setを利用する際も、基本的にシグナルを扱う際のプラクティスに従っています。しかしながら、Asioのasio::signal_setのリファレンスでは、それらのプラクティスに対して一切の言及がありませんし、だからといって「安全だから気にしなくてよい」といったことも書いていません。これは常識だということで言及が無いのか、はたまた実はそれらのプラクティスに従う必要が無いような実装になっているから書いていないのか...。

タイマーを利用した待機処理

以下のコードはタイマーを利用して、非同期で指定した時間待機する処理です。 async_wait()はノンブロッキングで、タイマーに指定した時間が経過すると、 設定されたハンドラを発行します。 以下では複数のタイマーが使われていますが、それぞれが持つメソッドは同じで、 参考にする時計の種類によって分けられています。

// #define USE_BOOST_ASIO_LIB //Boost版を利用する場合はコメント解除orコンパイル時にマクロ定義

#ifdef USE_BOOST_ASIO_LIB
    #include "boost/asio.hpp"
    using namespace boost;
#else
    #include "asio.hpp"
#endif // USE_BOOST_ASIO_LIB

#include <chrono>
#include <iostream>
#include <future>

int main(int argc, const char** argv) {
    using namespace std::literals::chrono_literals;
    asio::io_context io_context;

    asio::steady_timer steady_timer(io_context);                    //steady_clockに基づくタイマー
    steady_timer.expires_after(3s);                                 //3秒待つように設定
    steady_timer.async_wait([](const auto& err){std::cout << "Steady timer finished!!!" << std::endl;});

    asio::high_resolution_timer high_resolution_timer(io_context);  //high_resolution_clockに基づくタイマー
    high_resolution_timer.expires_after(4s);                        //4秒待つように設定
    high_resolution_timer.async_wait([](const auto& err){std::cout << "High resolution timer finished!!!" << std::endl;});
    
    asio::system_timer system_timer(io_context);                    //system_clockに基づくタイマー
    system_timer.expires_at(std::chrono::system_clock::now() + 5s); //現在時刻 + 5秒の時刻まで待つように設定
    system_timer.async_wait([](const auto& err){std::cout << "System timer finished!!!" << std::endl;});

    auto th = std::async(std::launch::async,[&io_context](){
        io_context.run();
    });
    th.get();
    return 0;
}

io_context(io_service)を利用した非同期処理

以下はAsioのコア機能となる、io_contextを利用した非同期処理のサンプルです。
Proactorパターンに従って複数のタスクを実行し、それらのタスクは終わる毎にハンドラを発行します。
各ハンドラはタスクが終了したことを出力するようになっているので、
タスクが終わった順番に標準出力に文字列が現れます。
SIGINTを送るとプログラムが終了します。

// #define USE_BOOST_ASIO_LIB //Boost版を利用する場合はコメント解除orコンパイル時にマクロ定義

#ifdef USE_BOOST_ASIO_LIB
    #include "boost/asio.hpp"
using namespace boost;
#else
    #include "asio.hpp"
#endif // USE_BOOST_ASIO_LIB

#include <chrono>
#include <iostream>
#include <future>

//実行に時間が掛かるタスク。タスクの終了時に、標準出力に出力するハンドラを発行する。
void heavy_task(asio::io_context &ctx_ref)
{
    using namespace std::literals::chrono_literals;
    std::thread([&ctx_ref]()
                {
            std::this_thread::sleep_for(10s);
            asio::post(ctx_ref,[](){std::cout << "Heavy task finished!!!" << std::endl;}); })
        .detach();
}

//すぐ終わるタスク。タスクの終了時に、標準出力に出力するハンドラを発行する。
void light_task(asio::io_context &ctx_ref, const int &number)
{
    using namespace std::literals::chrono_literals;
    std::thread([number,&ctx_ref]()
                {
        std::this_thread::sleep_for(500ms);
        asio::post(ctx_ref,[number](){std::cout << "Light task number " << number << " finished!!!" << std::endl;}); })
        .detach();
}

int main(int argc, char const *argv[])
{
    using namespace std::literals::chrono_literals;
    asio::io_context io_context;
    asio::signal_set sig(io_context,SIGINT);    //SIGINTをキャッチするよう設定

    for (int i = 0; i < 5; i++)
    {
        light_task(io_context, i);
        std::cout << "light_task number " << i << " posted..." << std::endl;
        std::this_thread::sleep_for(100ms);
    }
    heavy_task(io_context);
    std::cout << "heavy_task posted.. " << std::endl;
    auto guard = asio::make_work_guard(io_context);  //Executor(io_context)のための追加設定
    sig.async_wait([&guard](auto err,auto num){guard.reset();});  //SIGINTをキャッチするとプログラム終了

    io_context.run();

    std::cout << "\nFinish loop..." << std::endl;
    return 0;
}

おわりに

今回はProactorパターンについて紹介しました。筆者としてはAsioのProactorパターンの機能を利用する機会の方が多いです。やはり、こちらが明示的にロック等を使わないで、データの同期等を気にせず操作完了後の結果を扱えるのは非常に楽です。 前記事とあわせてAsioのサンプルとして、シグナルハンドリング・tcp通信・タイマーなどを紹介しましたが、Asioには他にもいくつか具体的なI/O操作が実装されています。SSLやシリアル通信、unix domein socket、windowsの"ハンドル"を扱うための機能もあります。筆者が頻繁に利用しているのは、シリアル通信の機能です。RS485やRS232などに従った通信が、本記事で紹介したtcp通信の機能と同じようなインターフェースで利用できるので、電子工作やロボットを動かすのに重宝しています。

参考文献


VA Linux は、千葉工業大学 未来ロボティクス学科のロボカップチーム「CIT Brains」をスポンサーとして応援しており、インターン生も数名受け入れています。
本記事は、その一環としてインターン生の井上さんが記事を執筆し、弊社が監修を行いました。

*1:これはもちろん様々な例外があります。Reactorも複雑なシステムを実装しているうちに、Proactorに似たような動作になる事があります。