Asio (Boost.Asio) C++ライブラリ入門 1 -Reactor-

執筆者:井上 叡
監修者:稲葉 貴昭・高橋 浩和

※ 「Asio (Boost.Asio) C++ライブラリ入門」連載記事一覧はこちら


はじめに

本記事では、非同期I/Oを扱うC++ライブラリであるAsio(またはBoost.Asio)ライブラリを紹介・解説します。Asio(またはBoost.Asio)ライブラリは、非同期I/Oを行う際に利用候補に挙がることがありますが、初めて使うと意外と仕組みが理解しづらく少し苦労したりします。そこで、本連載では、Asio(またはBoost.Asio)ライブラリが採用する非同期モデルやコア機能など、ライブラリの基礎となる部分を中心に解説していきます。解説の際にはなるべくサンプルコードを掲載し、具体例と共に理解していけたらと思います。

1記事目となる本記事では、Asio(またはBoost.Asio)ライブラリの簡単な導入と、ライブラリで採用されている2つの非同期デザインパターンのうち1つであるReactorというデザインパターンについて紹介します。

Asio(Boost.Asio)ライブラリとは

Asio (またはBoost.Asio)はC++ライブラリであり、「非同期I/O(Async I/O)」などの、完了までに"待ち"が発生する操作を管理するための様々な機能が実装されています。また、それらの機能を応用して、TCP通信やシグナルハンドリングなどの具体的なI/O操作もいくつか実装されています。ユーザーはそれらの機能を活用して、スレッドや明示的ロックに基づいた非同期モデルを自分で実装する事なく、様々な長時間の操作を管理することが可能になります(もちろん、同期的な操作も管理できます)。マルチプラットフォーム*1にも対応しています。

また、公式サイトでは、Asio(またはBoost.Asio)が提供する機能として以下の図のように図解されています。 本記事では、主に図中の Core と表記されている部分について紹介・解説をしていきます。

Asio(またはBoost.Asio)の機能 (https://think-async.com/Asio/ より引用)

C++標準ライブラリはC++11以降、<thread><coroutine>など、非同期機能が徐々に拡充しつつありますが、特に非同期I/Oに関連する機能は充分とは言えない状況であり、Asio(またはBoost.Asio)が利用されることがあります*2。高度な非同期処理を実装するための利用はもちろんですが、単にソケット通信やシリアル通信をするためだけ、などの単純な用途にも利用されることがあります。

タイトルを含め本章では「Asio(またはBoost.Asio)」という記載をしていますが、これはAsioライブラリと同一の実装(ほぼ名前空間のみの違い) *3Boost C++ Librariesのモジュールの1つとして存在するからです。これ以降の章では、特に注記なく「Asio」と記載した際には、その内容がAsioライブラリとBoost版であるBoost.Asioライブラリの両方について述べられているものとします。

また、本記事で扱うライブラリのバージョンは、Boost 1.85とAsio1.30.2です。

AsioまたはBoost.Asioライブラリのインストール方法

必要とされる方がいらっしゃるかは分かりませんが、念のため本記事で扱うライブラリのインストール方法を記載しておきます。基本的には、一般的なC++ライブラリのインストール方法の作法通りに手順を実行すれば、簡単にインストールすることができます。

必要が無い方は本章は読み飛ばして頂いて構いません。

Asio 1.30.2 のインストール方法

Asioライブラリは基本的にヘッダーオンリーライブラリであるため、単にダウンロードして配置するのみとなります。ヘッダを独自の場所にインストールしたい場合は、3,4を実行せず、コピー等して配置するか、make installの際などにディレクトリを指定して下さい。

  1. wget -O asio1_30_2.zip https://sourceforge.net/projects/asio/files/asio/1.30.2%20%28Stable%29/asio-1.30.2.zip/download
  2. unzip asio1_30_2.zip
    • ダウンロードしたものを展開
  3. ./configure
    • 展開したディレクトリ内にある、configureファイルを実行
  4. make install
    • 3.でMakefileが生成されるので、make installする

Boost.Asio 1.85.0のインストール方法

Boost版は少しビルドが必要です。

  1. wget https://archives.boost.io/release/1.85.0/source/boost_1_85_0.tar.gz
    • 公式ページにある、DLリンクからファイルを好みの形式で取得する
  2. tar -xvzf boost_1_85_0.tar.gz
    • ファイルを展開
  3. ./bootstrap.sh
    • 展開したディレクトリに含まれる、bootstrap.shを実行
  4. ./b2 install
    • コンパイルしてインストール
    • 独自の場所に配置したい場合などは、./b2 --helpで確認できるヘルプを参考に、インストールディレクトリを指定して下さい。

Asio(Boost.Asio)ライブラリのごく簡単なサンプルコード

いきなりライブラリの非同期モデルやコア機能について滔々と述べられてもイメージが湧きませんので、ここでは1つ具体的なコードを紹介します。

以下のコードは、Asioを利用してシグナルハンドリングを行うサンプルです。 本プログラムはSIGINTを受け取ると、ループを終了してメッセージを表示します。

// #define USE_BOOST_ASIO_LIB //Boost版を利用する場合はコメント解除orコンパイル時にマクロ定義

#ifdef USE_BOOST_ASIO_LIB
#include "boost/asio.hpp"
using namespace boost;
#else
#include "asio.hpp"
#endif // USE_BOOST_ASIO_LIB

#include <atomic>
#include <iostream>
#include <chrono>
#include <future>

static volatile std::atomic_bool is_loop_finished(false);    // ループの終了判定をする変数

void handler(const asio::error_code &err, [[maybe_unused]] int signal_num)
{
    if (!err)
    {
        is_loop_finished.store(true);               // シグナルを適切に受け取ったら終了変数をtrueにする
    }
}

int main(int argc, const char **argv)
{
    asio::io_context io_context;                    // Asioの基礎となるクラス
    asio::signal_set signal(io_context, SIGINT);    // SIGINTをハンドルするように設定
    signal.async_wait(handler);                     // シグナルハンドラを設定し、非同期でシグナルが来るのを待機

    auto th = std::async(std::launch::async, [&io_context]()
                         { io_context.run(); });    // io_contextを起動
    while (!is_loop_finished.load())                // シグナルを受け取るまで無限ループ
    {
        std::cout << "Loop continue..." << std::endl;
        using namespace std::literals::chrono_literals;
        std::this_thread::sleep_for(1s);
    }
    std::cout << "Loop finished!!!" << std::endl;
    th.get();
    return 0;
}

samplecode_signal

上記のシグナルハンドリングに関する部分は、メソッド名やコメントを見ればそれぞれが大体どのような動きをするかはお分かりになるかと思います。signal(io_context,SIGINT)でSIGINTをキャッチするクラスをコンストラクトし、signal.async_wait(handler)でハンドラを設定した後に非同期でシグナルを待つ。その後は、whileで無限に待つというものです。 しかし、asio::io_contextというクラスは中々役割が想像しにくいのではないでしょうか。実はこのクラスこそがAsioライブラリの中核となるExecutorという機能です。ここでは、io_contextがこの後紹介するProactorパターンに従って、signalに設定されたハンドラの起動を制御しています。具体的に上の例ではおおまかに以下のようなステップでハンドラが起動されます。

  1. signalが非同期でシグナルを待つ
  2. signalはシグナルを受け取ると、シグナルの受取完了通知、受け取ったシグナルの情報、ハンドラをExecutorであるio_contextに渡す
  3. io_contextは2.で受け取った情報を基にハンドラを起動する

ExecutorやProactorなど、いきなりいくつか名前が出てきましたが、今の時点では理解していなくて問題ありません。具体例を見たところで、次に進みましょう。

非同期デザインパターン ProactorとReactor <1>

Asioでは、ProactorとReactorという2つのデザインパターンが重要な役割を果たしています。これらは、どちらもイベント処理を行うためのもので、I/Oに関連するイベントの取り扱いに役立ちます。本記事ではまずReactorについて紹介します。

Reactor

まずは、Reactorについて紹介します。多くのAsioの利用者が主に利用するのは恐らくProactorですが、Reactorを先に知る方が理解しやすいため、こちらを最初に扱います。

Reactorは非同期イベント処理のための設計パターンで、I/O操作を効率よく管理するためによく利用されます。例として、多数のクライアントからログを受け付けてロギングを行い、結果はプリンタで印刷したり、データベースに保存したりするサーバーがあるとします。

イメージ図は以下の通りです。

Schmidt, D.C. (1995). Reactor An Object Behavioral Pattern for Demultiplexing and Dispatching Handles for Synchronous Events. より引用

このサーバーはログの受け取りやプリンターへの印刷指示などをネットワーク経由でtcp通信で行うとします。多くのクライアントから頻繁にログを受け取り、しかもデータベースへの保存も頻繁に行わなければいけない場合、プログラムとしては複数のコネクションを同時に上手く取り扱いたいと思うでしょう。 複数のソケットを同時に処理する単純な手段としては、1つのtcpコネクション毎に1つのスレッドを生成して処理を行うというものが考えられます。しかしこれはマルチスレッドの煩雑な処理を実装しなくてはいけませんし、コンテキストスイッチやスレッド生成などに関する性能の問題もあります。
Reactorはこの問題に対処するために利用される設計手法の1つです。

まず、このサーバーが行うtcp通信に関連する操作を主に以下の2つとします。

  • クライアントからのコネクション要求を処理する accept 操作
  • クライアントからのログデータを受信する recv 操作

これら2つの操作は、サーバーが処理を行うよりも先にまずはクライアントが何らかの要求をする事によって発生するものです。具体的には、クライアントのconnect要求によってサーバーのacceptが発生、クライアントのsendによってサーバーがrecvを実行する必要が生じます。しかし、マルチスレッドを使わない、つまりシングルスレッドで上記を処理しようとした場合、以下のような操作を順番に行っていく事になると思います。

サーバーの動作

  1. クライアントからのコネクション要求を待機する
  2. コネクション要求を受け取り、コネクションの確立を試みる
  3. クライアント側のログデータのsend実行を待つ
  4. sendされたデータを受信する

ここで、例えばサーバーが複数のポートをログ受信ポートとして持っていて、シングルスレッドで上記を実行するとします。するとどこかの1つのポートにコネクション要求が来ない場合などは、1.の状態でブロッキングされて全ての動作が停止してしまいます。2.の状態は多少の待ちが発生するものの無限にブロッキングする可能性は低いのに対して、1.は無限にブロッキングしてしまう危険があります。これは、3.と4.も同様の関係にあります。
しかし、1.と3.の待ちの処理が、複数のポートに対して同時に行えるようになったらどうでしょうか?すると、例えどこかのポートにコネクション要求が来なくても、どこかのポートに対して何故かクライアントがデータのsendをしてくれなくても、あくまでそのポートの動作が停止するだけで他のポートは問題なく動く事ができるようになります。

この、上記の1.と3.に当たる部分を多重化(multiplexing) つまり「同時に待機する」ようにして、シングルスレッドでも複数のポートに対して上記の処理を問題無く実行できるようにするのが、Reactorです。

※ 念のため書いておくと、多重化するのはあくまで1.と3.のみです。そのため、待機が終わって実行される3.と4.の動作は同期的に、つまり1つ1つ順番に実行されます。下の図中のacceptハンドラとrecvハンドラは同時に実行されることはありません。

以下に、上例のロギングサーバーのReactorの構造を図示します。

ロギングサーバーにおけるReactorの概念図

上図を見ると、Reactorパターンはイベント処理のパターンだと以前に述べた理由が分かるかもしれません。ここでは、クライアントからのsendconnectはイベントです。そして、それらに対するイベントハンドラが、acceptハンドラ、recvハンドラになります。多重化器は、複数のポートからのイベント(send、connect)に対して待機し、発生したイベントに応じて、適切なハンドラを呼び出すことで処理を多重化しています。また、recvハンドラは常に多重化器で待機しているわけではありません。recvはacceptが完了した後でのみ実行される可能性があるため、acceptハンドラは自信が適切に実行されると対象のソケットに対するrecvハンドラを発行し、これを多重化器に渡します。

以上に示したようなReactorの設計を利用すると、多重化器をシングルスレッドで実行することでシングルスレッドで複数のポートに対する待ちが実現できるようになります。しかしながら、そのような都合の良い多重化器は本当に実装できるのでしょうか?そして、もし実装できるにしても非常に面倒なのではないでしょうか?次の項目では、selectというlinuxシステムコールを利用して、簡単なReactorの実例を見ていきます。

Reactorの実例 selectシステムコール

実装が大変そうな多重化器ですが、実はlinuxのシステムコールとして提供されています。それが、selectシステムコールです*4。次は、このselectを使ったサンプルコードを例にして、Reactorが実際にコードになるとどのようなものなのかを見ていきます。

>>>selectの簡単な紹介

selectは、そのmanによると

プログラムで複数のファイルディスクリプターを監視し、 一つ以上のファイルディスクリプターがある種の I/O 操作の 「ready (準備ができた)」状態 (例えば、読み込み可能になった状態) になるまで待つことができる。

とあります。これはソケット通信の場合、複数のソケットを同時に監視して、その中のどれかがaccept、readなどがすぐに(blockingせず)実行できる状態になるまで待つことができる、ということです。

また、以下が関数のシグネチャです。中でも今回のサンプルコードに関連する引数はreadfdsで、ここに監視対象のファイルディスクリプタの集合を渡して、selectに監視してもらいます。
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

selectに関するより詳しい情報は、man selectなどをご確認下さい。

以下にlinuxのselectシステムコールに関するサンプルコードを示します。以下のコードはselect_tut(2)のmanよりTCPポートフォワードプログラムの一部を抜粋して引用しています。当然このままで動きませんので、実行したいという方は man select_tutや、(manpages.org)https://ja.manpages.org/select/2などから完全なソースを入手してください。

/* ~~~ 省略1 ~~~ */

static int
connect_socket(int connect_port, char *address)
{
    struct sockaddr_in a;
    int s;
    s = socket(AF_INET, SOCK_STREAM, 0);
    if (s == -1) {
        perror("socket");
        close(s);
        return -1;
    }
    memset(&a, 0, sizeof(a));
    a.sin_port = htons(connect_port);
    a.sin_family = AF_INET;
    if (!inet_aton(address, (struct in_addr *) &a.sin_addr.s_addr)) {
        perror("bad IP address format");
        close(s);
        return -1;
    }
    if (connect(s, (struct sockaddr *) &a, sizeof(a)) == -1) {
        perror("connect()");
        shutdown(s, SHUT_RDWR);
        close(s);
        return -1;
    }
    return s;
}

/* ~~~ 省略2 ~~~ */

int
main(int argc, char *argv[])
{
    int h;
    int fd1 = -1, fd2 = -1;
    char buf1[BUF_SIZE], buf2[BUF_SIZE];
    int buf1_avail, buf1_written;
    int buf2_avail, buf2_written;
    if (argc != 4) {
        fprintf(stderr, "Usage\n\tfwd <listen-port> "
                 "<forward-to-port> <forward-to-ip-address>\n");
        exit(EXIT_FAILURE);
    }
    signal(SIGPIPE, SIG_IGN);
    forward_port = atoi(argv[2]);
    h = listen_socket(atoi(argv[1]));
    if (h == -1)
        exit(EXIT_FAILURE);
    for (;;) {
        int r, nfds = 0;
        fd_set rd, wr, er;

/* ~~~ 省略3 ~~~ */

        r = select(nfds + 1, &rd, &wr, &er, NULL);
        if (r == -1 && errno == EINTR)
            continue;
        if (r == -1) {
            perror("select()");
            exit(EXIT_FAILURE);
        }
        if (FD_ISSET(h, &rd)) {
            unsigned int l;
            struct sockaddr_in client_address;
            memset(&client_address, 0, l = sizeof(client_address));
            r = accept(h, (struct sockaddr *) &client_address, &l);
            if (r == -1) {
                perror("accept()");
            } else {
                SHUT_FD1;
                SHUT_FD2;
                buf1_avail = buf1_written = 0;
                buf2_avail = buf2_written = 0;
                fd1 = r;
                fd2 = connect_socket(forward_port, argv[3]);
                if (fd2 == -1)
                    SHUT_FD1;
                else
                    printf("connect from %s\n",
                            inet_ntoa(client_address.sin_addr));
            }
        }

/* ~~~ 省略4 ~~~ */

        if (fd1 > 0)
            if (FD_ISSET(fd1, &rd)) {
                r = read(fd1, buf1 + buf1_avail,
                          BUF_SIZE - buf1_avail);
                if (r < 1)
                    SHUT_FD1;
                else
                    buf1_avail += r;
            }
        if (fd2 > 0)
            if (FD_ISSET(fd2, &rd)) {
                r = read(fd2, buf2 + buf2_avail,
                          BUF_SIZE - buf2_avail);
                if (r < 1)
                    SHUT_FD2;
                else
                    buf2_avail += r;
            }
        if (fd1 > 0)
            if (FD_ISSET(fd1, &wr)) {
                r = write(fd1, buf2 + buf2_written,
                           buf2_avail - buf2_written);
                if (r < 1)
                    SHUT_FD1;
                else
                    buf2_written += r;
            }
        if (fd2 > 0)
            if (FD_ISSET(fd2, &wr)) {
                r = write(fd2, buf1 + buf1_written,
                           buf1_avail - buf1_written);
                if (r < 1)
                    SHUT_FD2;
                else
                    buf1_written += r;
            }

/* ~~~ 省略5 ~~~ */

    }
    exit(EXIT_SUCCESS);
}

man select_tut(2) より一部抜粋して引用

>>>上記サンプルコードについての簡易的な解説

上記は少し長いプログラムですが、順番に見ていきます。 まず、main関数は「省略2」のすぐ下から始まります。for(;;)によるループに入る前に、ソケットのlistenを宣言しています。次に、ここでの主題となるselectを呼んでいる箇所ですが、「省略3」のすぐ下にあります。このselectが戻ったらまずは戻り値を確認し、エラー等が無いことを確認したのちにif (FD_ISSET(h, &rd))で監視対象のソケットが準備完了になったかを確認しています。準備完了になっていた場合はacceptが呼ばれ、それが成功したらポートフォーワード先にconnectしています。これらの操作が全て成功した場合、次は「省略4」以下のreadwriteの待機も開始されます。これらのreadwriteの待機もselectによって多重化され、準備完了した後に読み取りや送信が実行されます。

上記のサンプルを見て、どの処理が非同期で行われていて(多重化されていて)、どの処理が同期的に行われていたのか分かったでしょうか。 select以降に実行されたシステムコールは、全て順番に(同期的に)実行されています。まとめると、以下のようになります。

非同期的に行われた(多重化された)処理

  • 複数のソケットが同時に、他所からのconnect要求を待つ処理
  • 通信相手に対するreadwriteの待機処理 (connectが完了した後に待機開始)

同期的に行われた処理

  • 受け取ったconnct要求に対して、行ったaccept
  • ポートフォーワード先へのconnect
  • readwrite

前項で紹介したイベントハンドラに相当するものの1つは、connect_socket関数です。そして、イベントに相当するものの1つは、クライアントからのconnectになります。「省略3」から「省略4」までの部分は、selectによってクライアントからのconnectを待つ処理が多重化され、その結果によってconnect_socketが呼び出されるという形になっています。まとめて書かれているため少々分かりにくいですが、イベントハンドラであるconnect_socketが呼び出されると、「省略4」以降の部分のread()/write()が呼ばれる可能性があるようになります。これは、前に紹介した「acceptハンドラがrecvハンドラを発行する」という動作と同じです。ここまで来ると、クライアントからのconnectを待つだけでなく、read()/write()なども多重化して同時に待機するようになります。

また、上記のコードはシングルスレッドで書かれていますが、色々な操作を非同期で待つことが出来ています。これはまさに、「Reactorパターンを使うとユーザーはマルチスレッドを使わずに非同期の待機処理ができる」ということです。

おわりに

今回は、Asioの導入部分からReactorパターンまでを紹介しました。Reactorパターン自体は認識していなくても、selectやpollを利用したことある方は多いと思いますので、今回の解説は感覚的に知っていたという方も多いかもしれません。次回は、Reactorパターンと比較しながら、Proactorパターンについて解説していきます。余談ですが、Asioにとって重要なProactorパターンですが、他のソフトウェアを見るとReactorパターンがメインのものが多いようです。Reactorパターンを使っている有名なソフトウェアとしては、node.jsやnginxがあるようです。また、rustのtokioライブラリにもモジュールとしてreactorがあります。しかし、Proactorパターンを使っているライブラリを調べるとあまり有名なものが出てきません。もしかすると、最も有名なのはAsio(Boost.Asio)かもしれません...。

参考文献


VA Linux は、千葉工業大学 未来ロボティクス学科のロボカップチーム「CIT Brains」をスポンサーとして応援しており、インターン生も数名受け入れています。
本記事は、その一環としてインターン生の井上さんが記事を執筆し、弊社が監修を行いました。

*1:公式ページによると、あまり積極的にテストはされていないが、Android、NetBSD、Solaris、AIX、HP-UX等でも"多分"動くよと書いてあります。実際、ソースコードを確認するとそれらのプラットフォームに向けたであろうifdefを散見します。

*2:ちなみに将来的にC++の標準ライブラリに、ネットワーク等に関する機能のライブラリを入れようという提案は以前から存在しているようです。それの実装として、Asio(またはBoost.Asio)を利用するという提案が数年前から出されていますが、そうではなく新たに実装するべきだ、など色々意見があるようで、議論が紛糾している模様です。一時はC++23にネットワーク機能が入るのではという話もあり、筆者も楽しみにしていたのですが、結局実現しませんでした。

*3:名前空間以外の違いは、いくつかのマクロのprefixへの"BOOST_"の有無、いくつかの関数が返すエラーコードがBoost.Systemと標準のstd::error_code等のどちらを利用するか、Boost版はビルドの設定によってはBoost.Thread等へのリンクが必要になる事があること、などです。

*4:selectと同様に複数ファイルディスクリプタでのイベントを待機するためのシステムコールとして、pollやlinux限定ですが高効率版のepollというものもあります。また、それらのシグナルを同時に扱うバージョンとして、pselect、ppollというものもあります。