LINQは本当に強力だ (8) 何が並列化されるのか

並列化が簡単に出来る事がLINQのアドバンテージの一つなのだが、AsParallelを付ける際に、クエリのどこがどのように並列化されるのかが見えているだろうか?
AsParallelを付ければ並列化出来るというのは簡単すぎるため、意味のない適用や実害が生じる可能性もある。たとえば、以下のコードを考えてみる。

// 重複しない乱数列を生成する
var rand = new Random();
var captured = new ConcurrentDictionary<int, int>();
var results =
    from index in
        Enumerable.Range(0, 100000)
    let value = rand.Next(200000)
    where captured.TryAdd(value, index) // 辞書に値の追加を試み、成功すればtrue
    select value;

少し脱線するが、ConcurrentDictionaryはDictionaryのスレッドセーフバージョンだ。しかも、ロック競合が起きにくく、かつアトミック操作が可能なメソッドが追加されており、上記のようにLINQでも使いやすい(DictionaryクラスはTryGetValueがoutパラメータを使うので、LINQでは使いにくかった)。

さて、このコードは小さいので並列化にはあまり向いていないが、それは横に置いておき、並列化したいのはどこなのかを考える。
やろうとしていることは、10万回分の乱数を取得し、その中から重複する値を避けた乱数列の取得だ。もちろん、Distinctを使えばいいのだが、それも横に置いておく。
生成した乱数(value)を辞書に追加し、成功すれば(つまり辞書にその値が無く、追加された場合)に、その値がselectで射影される。この、乱数の生成と辞書への追加の試行部分を並列化出来れば、大半の処理が並列化出来たことになる。
クエリ構文で書いていることもあり、AsParallelを挿入すべき場所は一か所しかないが、取りあえず挿入する。

var rand = new Random();
var captured = new ConcurrentDictionary<int, int>();
var results =
    from index in
        Enumerable.Range(0, 100000).
        AsParallel()    // 並列化
    let value = rand.Next(200000)
    where captured.TryAdd(value, index)
    select value;

LINQ(とPLINQ)を書き始めて間もないと、このAsParallelによって何が並列化されるのか、誤解する事が多いようだ。上記の例では、10万までの数値を生成する「Range」が並列化され、それ以降の乱数生成や判定が並列化されることも期待しつつも、本当に並列化されるのか自信が持てないらしい。
(いや、自分も最初はそうだったから、そういうものだと思う :-)

最初に書いた通り、良くも悪くもAsParallelを付けるだけでよいというのが、この誤解の主原因なのだろう。
まず、クエリ構文をメソッド構文にする。

var rand = new Random();
var captured = new ConcurrentDictionary<int, int>();
var results =
    Enumerable.Range(0, 100000).
    AsParallel().
    Select(delegate(index)
        {
            return new
            {
                value = rand.Next(200000),
                index = index
            };
        }).
    Where(delegate(entry)
        {
            return captured.TryAdd(entry.value, entry.index)
        }).
    Select(delegate(entry)
        {
            return entry.value;
        });

意味もなくletを使ってしまったので面倒な事になっているが、前回見せたとおりindexとvalueを射影しているだけだ。肝心のAsParallelは、letを射影するSelectの手前に入っている。Range→AsParallel→Select→Where→Selectと、パイプライン結合されているのが分かる。

そう、パイプライン結合されているのだ、AsParallelも。

パイプライン結合を実現しているのは、IEnumerable<T>インターフェイスだ。クエリの末端(最後のSelect)が列挙されるとき、GetEnumeratorが呼び出される連鎖が起きることを述べた。AsParallelもまた、同じようにGetEnumeratorの呼び出しと、列挙の連鎖が発生する。ということは、AsParallelが動作に影響を与えられるのは、自分よりも上位のRangeだけ、と言う事になる。

え?そうなの?
いやいや、実はこれが判りにくい原因ではないかと思う。AsParallelよりも下位のパイプライン結合が並列化されるのだ。つまり、乱数の生成と辞書への追加が、期待通り並列化される。Rangeによるindexの生成は並列化「されない」。
どうしてこのようになるのだろうか?ここにクエリ構文を使いすぎたり、varを使いすぎたりする弊害がある。上記のコードをばらばらにし、.NET2.0的にしてみる。

IEnumerable<int> results0 =
    Enumerable.Range(0, 100000);

ParallelQuery<int> results1 =
    results0.AsParallel();

ParallelQuery<Tuple<int, int>> results2 =
    results1.Select(delegate(index)
        {
            return Tuple.Create(rand.Next(200000), index);
        });

ParallelQuery<Tuple<int, int>> results3 =
    results2.Where(delegate(entry)
        {
            return captured.TryAdd(entry.Item1, entry.Item2);
        });

ParallelQuery results4 =
    results3.Select(delegate(entry)
        {
            return entry.Item1;
        });

匿名クラスは表現できないので、Tupleに置き換えてある。AsParallelの戻り値の型は、実はIEnumerable<T>ではない。ParallelQuery<T>型なのだ。但し、ParallelQuery<T>はIEnumerable<T>を実装しているので、これを直接foreachなどで列挙することは可能だ。つまり、今まで通りIEnumerable<T>が返されると思っていても、表面上の違いは無いということだ。

しかし、このコードを見れば、なんとなく並列化される範囲が見えてくると思う。ParallelQueryによって管理されているクエリが並列化される。少し不思議なのは、IEnumerable<T>に対してSelectやWhereを呼び出した場合はIEnumerable<T>が返されるのに、ParallelQuery<T>に対してSelectやWhereを呼び出した場合は、ParallelQuery<T>が返されることだ。

これもそれほど大げさな仕掛けではない。IEnumerable<T>の場合は、SelectやWhereといった拡張メソッドは「Enumerable」クラスに定義されている。ParallelQuery<T>に対してSelect・Whereした場合は、「ParallelEnumerable」クラスの拡張メソッドが使用されるのだ。C#のコンパイラが、SelectやWhereメソッドの第一引数の型にもっとも一致する拡張メソッドを自動的に選択するため、この仕掛けが機能する。まるで、メソッドのオーバーライドを実装しているかのようだ。

では、いったいどこで並列実行のからくりが実現されるのだろうか? ParallelQuery(ParallelEnumerable)の実装は複雑だが、基本的な考え方は単純だ。ParallelQuery<T>の最下位でGetEnumeratorが呼び出されたときに、並列化が行われる。

IEnumerable<int> results0 =
    Enumerable.Range(0, 100000);

ParallelQuery<int> results1 =
    results0.AsParallel();

ParallelQuery<Tuple<int, int>> results2 =
    results1.Select(delegate(index)
        {
            return Tuple.Create(rand.Next(200000), index);
        });

ParallelQuery<Tuple<int, int>> results3 =
    results2.Where(delegate(entry)
        {
            return captured.TryAdd(entry.Item1, entry.Item2);
        });

ParallelQuery<int> results4 =
    results3.Select(delegate(entry)
        {
            return entry.Item1;
        });

// ParallelQuery.GetEnumerator()が呼び出される
foreach (var value in results4)
{
    Console.WriteLine(value);
}

最下位でGetEnumeratorが呼び出されると、スレッドプールからいくらかのスレッドを割り当て、各スレッドがParallelEnumerableの拡張メソッドで定義された演算を実行する。この部分は、従来のGetEnumeratorによる結合では実行できない。何故なら、IEnumeratorインターフェイスはマルチスレッドに対応していないからだ。必然的に、パイプラインで並列化演算が可能なのは、それぞれの演算が専用に設計された、ParallelEnumerableに定義された拡張メソッド群だけ、ということになる。

(もちろん、それらの拡張メソッドから呼び出されるデリゲートの実装は、いくらでも拡張可能だ。SelectやWhereのデリゲートの実装は、使う側が記述するのだから。)


AsParallelの実装とか、ParallelQueryEnumeratorの実装に興味がわくかもしれない。この部分はスレッドの割り当てやデータの分散と集約など、実際にはかなり複雑になっていると思われる。

しかし、注目する点はその部分ではなく :-) 、各ワーカースレッドが並列実行している演算の部分だ。Select→Where→Selectの部分が、スレッド毎に分散されている。パイプラインでAsParallelを適用してから、foreachで列挙されるまでに結合された演算子が並列実行される事が分かる。
そして、(当たり前ではあるが)foreachによるGetEnumeratorの呼び出しの後(foreachループ)は、並列化されていない。Console.WriteLineが並列実行されるわけではない事も分かる。ここが並列実行されたとすると、記述したとおりに実行されないわけだから、C#の構文的にも変だ。
また、AsParallelの上位も並列化されない。こちらも、IEnumeratorインターフェイスの構造に阻まれて、並列化させることはできない。

結局のところ、IEnumerable.GetEnumeratorの呼び出しによって、並列化の「壁」が出来上がるわけだ。私はこれを「ゲート」と勝手に呼んでいる。PLINQクエリが入り混じる場合、このゲートを意図的に作ってやることで、並列化されるクエリの範囲を自由にコントロールできる。

IEnumerable.GetEnumeratorを明示的に呼び出させる事が出来れば、このゲートを作ったことになる。つまり:

IEnumerable<int> results0 =
    Enumerable.Range(0, 100000);

ParallelQuery<int> results1 =
    results0.AsParallel();

ParallelQuery<Tuple<int, int>> results2 =
    results1.Select(delegate(index)
        {
            return Tuple.Create(rand.Next(200000), index);
        });

// results2の直後にゲート生成
IEnumerable<Tuple<int, int>> results3 =
    ((IEnumerable<Tuple<int, int>>)results2).Where(delegate(entry)
        {
            return captured.TryAdd(entry.Item1, entry.Item2);
        });

IEnumerable<int> results4 =
    results3.Select(delegate(entry)
        {
            return entry.Item1;
        });

ということだ。こうすれば、results2だけが並列化の対象となる。

で、キャストは面倒であるため、この目的のための「AsEnumerable」という拡張メソッドがある。内部の実装はキャストしているだけだ。PLINQ向けには「AsSequential」もあるが、これはAsEnumerableと全く変わらない。AsParallelの逆演算子のイメージで導入されたのだと思う。

投稿者:

kekyo

A strawberry red slime mold. Likes metaprogramming. MA. Bicycle rider. http://amzn.to/1SeuUwD