.NET非同期処理(async-await)を制御する、様々な方法

async-awaitベースの非同期処理を制御する方法をまとめました。コードはわざと冗長に書いています。

概要:

  • Taskベースのワーカースレッド生成
  • 他の非同期I/Oと連携してタスクを制御する方法
  • TaskとLINQを応用して、多量の計算を安全に並列実行させる方法
  • Taskを使っていない非同期処理をTask化する方法
  • 非同期処理のキャンセルの実現方法
  • WinRT・ユニバーサルWindowsアプリケーション(UWP)での非同期処理とTaskの連携方法

読む前に補足

C#でTaskやasync-awaitを使った非同期処理の書き方を探しているのであれば、ポイントに絞って書いた、こちらの記事をお勧めします: 「できる!C#で非同期処理(Taskとasync-await)」

Taskクラスの使用例として、ワーカースレッドを起動するという例が良く挙げられます。本記事も最初にTask.Runによるワーカースレッドの例を紹介していますが、本来のTaskクラスの役割はワーカースレッドの管理ではなく、非同期処理の統一された操作にあります(非同期I/Oもワーカースレッドも同じように「非同期処理」として管理可能にする)。

また、async-awaitの使い方を調べていてこの記事にたどり着いた場合は、「基本的にTask.Runは使わない」と言う事を頭の片隅に置いておいて下さい。特殊な場合を除いて、明示的にTask.Runでワーカースレッドを操作する必要はありません。非同期メソッドが返すTaskクラスをawaitする事に集中すれば、問題なくコードを記述できる筈です。

違いが分からなくなったり混乱した場合は、この前提を思い出してみて下さい。

この記事の続きとして、非同期処理における例外を扱った記事もあります: 「.NET非同期処理(async-await)と例外の制御」


Task.Runの基礎

TaskクラスのRunメソッドを使用すると、ワーカースレッドを使用して独自の処理を非同期化(タスク化)出来ます。

public static async Task SampleTaskRun1Async()
{
	// ワーカースレッドが生成される
	Task task = Task.Run(new Action(() =>
		{
			// 任意のCPU依存性処理...
		}));

	// 処理が完了するのを非同期的に待機
	await task;

	Console.WriteLine("Done!");
}

Task.Runメソッドは、.NET4.5未満ではTask.Factory.StartNewと同じです。

Task.Runを使用した場合、戻り値としてTaskクラスのインスタンスが取得出来ます。
従来は、Threadクラスを使用してワーカースレッドを直接生成して管理していましたが、Task.Runで返却されたTaskを使用すれば、非同期処理と同じようにワーカースレッドの操作を待機する事が出来ます。

また、Threadクラスを使った場合、ワーカースレッドの待機は、Thread.Joinメソッドを使いますが、このメソッドはハードブロックしてしまいます。非同期処理の世界で行儀よく振る舞うためには、待機すべきあらゆる箇所でTaskクラスが必要となります。

# 厳密にはGetAwaiterメソッドの実装が担保しますが、細部なので省略。また、WinRTでは、IAsyncActionやIAsyncOperationでも待機出来ます(後述)

Taskを使用する優れた点は、他のタスクとの合成が簡単であることです。

public static async Task SampleTaskRun2Async(Stream dataStream, byte[] data)
{
	Task task1 = Task.Run(new Action(() =>
		{
			// 任意のCPU依存性処理...
		}));

	Task task2 = Task.Run(new Action(() =>
		{
			// 他の任意のCPU依存性処理...
		}));

	// ネイティブな非同期I/O処理
	Task task3 = dataStream.WriteAsync(data, 0, data.Length);

	// 全ての処理が完了するのを非同期的に待機
	Task combinedTask = Task.WhenAll(task1, task2, task3);
	await combinedTask;

	Console.WriteLine("All done!");
}

Task.WhenAllメソッドは、指定されたタスク群が全て完了するのを待機する、新しいTaskを返します。
上記のように、CPU依存性の(つまり、ワーカースレッドで実行する)非同期処理と、ネイティブなI/O操作の非同期処理を、全く同じように扱う事が出来ます。

Task.Runを使うと簡単にワーカースレッドを制御出来るので、従来のThreadクラスは、

  • スレッド名を割り当てる必要がある
  • COMアパートメントを設定する必要がある
  • スレッドローカルストレージに残骸が残るような状況を完全に破棄したい

と言うような場合にだけ使用すれば良いでしょう。


Task.Runの戻り値

Task.Runは戻り値を返すことが出来ます。

public static async Task SampleTaskRun3Async(int count)
{
	// Func<T>デリゲートを使用することで、処理の戻り値を返す事が出来る
	Task<double> task = Task.Run(new Func<double>(() =>
		{
			// 任意のCPU依存性処理...
			var r = new Random();
			return Enumerable.Range(0, count).Select(index => r.Next()).Average();
		}));

	// 処理が完了するのを非同期的に待機
	double result = await task;

	Console.WriteLine("Done: {0}", result);
}

Task.Run引数のデリゲートが戻り値を返す「Func<T>」である場合、返されるTaskも「Task<T>」となり、awaitすると戻り値が返されます。


Task.WhenAllで、結果の集約

戻り値を返すタスクと、Task.WhenAllとLINQを使って応用すると、大量のデータを効率よくワーカースレッドに流し込んで並列実行させ、かつ全て完了するのを待機するという、今までなら出血しそうなコードが、非常に簡単に安全に記述出来ます。

public static async Task SampleTaskRun4Async(int count)
{
	// 計算を非同期に実行するタスクを、count個列挙する
	// 1, 1+2, 1+2+3, 1+2+3+4, ...
	IEnumerable<Task<long>> tasks =
		Enumerable.Range(1, count).
		Select(index => Task.Run(() => Enumerable.Range(1, index).Sum(v => (long)v)));

	// 全ての処理が完了するのを非同期的に待機
	long[] results = await Task.WhenAll(tasks);

	Console.WriteLine("Done: [{0}]", string.Join(",", results));
}

慣れていないと分かり難いかも知れません。この処理は、以下のように動作します。

taskrun2この方法は、タスク(中身はワーカースレッド)を指定された個数分起動し、全てが完了するのを待ちます。直感的には、大量のワーカースレッドが生成され、コンテキストスイッチングで飽和してまともに動作しないように見えますが、実際はそうなりません。Task.Runは、スレッドプールからワーカースレッドを取得しますが、スレッドプールの総スレッド数は、効率よく実行できる程度に調整されています(図の例では、最大で4個のスレッドが次々と計算を処理します)。

taskrun11このスクリーンショットは、Process Explorerでスレッドに割り当てられているCPUサイクルを見たものです。このテストを実施したマシンは、4コア2論理スレッドなので、システム上は8スレッド使えます。実際にほぼ8スレッドだけがアクティブに動作し、大量のワーカースレッドで飽和する事が無い事が分かります。

PLINQ(並列LINQ)と比べると、タスクを集約可能な式として実装する必要があるため、PLINQのように自然に拡張する事は出来ませんが、パフォーマンスの予測がしやすい事が利点です。


Taskの存在しない世界に、Taskを導入する

このように、処理に紐づいたTaskがあれば、非常に応用範囲が広くなります。しかし、そもそも処理の完了をTaskで担保しない場合はどうでしょうか。例えば、.NET CLRのイベントは一種のコールバックなので、対応するTaskが存在しません。

より具体的な例で考えます。WPFのボタンは、クリックされるとClickイベントが発火します。普通はこれをフックして、ハンドラとなるラムダブロックやメソッドで処理を実装します。この時、擬似的なTaskで発火状態を置き換える事が出来れば、様々な応用が可能になります。

処理の完了を疑似的なTaskで表現するのに、「TaskCompletionSource<T>」クラスが使えます。

// (コードビハインドで書いていますが、推奨はしません。また、エラーチェックを省略しています)
private async void OnInitialized(object sender, EventArgs e)
{
	// 文字列を返すことが出来るTaskCompletionSourceを準備
	var tcs = new TaskCompletionSource<string>();

	// ボタンがクリックされたら、テキストボックスの文字列をTaskCompletionSourceを介して間接的に返却する
	button.Click += (s, e) => tcs.SetResult(textBox.Text);

	// 非同期で待機する
	string inputText = await tcs.Task;

	// 結果をテキストブロックに表示
	textBlock.Text = inputText;
}

SetResultメソッドを呼び出すと、待機しているタスクが戻り値を伴って継続します。

上記の例は、ボタンのハンドラから直接テキストブロックに表示すれば良いので、何の為に複雑にするのかわからないかも知れません。イベントをタスク化する利点は、先ほど示したように合成が簡単だからです。

// (コードビハインドで書いていますが、推奨はしません。また、エラーチェックを省略しています)
protected override async void OnInitialized(EventArgs e)
{
	// ボタン1がクリックされたら、テキストボックス1の文字列をTaskCompletionSourceを介して間接的に返却する
	var tcs1 = new TaskCompletionSource<string>();
	button1.Click += (s, e) => tcs1.SetResult(textBox1.Text);

	// ボタン2がクリックされたら、テキストボックス2の文字列をTaskCompletionSourceを介して間接的に返却する
	var tcs2 = new TaskCompletionSource<string>();
	button2.Click += (s, e) => tcs2.SetResult(textBox2.Text);

	// 両方のボタンがクリックされるまで待機
	string[] inputTexts = await Task.WhenAll(tcs1.Task, tcs.Task2);

	// 結果をテキストブロックに表示
	textBlock.Text = string.Join(",", inputTexts);
}

上記の例では冗長に書きましたが、例えば動的に任意の個数で生成したテキストボックスやボタンの列に対して簡単に拡張・記述量の大幅削減が出来そうです。

// (コードビハインドで書いていますが、推奨はしません。また、エラーチェックを省略しています)
protected override async void OnInitialized(EventArgs e)
{
	// itemCount_個だけUI部品を生成し、関連付けられたTask群を返す
	IEnumerable<Task<string>> tasks =
		Enumerable.Range(0, itemCount_).
		Select(index =>
		{
			var textBox = new TextBox();
			var button = new Button();
			stackPanel.Children.Add(textBox);
			stackPanel.Children.Add(button);
			
			var tcs = new TaskCompletionSource<string>();
			button.Click += (s, e) => tcs.SetResult(textBox.Text);

			return tcs.Task;
		});

	// 全てのボタンがクリックされるまで待機
	string[] inputTexts = await Task.WhenAll(tasks);

	// 結果をテキストブロックに表示
	textBlock.Text = string.Join(",", inputTexts);
}

もう一つ例を示します。ワーカースレッドにThreadクラスを使わなければならない状況でも、TaskCompletionSource<T>を使ってタスク化する事が出来ます。

// COMアパートメントを指定して、独立したワーカースレッドを実行する
public static Task<TResult> RunEx<TResult>(
	Func<TResult> action,
	ApartmentState state = ApartmentState.STA)
{
	// スレッド終了を通知するTaskCompletionSource
	var tcs = new TaskCompletionSource<TResult>();

	// ワーカースレッドを生成する
	var thread = new Thread(new ThreadStart(() =>
		{
			try
			{
				// 処理本体を実行して、戻り値を得る
				var result = action();

				// Taskに終了を通知
				tcs.SetResult(result);
			}
			catch (Exception ex)
			{
				// 例外をTaskに通知
				tcs.SetException(ex);
			}
		}));
	thread.SetApartmentState(state);
	thread.Start();

	// Task<TResult>を返す
	return tcs.Task;
}

このRunEx<T>は、Task.Run<T>のように使え、かつCOMのアパートメントを指定可能にする例です。

TaskCompletionSource<T>は、例外を通知する事も出来ます。SetExceptionメソッドを呼び出すと、非同期待機しているタスクで指定された例外がスローされます。

なお、何故かTaskCompletionSource<T>のジェネリック引数を指定しないバージョン(つまり、非同期待機後に戻り値を受け取らないバージョン)は存在しません。但し、Task<T>は、Taskクラスを継承しているので、適当な引数型を与えて、完了もSetResultに適当な値を渡して代用する事が出来ます。


非同期処理のキャンセル

非同期処理をキャンセルするためのインフラも標準で用意されています。

// 指定されたデータをストリームに指定回数出力する
public static async Task SampleTaskRun5Async(
	Stream stream,
	byte[] data,
	int count,
	CancellationToken token)
{
	for (var index = 0; index < count; index++)
	{
		// キャンセル要求があれば、例外をスローして中断する
		token.ThrowIfCancellationRequested();

		// 非同期I/O処理も、中断を可能にする
		await stream.WriteAsync(data, 0, data.Length, token);
	}
}

CancellationToken構造体は、関連する非同期処理全体にわたってキャンセル要求を通知するための構造体です。キャンセルが発生したかどうかを保持する、一種のフラグのようなものです。呼び出し側が何らかの事情でキャンセル要求を行うと、このトークン経由で通知されるので、非同期処理を中断させる事が出来ます。

ThrowIfCancellationRequestedメソッドは、キャンセルが要求されているかどうかを検出し、キャンセルしていればその場でOperationCanceledExceptionをスローします。上の例ではトークンをWriteAsyncにも渡しているので、あまり大きな意味はありません(キャンセルされればWriteAsync内で例外がスローされる)。出来るだけ早くキャンセル要求に反応したい場合に、ThrowIfCancellationRequestedを使う事が出来ます。

ところで、CancellationTokenはどこから来るのでしょうか? CalcellationToken自体は、キャンセル要求を通知するための抽象的な構造体に過ぎません。自分で呼び出し側のコードを書いている場合は、CancellationTokenSourceクラスを使う事で、制御可能なトークンを用意する事が出来ます。

// (コードビハインドで書いていますが、推奨はしません。また、エラーチェックを省略しています)
protected override async void OnInitialized(EventArgs e)
{
	// CancellationTokenSourceクラスを準備(これでキャンセルを通知出来る)
	var cts = new CancellationTokenSource();

	// ボタンがクリックされたら、キャンセルを通知する
	button.Click += (s, e) => cts.Cancel();

	try
	{
		// 時間のかかる非同期処理にキャンセルトークンを渡す
		Task task = SampleTaskRun5Async(stream_, data_, 100000, cts.Token);

		// 非同期で待機する
		await task;

		// 結果をテキストブロックに表示
		textBlock.Text = "Output completed!";
	}
	catch (OperationCancelledException)
	{
		// 結果をテキストブロックに表示
		textBlock.Text = "Output canceled...";
	}
}

仮に、キャンセルさせたい場合の実行コンテキストが存在しない場合は、Registerメソッドを使って、コールバック化する事が出来ます。以下の例は、手元に実行コンテキストがある(awaitで待機している)ため、あまり適切な例ではありませんが、例えばネイティブAPIで処理開始後、処理がスレッドから完全に切り離されるような状況下で使用する事が出来ます。

// 指定されたデータをストリームに指定回数出力する
public static async Task SampleTaskRun6Async(
	Stream stream,
	byte[] data,
	int count,
	CancellationToken token)
{
	// キャンセル要求に対して、コールバックで反応する
	// (下記の実処理とは無関係に、キャンセル時にコールバックが発生する)
	var index = 0;
	token.Register(() =>
		// (実際にはここでキャンセル処理を行う)
		Console.WriteLine("Operation canceled: Written count={0}", index));

	// 実処理
	for (; index < count; index++)
	{
		token.ThrowIfCancellationRequested();
		await stream.WriteAsync(data, 0, data.Length, token);
	}
}

この他にも、WaitHandleプロパティから、待機可能なWin32カーネルオブジェクトが取得出来ます。Win32 APIと連携して同時にキャンセルを待機させたい場合などに使う事が出来ます。


WinRT・ユニバーサルWindowsアプリ(UWP)での非同期処理

WinRTの世界では、WinRT APIの呼び出しに対する非同期処理が「IAsyncAction」「IAsyncOperation」インターフェイスで表現されています。これらは「IAsyncInfo」インターフェイスを継承していて、Taskクラスによく似ていますが、追加の情報(進行状況など)を通知する能力を持たせることが出来るようになっています。

IAsyncActionとIAsyncOperationは、非同期処理の結果として戻り値を返すかどうかが異なります。Taskクラスの非ジェネリックバージョンとジェネリックバージョンに対応します。そして、これらのインスタンスは「await」で待機可能です。

.NET Task WinRT IAsyncInfo
戻り値無し Task IAsyncAction
戻り値あり Task<TResult> IAsyncOperation<TResult>

そのようなわけで、大体Taskクラスと同じように扱えますが、Task.WhenAllなどを使ったタスクの合成は出来ません。Task.WhenAllの引数は、Taskクラスしか受け付けないからです。そこで、IAsyncActionやIAsyncOperationをTaskクラスに変換する、ヘルパーメソッドが用意されています。

// WinRT非同期処理群を待機して、結果をテキストブロックに反映する
public static async Task SampleTaskRun7Async(IEnumerable<IAsyncOperation<string>> asyncOperations)
{
	// IAsyncOperation<string>をTask<string>に変換
	// (WindowsRuntimeSystemExtensions.AsTask拡張メソッド)
	IEnumerable<Task<string>> tasks =
		asyncOperations.Select(asyncOperation => asyncOperation.AsTask());

	// すべてが完了するのを待機出来るタスクに変換
	Task<string[]> combinedTask = Task.WhenAll(tasks);

	// 完了を待つ
	string[] results = await combinedTask;

	// 結果を反映
	textBlock.Text = string.Join(",", results);
}

AsTask拡張メソッドを使用すると、Taskに変換できます。一旦Taskに変換できれば、これまでの例と同じように応用が可能です。また、TaskをIAsyncActionやIAsyncOperationに逆変換する拡張メソッド(AsAsyncActionAsAsyncOperation)も用意されています。

WinRTの世界では、CancellationTokenに相当する、キャンセル管理用のクラスやインターフェイスがありません。その代わり、IAsyncInfo.Cancelメソッドがあり、直接キャンセルを要求する事が出来ます。TaskとCancellationTokenの世界をシームレスに結合する場合は、AsTaskのオーバーロードにCancellationTokenを渡すことによって、Cancelメソッドの操作も自動的に行わせる事が出来ます。

投稿者:

kekyo

A strawberry red slime mold. Likes metaprogramming. MA. Bicycle rider. http://amzn.to/1SeuUwD