Advent LINQ (4): Buffering

列挙子を非同期で実行して、可能なら結果をキューに蓄積したい場合がある。列挙子の要素生成速度が十分に早ければ、並列実行出来ることになる。
並列実行コレクションに、丁度この目的に使えるBlockingCollectionクラスがある。

public static class LinqExtensions
{
    public static IEnumerable<T> Buffering<T>(this IEnumerable<T> enumerable, int queueCount = 10)
    {
        var queue = new BlockingCollection<T>(queueCount);
        Task.Factory.StartNew(() =>
            {
                try
                {
                    foreach (var value in enumerable)
                    {
                        queue.Add(value);
                    }
                }
                finally
                {
                    queue.CompleteAdding();
                }
            });

        return queue.GetConsumingEnumerable();
    }
}

使うときは、非同期化したい列挙子の直後に指定するだけだ。

var r = new Random();
foreach (var value in
    Enumerable.Range(0, 1000000).
    Select(index => r.Next()).
    Buffering(1000))
{
    Console.WriteLine(value);
}

これで、乱数の生成は最大1000個まで非同期で実行されてバッファリングされる。コンシューマー側(foreach)の処理が遅く、乱数の生成が早ければ、効率よく動作する。

投稿者:

kekyo

A strawberry red slime mold. Likes metaprogramming. MA. Bicycle rider. http://amzn.to/1SeuUwD