列挙子を非同期で実行して、可能なら結果をキューに蓄積したい場合がある。列挙子の要素生成速度が十分に早ければ、並列実行出来ることになる。
並列実行コレクションに、丁度この目的に使えるBlockingCollectionクラスがある。
public static class LinqExtensions
{
public static IEnumerable<T> Buffering<T>(this IEnumerable<T> enumerable, int queueCount = 10)
{
var queue = new BlockingCollection<T>(queueCount);
Task.Factory.StartNew(() =>
{
try
{
foreach (var value in enumerable)
{
queue.Add(value);
}
}
finally
{
queue.CompleteAdding();
}
});
return queue.GetConsumingEnumerable();
}
}
使うときは、非同期化したい列挙子の直後に指定するだけだ。
var r = new Random();
foreach (var value in
Enumerable.Range(0, 1000000).
Select(index => r.Next()).
Buffering(1000))
{
Console.WriteLine(value);
}
これで、乱数の生成は最大1000個まで非同期で実行されてバッファリングされる。コンシューマー側(foreach)の処理が遅く、乱数の生成が早ければ、効率よく動作する。