Executorsを利用してみる



JDK5から「java.lang.Thread」を直にnewしなくてもスレッドを扱えるようになりました。

Executorsクラスでは、スレッド処理に必要なクラスを生成します。ファクトリの役割を担ってくれてます。
ExecutorService」の実装クラス、「ScheduledExecutorService」の実装クラスを生成します。
 

メソッド)newSingleThreadExecutor()、newFixedThreadPool(int nThreads)、newCachedThreadPool()をみてみる

ExecutorService」の実装クラスを生成するには、3つのメソッドがあります。
 

No クラス名 概要
1 newSingleThreadExecutor 単一スレッドを作成する
2 newFixedThreadPool 固定数のスレッドを再利用するスレッドプールを作成する
3 newCachedThreadPool 必要に応じ、新規スレッドを作成するスレッドプールを作成する。60 秒間使用されなかったスレッドは、終了して、キャッシュから削除される。

java.util.concurrent.Executorsクラスのソースをみてみると、それぞれのメソッドの実装は下記のようになっています。
 
newSingleThreadExecutorの実装

 new ThreadPoolExecutor(1, 1, 0L, 
                          TimeUnit.MILLISECONDS,
                          new LinkedBlockingQueue<Runnable>())


newFixedThreadPoolの実装

  new ThreadPoolExecutor(nThreads, nThreads,
                          0L, TimeUnit.MILLISECONDS,
                          new LinkedBlockingQueue<Runnable>())


newCachedThreadPoolの実装

 new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                         60L, TimeUnit.SECONDS,
                         new SynchronousQueue<Runnable>())


それぞれのメソッドをみてみると、実際の生成は、「ThreadPoolExecutor」クラスで行っています。
 

ThreadPoolExecutorをみてみる

 
ThreadPoolExecutorクラスには、下記のようなコンストラクタがあります。
 

 ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) 

 
それぞれの引数の意味は、下記の通りです。

No 引数 概要
1 corePoolSize アイドルであってもプール内に維持されるスレッドの数
2 maximumPoolSize プール内で可能なスレッドの最大数
3 keepAliveTime スレッドの数がコアよりも多い場合、これは超過したアイドル状態のスレッドが新しいタスクを待機してから終了するまでの最大時間
4 unit keepAliveTime 引数の時間単位
5 workQueue タスクが超過するまで保持するために使用するキュー。このキューは、execute メソッドで送信された Runnable タスクだけを保持する

 
引数をながめていると自分でいろいろ組み立てることもできそうです。
 

メソッド)newSingleThreadExecutor()、newFixedThreadPool(int nThreads)、newCachedThreadPool()を生成してみる

newSingleThreadExecutorメソッド、newFixedThreadPoolメソッド、newCachedThreadPoolメソッドを利用して
それぞれの実装の仕方をみていきたいとおもいます。
 
まずは、newSingleThreadExecutor()メソッドの利用の仕方をみていきます。

        ExecutorService e = Executors.newSingleThreadExecutor();
        e.execute(new RunnableClass()); 
        e.shutdown();

 
RunnableClass*1クラスをExecutorServiceのexecuteに渡してあげることにより
別スレッドで処理を実行してあげることができます。ExecutorService内で実行しているスレッドを終わらせる為に
shutdownメソッドを呼び出す必要があります。
 
次に、newFixedThreadPool(int nThreads)メソッドの利用の仕方をみていきます。

        ExecutorService e = Executors.newFixedThreadPool(3);
    // RunnableClassで処理を追加していく
        for (int i = 0; i < 5; i++) {
            e.execute(new RunnableClass("Task" + i));
        } 
        e.shutdown();

 
固定数のスレッドを3としているので、3以上のスレッドは作成されません。
実行中のスレッドの上限は、常に3となります。

    java.lang.ThreadGroup[name=main,maxpri=10]
        Thread[main,5,main]
        Thread[pool-1-thread-1,5,main]
        Thread[pool-1-thread-2,5,main]
        Thread[pool-1-thread-3,5,main]

 
最後に、newCachedThreadPool()メソッドの利用の仕方をみていきます。

        ExecutorService e = Executors.newCachedThreadPool();
    // RunnableClassで処理を追加していく
        for (int i = 0; i < 5; i++) {
            e.execute(new RunnableClass("Task" + i));
            try {
                Thread.sleep(1500L);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        }
        e.shutdown();

60 秒間内ならば、生成された既存のスレッドを利用して、処理を順次実行してくれます。
 
RunnableClassは、下記の様な実装としてます。

    class RunnableClass implements Runnable {
        private String name;

        public RunnableClass(String name) {
            this.name = name;
        }

        public void run() {
            System.out.println(name + " Starts.");

            try {
                Thread.sleep(1000L);
            } catch (InterruptedException ex) {
                System.out.println(name + " is Canceled");
                return;
            }
            System.out.println(name + " is Done.");
        }
    }

 

CassandraDaemonで利用されているExecutorService

Cassandraのソースをみていたときに、org.apache.cassandra.thrift.CassandraDaemonというクラスで
ThreadPoolを作成している箇所がありました。その時に、java.util.concurrentパッケージのExecutorService
クラスが出現し、一度、java.util.concurrentパッケージを眺めて理解しないとソースが理解できなさそうだったので、
確認をしてみました。
 

            // ThreadPool Server
            CustomTThreadPoolServer.Options options = new CustomTThreadPoolServer.Options();
            options.minWorkerThreads = MIN_WORKER_THREADS;

            ExecutorService executorService = new CleaningThreadPool(cassandraServer.clientState,
                    options.minWorkerThreads,
                    options.maxWorkerThreads);
            serverEngine = new CustomTThreadPoolServer(new TProcessorFactory(processor),
                    tServerSocket,
                    inTransportFactory,
                    outTransportFactory,
                    tProtocolFactory,
                    tProtocolFactory,
                    options,
                    executorService);

Cassandraは、結構、thriftのクラスをそのまま利用しているところが多々見受けられたので、
どこかのタイミングでthriftに関しても確認できればと思います。
 


*1:Runnableインターフェイスの実装クラス