黃爸爸狗園

本園只有sanitizer,沒有狗籠

0%

boost::asio + boost::fiber之上下文切換機制

最近剛好在研究boost::asio,在研究官方範例的時候發現與fiber連動的code,感覺有點玄妙,來寫篇筆記

Autoecho

  • autoecho.cpp
    • 這個範例基本上就是在同一個process內開一個tcp server跟數個client,client會發送一個字串給server,然後server會把該字串echo回去
    • 乍看之下蠻無聊的,但關鍵是這個程式只有一條thread在工作,所有的concurrency都是靠fiber在做
    • 萬事起頭難,我們先來很快地審視過main()在幹嘛
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
int main( int argc, char* argv[]) {
try {
//[asio_rr_setup
std::shared_ptr< boost::asio::io_context > io_ctx = std::make_shared< boost::asio::io_context >();
boost::fibers::use_scheduling_algorithm< boost::fibers::asio::round_robin >( io_ctx);
//]
print( "Thread ", thread_names.lookup(), ": started");
//[asio_rr_launch_fibers
// server
tcp::acceptor a( * io_ctx, tcp::endpoint( tcp::v4(), 9999) );
boost::fibers::fiber( server, io_ctx, std::ref( a) ).detach();
// client
const unsigned iterations = 2;
const unsigned clients = 3;
boost::fibers::barrier b( clients);
for ( unsigned i = 0; i < clients; ++i) {
boost::fibers::fiber(
client, io_ctx, std::ref( a), std::ref( b), iterations).detach();
}
//]
//[asio_rr_run
io_ctx->run();
//]
print( tag(), ": io_context returned");
print( "Thread ", thread_names.lookup(), ": stopping");
std::cout << "done." << std::endl;
return EXIT_SUCCESS;
} catch ( std::exception const& e) {
print("Exception: ", e.what(), "\n");
}
return EXIT_FAILURE;
}
  • 一開始先初始化io_context與設定fiber排成所用的algo為round robin,這沒有太大的問題
  • 再來是把server fiber跟client fiber都建立好,detach()出來
  • 最後是io_ctx->run()讓目前主fiber把執行權讓出來,排程器就會抓剛剛排入的fiber出來跑

async_something()的奧秘

  • 我們可以從server()來切入問題
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
void server( std::shared_ptr< boost::asio::io_context > const& io_ctx, tcp::acceptor & a) {
print( tag(), ": echo-server started");
try {
for (;;) {
socket_ptr socket( new tcp::socket( * io_ctx) );
boost::system::error_code ec;
a.async_accept(
* socket,
boost::fibers::asio::yield[ec]);
if ( ec) {
throw boost::system::system_error( ec); //some other error
} else {
boost::fibers::fiber( session, socket).detach();
}
}
} catch ( std::exception const& ex) {
print( tag(), ": caught exception : ", ex.what());
}
io_ctx->stop();
print( tag(), ": echo-server stopped");
}
  • 我們的server fiber實體長這樣,但實際上只有做一個accept()動作而已
  • 所以server()幹了啥?
    • 使用我們在main裏頭準備好的acceptor a去等待client的連線,async_accept()
      • 喔齁!?
    • 來看一下async_accept()的signature,看看他是何方神聖
      1
      2
      3
      4
      5
      6
      7
      8
      template<
      typename Protocol1,
      typename Executor1,
      typename AcceptToken = DEFAULT>
      DEDUCED async_accept(
      basic_socket< Protocol1, Executor1 > & peer,
      AcceptToken && token = DEFAULT,
      typename constraint< is_convertible< Protocol, Protocol1 >::value >::type = 0);
    • 所以async_accept()要一個socket跟一個AcceptToken
      • socket沒問題,重點是那個AcceptToken到底是什麼咚咚?
      • 簡單來說AcceptToken可以是一個callable,在未來的某一個時間點真的accept到東西的時候,接收來自kernel的error code來做後續處理
        • 就是async_accept()的callback拉
      • 但很顯然的,這個例子不是這樣
        1
        a.async_accept(* socket, boost::fibers::asio::yield[ec]);
      • 媽的貢丸,boost::fibers::asio::yield[ec]是啥玩意?
        • 持續trace code,找到其定義
          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17
          18
          namespace boost {
          namespace fibers {
          namespace asio {
          class yield_t {
          public:
          yield_t() = default;

          yield_t operator[]( boost::system::error_code & ec) const {
          yield_t tmp;
          tmp.ec_ = & ec;
          return tmp;
          }

          boost::system::error_code * ec_{ nullptr };
          };

          thread_local yield_t yield{};
          }}}
      • 好樣的,我們觸發了他的operator[],具體只是把我們server()中的ec的addr綁到位於thread_local的yield上
      • 然後作為AcceptToken傳遞給async_accept(),我的老天鵝,yield_t又沒有operator(),到底是怎麼樣變出callback的?
      • 讓我們繼續往更深層跳......
      • 我們一路step in,會看到一個關鍵位置
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        typedef typename boost::asio::async_result<
        typename decay<CompletionToken>::type,
        Signature>::completion_handler_type completion_handler_type;
        /*一些其他的code*/
        explicit async_completion(CompletionToken& token)
        : completion_handler(static_cast<typename conditional<
        is_same<CompletionToken, completion_handler_type>::value,
        completion_handler_type&, CompletionToken&&>::type>(token)),
        result(completion_handler)
        {
        }
      • 哇,酷喔,因為我們給的AcceptToken是yield_t,導致上面的completion_handler_type被推導成async_result< boost::fibers::asio::yield_t, void(boost::system::error_code) >::completion_handler_type
      • 還真的有一個partial specialization長這樣
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        template<>
        class async_result< boost::fibers::asio::yield_t, void(boost::system::error_code) > :
        public boost::fibers::asio::detail::async_result_base {
        public:
        using return_type = void;
        using completion_handler_type = fibers::asio::detail::yield_handler<void>;

        explicit async_result( boost::fibers::asio::detail::yield_handler< void > & h):
        boost::fibers::asio::detail::async_result_base{ h } {}
        };
        • 你,看到了嗎.....?那個潛伏在陰暗角落,蠢蠢欲動的completion_handler_type
      • 講了那麼多,最後發現傳入yield_t之後,completion_handler最後會初始化成一個yield_handler
      • 這個咚咚可就厲害了,它可是有operator()的!!
        • 但不要忘記,他還有個base class,一樣也有一個operator()
        • 先來看一下yield_handler的operator()
          1
          2
          3
          4
          5
          6
          7
          8
          void operator()(boost::system::error_code const& ec, T t) {
          BOOST_ASSERT_MSG( value_,
          "Must inject value ptr "
          "before caling yield_handler<T>::operator()()");

          * value_ = std::move( t);
          yield_handler_base::operator()(ec);
          }
        • 我們可以看到value_(就是我們的socket),會被assign t,這個t就是當async_accept()完成之後會得到的socket
        • 接下來就會繼續call yield_handler_base的operator()
        • 所以我們現在知道,AcceptToken給yield_t,最後async_accept()完事之後會呼叫的callback來自這裡
      • 我們接著來看yield_handler_base的operator()做了啥?
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        class yield_handler_base {
        public:
        yield_handler_base( yield_t const& y) :
        ctx_{ boost::fibers::context::active() },
        yt_( y ) {}

        void operator()( boost::system::error_code const& ec) {
        BOOST_ASSERT_MSG( ycomp_,
        "Must inject yield_completion* "
        "before calling yield_handler_base::operator()()");
        BOOST_ASSERT_MSG( yt_.ec_,
        "Must inject boost::system::error_code* "
        "before calling yield_handler_base::operator()()");

        yield_completion::lock_t lk{ ycomp_->mtx_ };
        yield_completion::state_t state = ycomp_->state_;

        ycomp_->state_ = yield_completion::complete;

        * yt_.ec_ = ec;
        lk.unlock();
        if ( yield_completion::waiting == state) {
        // wake the fiber
        fibers::context::active()->schedule( ctx_);
        }
        }

        boost::fibers::context * ctx_;
        yield_t yt_;
        yield_completion::ptr_t ycomp_{};
        };
      • 兩個重點
        • * yt_.ec_ = ec;會把真正的error code寫回我們給的yield_t
        • yield_completion::waiting == state如果我們的server fiber在async_accept()之後沒有馬上就accept到連線(正常來說都是這樣),那麼這個state就會是waiting,我們此時就會把server fiber的context排進排程裏頭,讓他盡快被跑起來
      • 跑起來之後就會把已經accept的socket交遊session fiber去執行工作
    • 但還有一個問題,我們在呼叫async_accept()之後,在我們正式accept到連線之前的這段時間內,執行權在誰手上
      • async_開頭的function都是non-blocking的,所以不會卡在原地
      • 但看起來也不像是回到server fiber......
      • 我們可以在往下看,我們的async_accept()事實上背後接著一個async_initiate(),這個東西內部有一個玄機
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        template <typename CompletionToken,
        BOOST_ASIO_COMPLETION_SIGNATURE Signature,
        typename Initiation, typename... Args>
        inline typename enable_if<
        !detail::async_result_has_initiate_memfn<CompletionToken, Signature>::value,
        BOOST_ASIO_INITFN_RESULT_TYPE(CompletionToken, Signature)>::type
        async_initiate(BOOST_ASIO_MOVE_ARG(Initiation) initiation,
        BOOST_ASIO_NONDEDUCED_MOVE_ARG(CompletionToken) token,
        BOOST_ASIO_MOVE_ARG(Args)... args)
        {
        async_completion<CompletionToken, Signature> completion(token);

        BOOST_ASIO_MOVE_CAST(Initiation)(initiation)(
        BOOST_ASIO_MOVE_CAST(BOOST_ASIO_HANDLER_TYPE(CompletionToken,
        Signature))(completion.completion_handler),
        BOOST_ASIO_MOVE_CAST(Args)(args)...);

        return completion.result.get();
        }
      • 最後的那個completion.result.get()是關鍵,我們一個step進去看一看
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        void get() {
        ycomp_->wait();
        if ( ec_) {
        throw_exception( boost::system::system_error{ ec_ } );
        }
        }

        /*再來直接看ycomp->wait()*/
        void wait() {
        lock_t lk{ mtx_ };
        if ( complete != state_) {
        state_ = waiting;
        fibers::context::active()->suspend( lk); // !!
        }
        }
      • 不看不知道,一看不得了,上面這段code說了啥?
        • 假設我們呼叫get()的時候,ycomp的state不是complete(換言之,callback還沒有被跑過),那基本上進去wait()就會觸發fibers::context::active()->suspend()
        • 這個動作相當於是直接yield控制權,換言之,我們的async_accept()在呼叫後如果沒有馬上accept到連線的話,就會把控制權yield出來給其他fiber!!
    • 一旦async_accept()讓出執行權後,在他完成之前,就會有其他的fiber被拉起來跑(client or session),而當這兩個fiber各自也執行async action的時候,就會如同上面async_accept()一樣讓出執行權
      • 如此我們就能只靠一條thread達到concurrency了

延伸閱讀

結論

想不到吧,Alice居然就是Bob!