std::sync::mpsc::channelで少しハマったのでメモ
std::sync::mpsc - Rustの挙動についてちょっと知見を持ったので綴っていく。
やりたいこと
めっちゃ重たいタスクがN個あって、それぞれにスレッドを作ってmpsc::channel
経由でメインスレッドに値を返してもらう例を考える。
use std::thread; use std::sync::mpsc; let (tx, rx) = mpsc::channel(); for costly_task in costly_tasks.into_iter() { let tx = mpsc::Sender::clone(&tx); thread::spawn(move || { let val = costly_task(); tx.send(val).unwrap(); }); } for val in rx { dbg!(val); }
これだと実は下のfor文を抜けることは出来ない。実際にplaygroundでやると強制的にkillされていることが確認できる。
ではどうするのか?というと for文の前で drop(tx);
を呼んであげる必要がある。
for costly_task in costly_tasks.into_iter() { // ... } drop(tx); for val in rx { // ... }
僕みたいになぜ?となった方へこの問題をもう少し簡単にしてみる。ならなかった人はスター押してはてブ追加して頂ければ大丈夫です。
簡単にした例
use std::sync::mpsc; let (tx, rx) = mpsc::channel(); let tx1 = mpsc::Sender::clone(&tx); tx1.send(1).unwrap(); for val in rx { dbg!(val); }
これも同様にfor文を抜けることが出来ない。というわけで drop(tx); drop(tx1);
を追加してみると同様に抜けることができる。
どうやらfor _ in rx { ... }
はsenderが値が送りきるのを待つ挙動をしているようだ、とのことでドキュメント 1 を確認しに行く。
Returns an iterator that will block waiting for messages, but never panic!. It will return None when the channel has hung up.
うんうん、推測は当たっていたようだ。Sender::send
は &self
なので、1回値を送っただけではdropしない。なので、rx
はtx
達が生きていることが分かっているので明示的にdrop
してあげないとforから抜けれなかったわけか。
ということは
use std::thread; use std::sync::mpsc; // このtxは使ってないけど明示的にdropさせる必要がある let (tx, rx) = mpsc::channel(); for costly_task in costly_tasks.into_iter() { // ここの tx は for が回るたびにスコープから抜ける => drop する let tx = mpsc::Sender::clone(&tx); thread::spawn(move || { let val = costly_task(); tx.send(val).unwrap(); }); } // 明示的にdropしてあげる drop(tx); // 全部のSenderがdropしているのでこのforはちゃんと抜けられる for val in rx { dbg!(val); }
ってわけですね、少しハマったのでメモです
宣伝
今回の挙動を確認したりドキュメントを読んだりするのにrust-jpのSlackで質問を投げていました。活発なコミュニティで優しい方が多いので皆さん入りましょう!!
-
本当は
into_iter
なんだけどなかったしiter
でも同じでしょう↩