スレッド数に制限の設けられたスレッドプールを使用することで、スレッドプール内で並行実行可能なスレッド数の上限を指定することができる。ただし、プール内の他のタスクの完了に依存するタスクの実行に、このようなスレッドプール内のスレッドを用いてはならない。
「スレッド飢餓状態のデッドロック」(thread starvation deadlock)は、プール内で実行中のスレッドすべてが、内部キュー内で開始されないまま待機しているタスクの完了を待ち続ける場合に発生する。すなわち、現在実行中のタスクが、スレッドプールに他のタスクの実行を依頼してタスクの完了を待つが、それらのタスクを格納する余裕がスレッドプールにない場合に、スレッド飢餓状態のデッドロックが発生する。
プログラムの実行に必要なスレッドの数が少なくてすむ場合には問題なく動作するため、プログラムが正しく動作しなくなったときに、原因を見極めることが難しくなるかもしれない。この問題は、プールサイズを拡張することで解決できる場合もあるが、適切なプールサイズを決定するのは容易ではなく、場合によっては不可能であろう。
同様に、2つの実行中のタスクが、それぞれ他方のタスクの完了を待たなければならない場合、どちらのタスクも処理を完了できず、タスクを実行しているスレッドは、結果的に再利用できないかもしれない。また、サブタスク内で待ち状態となる操作により、タスクのキューが際限なく伸びてしまうこともありうる [Goetz 2006]。
違反コード (サブタスク間に相互依存関係が存在)
以下の違反コード例は、スレッド飢餓状態のデッドロックを引き起こす可能性がある。ValidationServiceクラスは、ユーザが指定したフィールドがバックエンドのデータベースに存在するかどうかチェックする、などの入力値検査を実行する。
fieldAggregator()メソッドは、可変個のString引数を取り、各引数に対応するタスクを生成して並列処理する。それぞれのタスクは、ValidateInputクラスを使用して入力値検査を行う。
次にValidateInputクラスは、SanitizeInputクラスを使用して各リクエスト用のサブタスクを作成し、入力値を無害化する。これらのタスクは、すべて同じスレッドプール内で実行される。fieldAggregator()メソッドは、すべてのタスクの実行が完了するまで待ち状態となり、すべてのタスクの処理結果が揃った段階で、これらの結果を含むStringBuilderオブジェクトを呼出し元に返す。
public final class ValidationService { private final ExecutorService pool; public ValidationService(int poolSize) { pool = Executors.newFixedThreadPool(poolSize); } public void shutdown() { pool.shutdown(); } public StringBuilder fieldAggregator(String... inputs) throws InterruptedException, ExecutionException { StringBuilder sb = new StringBuilder(); // 結果を格納する Future<String>[] results = new Future[inputs.length]; // タスクをスレッドプールに依頼する for (int i = 0; i < inputs.length; i++) { results[i] = pool.submit( new ValidateInput<String>(inputs[i], pool)); } for (int i = 0; i < inputs.length; i++) { // 結果を集める sb.append(results[i].get()); } return sb; } } public final class ValidateInput<V> implements Callable<V> { private final V input; private final ExecutorService pool; ValidateInput(V input, ExecutorService pool) { this.input = input; this.pool = pool; } @Override public V call() throws Exception { // 検証に失敗した場合、ここで例外をスローする Future<V> future = pool.submit(new SanitizeInput<V>(input)); // サブタスクの生成 return (V) future.get(); } } public final class SanitizeInput<V> implements Callable<V> { private final V input; SanitizeInput(V input) { this.input = input; } @Override public V call() throws Exception { // 入力情報を無害化して返している return (V) input; } }
スレッドプールサイズが6である場合を考えてみよう。ValidationService.fieldAggregator()メソッドが呼び出されて6つの引数が検証され、6つのタスクの処理がスレッドプールへと依頼される。さらに各タスクは、入力値を無害化するよう、それぞれ対応するサブタスクに依頼する。サブタスクであるSanitizeInput()が実行されて初めて、このサブタスクを呼び出した元の6つのタスクは結果を返すことができる。しかし、スレッドプールの6つのスレッドはすべて待ち状態になっており、サブタスクであるSanitizeInputを開始することができない。また、スレッドプールがアクティブなタスクを含んでいる場合、shutdown()メソッドではスレッドプールを終了できない。
シングルスレッド構成のExecutorにおいても、呼出し元がいくつかのサブタスクを生成して結果を待つ場合、スレッド飢餓状態のデッドロックが発生しうる。
適合コード (タスク間に相互依存関係が存在しない)
以下の適合コードでは、SanitizeInputタスクがValidateInputタスクと同じスレッドで実行されるように、ValidateInput<V>クラスを修正している。結果的に、ValidateInputタスクとSanitizeInputタスクはそれぞれ独立しており、互いに処理の完了を待つ必要はない。また、SanitizeInputクラスは、Callableインタフェースを実装しないように修正されている。
public final class ValidationService { // ... public StringBuilder fieldAggregator(String... inputs) throws InterruptedException, ExecutionException { // ... for (int i = 0; i < inputs.length; i++) { // スレッドプールにはそのまま渡さない。 results[i] = pool.submit(new ValidateInput<String>(inputs[i])); } // ... } } // 同じスレッドプールは使用しない public final class ValidateInput<V> implements Callable<V> { private final V input; ValidateInput(V input) { this.input = input; } @Override public V call() throws Exception { // 検証に失敗した場合、ここで例外をスローする return (V) new SanitizeInput().sanitize(input); } } public final class SanitizeInput<V> { // もはやCallableタスクではない public SanitizeInput() {} public V sanitize(V input) { // 入力情報を無害化して返している return input; } }
スレッド飢餓状態は、スレッドプールのサイズを大きくすることで軽減することができる。しかし、信頼できない呼出し元はそれを上回る入力を行うことで、システムに過大な負荷を与えることができてしまう(「TPS00-J. スレッドプールを使用しトラフィックの大量発生による急激なサービス低下を防ぐ」を参照)。
なお、各スレッドは必要なリソースが利用可能になるまで待ち状態になるため、同時に利用できるデータベース接続数や一度にオープンできるResultSetオブジェクト数などに制約がある場合、タスクの処理を行うことができるスレッドの数にも制約を受けることに注意する必要がある。
各スレッドの内部状態を保持するために、private static宣言したThreadLocal変数を使う場合があるだろう。スレッドプールを使用する場合、ThreadLocal変数の生存期間は、対応するタスクの実行中のみに制限すべきである[Goetz 2006]。また、これらのThreadLocal変数をタスク間通信に使用してはならない。スレッドプール内でThreadLocal変数を使用する場合に気を付けるべき制約については、「TPS04-J. スレッドプールの使用時にはThreadLocal変数の再初期化を確実に行う」を参照。
違反コード (サブタスク)
以下の違反コードは、共有スレッドプールで実行される一連のサブタスクを含んでいる [Gafter 2006]。BrowserManagerクラスはperUser()メソッドを呼び出し、perUser()メソッドはperProfile()メソッドを呼び出すタスクを開始する。perProfile()メソッドは、perTab()メソッドを呼び出すタスクを開始し、perTab()メソッドはdoSomething()メソッドを呼び出すタスクを開始する。BrowserManagerクラスは、これら一連のタスクの終了を待つ。countが実行されたメソッド数を正しく記録する限りにおいて、メソッドは任意の順序でdoSomething()を呼び出すことができる。
public final class BrowserManager { private final ExecutorService pool = Executors.newFixedThreadPool(10); private final int numberOfTimes; private static AtomicInteger count = new AtomicInteger(); // count = 0 public BrowserManager(int n) { numberOfTimes = n; } public void perUser() { methodInvoker(numberOfTimes, "perProfile"); pool.shutdown(); } public void perProfile() { methodInvoker(numberOfTimes, "perTab"); } public void perTab() { methodInvoker(numberOfTimes, "doSomething"); } public void doSomething() { System.out.println(count.getAndIncrement()); } public void methodInvoker(int n, final String method) { final BrowserManager manager = this; Callable<Object> callable = new Callable<Object>() { @Override public Object call() throws Exception { Method meth = manager.getClass().getMethod(method); return meth.invoke(manager); } }; Collection<Callable<Object>> collection = Collections.nCopies(n, callable); try { Collection<Future<Object>> futures = pool.invokeAll(collection); } catch (InterruptedException e) { // ハンドラへの転送 Thread.currentThread().interrupt(); // 割込みステータスのリセットを行う } // ... } public static void main(String[] args) { BrowserManager manager = new BrowserManager(5); manager.perUser(); } }
残念ながら、このプログラムはスレッド飢餓状態のデッドロックを引き起こす可能性がある。たとえば、5つのperUserタスクの各々が、5つのperProfileタスクを生成し、各perProfileタスクがperTabタスクを生成する場合、スレッドプールが枯渇し、perTab()メソッドはdoSomething()メソッドを呼び出すために新たなスレッドを割当てできなくなる。
適合コード (CallerRunsPolicy クラス)
以下の適合コードでは、実行するタスクを選択しスケジューリングすることで、スレッド飢餓状態のデッドロックを回避している。具体的には、ThreadPoolExecutorにCallerRunsPolicyをセットし、SynchronousQueueを使用している [Gafter 2006]。このポリシーでは、スレッドプールで利用できるスレッドが枯渇した場合、後続するタスクは、そのタスクを依頼したスレッド内で実行されるようになる。
public final class BrowserManager { private final static ThreadPoolExecutor pool = new ThreadPoolExecutor(0, 10, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); private final int numberOfTimes; private static AtomicInteger count = new AtomicInteger(); // count = 0 static { pool.setRejectedExecutionHandler( new ThreadPoolExecutor.CallerRunsPolicy()); } // ... }
Goetzは以下のように述べている。[Goetz 2006]
SynchronousQueueは本当はキューではなくて、スレッド間でタスクを手渡しするための仕組みである。SynchronousQueueにタスクをputするためには、他のスレッドがすでに手渡しを待っている(takeでブロックしている)必要がある。待っているスレッドがなくて現在のプールサイズが最大サイズ未満なら、ThreadPoolExecutorは新しいスレッドを作る。スレッドを作れなければタスクは飽和ポリシーにしたがって拒絶される。
Java API 仕様 の CallerRunsPolicy クラスには以下のように記述されている[API 2006]。
execute メソッドで拒否されたタスクを呼出し元スレッドで直接実行させるハンドラである。executor がシャットダウンされている場合、タスクは破棄される。
上記の適合コードでは、スレッドプールが満杯の場合、手渡しを待っている他のタスクを持つタスクは、SynchronousQueue に追加される。たとえば、perProfile()に対応するタスクは手渡しの受け取りを待っているため、perTab()に対応するタスクはSynchronousQueueに追加される。スレッドプールが満杯になると、飽和ポリシー(saturation policy)に従い、新たなタスクは拒絶される。CallerRunsPolicy が設定されているため、拒絶されたタスクはすべて最初のタスクを開始したメインスレッドで実行される。perTab()に対応するスレッドすべての実行が完了すると、perUser()のタスクが手渡しを待ち受けるため、perProfile()に対応する次のタスク群がSynchronousQueueに追加される。
大量のリクエストを処理する場合、CallerRunsPolicyによって作業がスレッドプールから待ち行列に分散されるため、急激なサービス低下を防ぐことができる。依頼されたタスクが他のタスクの完了待ち以外の理由で待ち状態にならなければ、飽和ポリシーは実行中のスレッドが複数のタスクを順番に扱うことを保証する。ただし、もしタスクがネットワーク入出力のような他の理由でブロックされるのであれば、たとえ飽和ポリシーが存在したとしてもスレッド飢餓状態のデッドロックを防ぐことはできないであろう。なお、SynchronousQueueはタスクを後の実行のために保存するものではないので、キューが無限に伸びてしまうことはない。すべてのタスクは実行中のスレッドあるいはスレッドプール内のスレッドによって処理される。
この解決法は、スレッドスケジューラの予測できない動作の影響を受ける場合があり、その場合スケジュールは最適化されないかもしれない。しかし、スレッド飢餓状態デッドロックは回避される。
リスク評価
スレッドプール内で相互に依存するタスクを実行すると、サービス停止状態につながることがある。
ルール | 深刻度 | 可能性 | 修正コスト | 優先度 | レベル |
---|---|---|---|---|---|
TPS01-J | 低 | 中 | 中 | P4 | L3 |
参考文献
[API 2006] | |
[Gafter 2006] | A Thread Pool Puzzler |
[Goetz 2006] | 8.3.2, Managing queued tasks; 8.3.3, Saturation Policies; 5.3.3, Dequeues and work stealing |
翻訳元
これは以下のページを翻訳したものです。
TPS01-J. Do not execute interdependent tasks in a bounded thread pool (revision 79)