最近剛好在研究boost::asio,在研究官方範例的時候發現與fiber連動的code,感覺有點玄妙,來寫篇筆記
Autoecho
- autoecho.cpp
- 這個範例基本上就是在同一個process內開一個tcp server跟數個client,client會發送一個字串給server,然後server會把該字串echo回去
- 乍看之下蠻無聊的,但關鍵是這個程式只有一條thread在工作,所有的concurrency都是靠fiber在做
- 萬事起頭難,我們先來很快地審視過main()在幹嘛
1 | int main( int argc, char* argv[]) { |
- 一開始先初始化io_context與設定fiber排成所用的algo為round robin,這沒有太大的問題
- 再來是把server fiber跟client fiber都建立好,detach()出來
- 最後是io_ctx->run()讓目前主fiber把執行權讓出來,排程器就會抓剛剛排入的fiber出來跑
async_something()的奧秘
- 我們可以從server()來切入問題
1 | void server( std::shared_ptr< boost::asio::io_context > const& io_ctx, tcp::acceptor & a) { |
- 我們的server fiber實體長這樣,但實際上只有做一個accept()動作而已
- 所以server()幹了啥?
- 使用我們在main裏頭準備好的acceptor
a去等待client的連線,async_accept()
- 喔齁!?
- 來看一下async_accept()的signature,看看他是何方神聖
1
2
3
4
5
6
7
8template<
typename Protocol1,
typename Executor1,
typename AcceptToken = DEFAULT>
DEDUCED async_accept(
basic_socket< Protocol1, Executor1 > & peer,
AcceptToken && token = DEFAULT,
typename constraint< is_convertible< Protocol, Protocol1 >::value >::type = 0); - 所以async_accept()要一個socket跟一個AcceptToken
- socket沒問題,重點是那個AcceptToken到底是什麼咚咚?
- 簡單來說AcceptToken可以是一個callable,在未來的某一個時間點真的accept到東西的時候,接收來自kernel的error
code來做後續處理
- 就是async_accept()的callback拉
- 但很顯然的,這個例子不是這樣
1
a.async_accept(* socket, boost::fibers::asio::yield[ec]);
- 媽的貢丸,boost::fibers::asio::yield[ec]是啥玩意?
- 持續trace code,找到其定義
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18namespace boost {
namespace fibers {
namespace asio {
class yield_t {
public:
yield_t() = default;
yield_t operator[]( boost::system::error_code & ec) const {
yield_t tmp;
tmp.ec_ = & ec;
return tmp;
}
boost::system::error_code * ec_{ nullptr };
};
thread_local yield_t yield{};
}}}
- 持續trace code,找到其定義
- 好樣的,我們觸發了他的operator[],具體只是把我們server()中的ec的addr綁到位於thread_local的yield上
- 然後作為AcceptToken傳遞給async_accept(),我的老天鵝,yield_t又沒有operator(),到底是怎麼樣變出callback的?
- 讓我們繼續往更深層跳......
- 我們一路step in,會看到一個關鍵位置
1
2
3
4
5
6
7
8
9
10
11typedef typename boost::asio::async_result<
typename decay<CompletionToken>::type,
Signature>::completion_handler_type completion_handler_type;
/*一些其他的code*/
explicit async_completion(CompletionToken& token)
: completion_handler(static_cast<typename conditional<
is_same<CompletionToken, completion_handler_type>::value,
completion_handler_type&, CompletionToken&&>::type>(token)),
result(completion_handler)
{
} - 哇,酷喔,因為我們給的AcceptToken是yield_t,導致上面的completion_handler_type被推導成
async_result< boost::fibers::asio::yield_t, void(boost::system::error_code) >::completion_handler_type
- 還真的有一個partial specialization長這樣
1
2
3
4
5
6
7
8
9
10template<>
class async_result< boost::fibers::asio::yield_t, void(boost::system::error_code) > :
public boost::fibers::asio::detail::async_result_base {
public:
using return_type = void;
using completion_handler_type = fibers::asio::detail::yield_handler<void>;
explicit async_result( boost::fibers::asio::detail::yield_handler< void > & h):
boost::fibers::asio::detail::async_result_base{ h } {}
};- 你,看到了嗎.....?那個潛伏在陰暗角落,蠢蠢欲動的completion_handler_type
- 講了那麼多,最後發現傳入yield_t之後,completion_handler最後會初始化成一個yield_handler
- 這個咚咚可就厲害了,它可是有operator()的!!
- 但不要忘記,他還有個base class,一樣也有一個operator()
- 先來看一下yield_handler的operator()
1
2
3
4
5
6
7
8void operator()(boost::system::error_code const& ec, T t) {
BOOST_ASSERT_MSG( value_,
"Must inject value ptr "
"before caling yield_handler<T>::operator()()");
* value_ = std::move( t);
yield_handler_base::operator()(ec);
} - 我們可以看到value_(就是我們的socket),會被assign t,這個t就是當async_accept()完成之後會得到的socket
- 接下來就會繼續call yield_handler_base的operator()
- 所以我們現在知道,AcceptToken給yield_t,最後async_accept()完事之後會呼叫的callback來自這裡
- 我們接著來看yield_handler_base的operator()做了啥?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31class yield_handler_base {
public:
yield_handler_base( yield_t const& y) :
ctx_{ boost::fibers::context::active() },
yt_( y ) {}
void operator()( boost::system::error_code const& ec) {
BOOST_ASSERT_MSG( ycomp_,
"Must inject yield_completion* "
"before calling yield_handler_base::operator()()");
BOOST_ASSERT_MSG( yt_.ec_,
"Must inject boost::system::error_code* "
"before calling yield_handler_base::operator()()");
yield_completion::lock_t lk{ ycomp_->mtx_ };
yield_completion::state_t state = ycomp_->state_;
ycomp_->state_ = yield_completion::complete;
* yt_.ec_ = ec;
lk.unlock();
if ( yield_completion::waiting == state) {
// wake the fiber
fibers::context::active()->schedule( ctx_);
}
}
boost::fibers::context * ctx_;
yield_t yt_;
yield_completion::ptr_t ycomp_{};
}; - 兩個重點
* yt_.ec_ = ec;
會把真正的error code寫回我們給的yield_tyield_completion::waiting == state
如果我們的server fiber在async_accept()之後沒有馬上就accept到連線(正常來說都是這樣),那麼這個state就會是waiting,我們此時就會把server fiber的context排進排程裏頭,讓他盡快被跑起來
- 跑起來之後就會把已經accept的socket交遊session fiber去執行工作
- 但還有一個問題,我們在呼叫async_accept()之後,在我們正式accept到連線之前的這段時間內,執行權在誰手上
- async_開頭的function都是non-blocking的,所以不會卡在原地
- 但看起來也不像是回到server fiber......
- 我們可以在往下看,我們的async_accept()事實上背後接著一個async_initiate(),這個東西內部有一個玄機
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19template <typename CompletionToken,
BOOST_ASIO_COMPLETION_SIGNATURE Signature,
typename Initiation, typename... Args>
inline typename enable_if<
!detail::async_result_has_initiate_memfn<CompletionToken, Signature>::value,
BOOST_ASIO_INITFN_RESULT_TYPE(CompletionToken, Signature)>::type
async_initiate(BOOST_ASIO_MOVE_ARG(Initiation) initiation,
BOOST_ASIO_NONDEDUCED_MOVE_ARG(CompletionToken) token,
BOOST_ASIO_MOVE_ARG(Args)... args)
{
async_completion<CompletionToken, Signature> completion(token);
BOOST_ASIO_MOVE_CAST(Initiation)(initiation)(
BOOST_ASIO_MOVE_CAST(BOOST_ASIO_HANDLER_TYPE(CompletionToken,
Signature))(completion.completion_handler),
BOOST_ASIO_MOVE_CAST(Args)(args)...);
return completion.result.get();
} - 最後的那個completion.result.get()是關鍵,我們一個step進去看一看
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15void get() {
ycomp_->wait();
if ( ec_) {
throw_exception( boost::system::system_error{ ec_ } );
}
}
/*再來直接看ycomp->wait()*/
void wait() {
lock_t lk{ mtx_ };
if ( complete != state_) {
state_ = waiting;
fibers::context::active()->suspend( lk); // !!
}
} - 不看不知道,一看不得了,上面這段code說了啥?
- 假設我們呼叫get()的時候,ycomp的state不是complete(換言之,callback還沒有被跑過),那基本上進去wait()就會觸發fibers::context::active()->suspend()
- 這個動作相當於是直接yield控制權,換言之,我們的async_accept()在呼叫後如果沒有馬上accept到連線的話,就會把控制權yield出來給其他fiber!!
- 一旦async_accept()讓出執行權後,在他完成之前,就會有其他的fiber被拉起來跑(client
or session),而當這兩個fiber各自也執行async
action的時候,就會如同上面async_accept()一樣讓出執行權
- 如此我們就能只靠一條thread達到concurrency了
- 使用我們在main裏頭準備好的acceptor
a去等待client的連線,async_accept()
延伸閱讀
結論
想不到吧,Alice居然就是Bob!