001/*
002 *                    BioJava development code
003 *
004 * This code may be freely distributed and modified under the
005 * terms of the GNU Lesser General Public Licence.  This should
006 * be distributed with the code.  If you do not have a copy,
007 * see:
008 *
009 *      http://www.gnu.org/copyleft/lesser.html
010 *
011 * Copyright for this code is held jointly by the individual
012 * authors.  These should be listed in @author doc comments.
013 *
014 * For more information on the BioJava project and its aims,
015 * or to join the biojava-l mailing list, visit the home page
016 * at:
017 *
018 *      http://www.biojava.org/
019 *
020 */
021
022package org.biojava.utils;
023
024import java.util.LinkedList;
025
026/**
027 * <p><code>SimpleThreadPool</code> is a basic implementation of
028 * <code>ThreadPool</code> for use where we don't wish to introduce a
029 * dependency on a 3rd-party pool. In general, objects which require a
030 * pool should only use the interface and parameterize such that other
031 * implementations may be dropped in in place of this one, possibly
032 * using this one as a fallback.</p>
033 *
034 * <p>This class offers a service for running <code>Runnable</code>s
035 * using multiple threads, the number of which is specified in the
036 * constructor. <code>Runnable</code>s are queued in a simple FIFO
037 * queue. The worker threads wait on the queue when it is empty and
038 * are notified when a new <code>Runnable</code> is submitted.</p>
039 *
040 * <p>This implementation will prevent an application from exiting
041 * until <code>stopThreads()</code> is called unless the pool contains
042 * daemon threads.</p>
043 *
044 * @author Keith James
045 * @since 1.3
046 */
047public class SimpleThreadPool implements ThreadPool
048{
049    protected PooledThread [] threads;
050    protected int priority;
051
052    private LinkedList queue;
053    private boolean daemon;
054    private boolean waiting;
055    private boolean stopped;
056
057    /**
058     * Creates a new <code>SimpleThreadPool</code> containing 4
059     * non-daemon threads and starts them. The threads have priority
060     * Thread.NORM_PRIORITY.  Because threads are non-deamon you will need
061     * to call stopThreads() to terminate them.
062     */
063    public SimpleThreadPool()
064    {
065        this(4, false);
066    }
067
068    /**
069     * Creates a new <code>SimpleThreadPool</code> containing the
070     * specified number of threads and starts them. The threads have
071     * priority Thread.NORM_PRIORITY.
072     *
073     * @param threadCount an <code>int</code> thread count.
074     * @param daemon a <code>boolean</code> indicating whether the
075     * threads should be daemons. If threads are non-deamon you will need
076     * to call stopThreads() to terminate them.
077     */
078    public SimpleThreadPool(int threadCount, boolean daemon)
079    {
080        this(threadCount, daemon, Thread.NORM_PRIORITY);
081    }
082
083    /**
084     * Creates a new <code>SimpleThreadPool</code> containing the
085     * specified number of threads and starts them.
086     *
087     * @param threadCount an <code>int</code> thread count.
088     * @param daemon a <code>boolean</code> indicating whether the
089     * threads should be daemons. If threads are non-deamon you will need
090     * to call stopThreads() to terminate them.
091     * @param priority an <code>int</code> priority for the threads.
092     */
093    public SimpleThreadPool(int threadCount, boolean daemon, int priority)
094    {
095        this.daemon = daemon;
096        this.priority = priority;
097        queue = new LinkedList();
098        threads = new PooledThread[threadCount];
099        stopped = true;
100        waiting = false;
101        startThreads();
102    }
103
104    public void addRequest(Runnable task)
105    {
106        if (waiting || stopped)
107            throw new IllegalStateException("Thread pool has been closed to new requests");
108
109        synchronized(queue)
110        {
111            queue.add(task);
112            // Notify threads blocked in nextRequest()
113            queue.notifyAll();
114        }
115    }
116
117    public void startThreads()
118    {
119        if (! stopped)
120            throw new IllegalStateException("Thread pool is already started");
121
122        stopped = false;
123
124        synchronized(threads)
125        {
126            for (int i = 0; i < threads.length; i++)
127            {
128                threads[i] = new PooledThread();
129                if (daemon)
130                    threads[i].setDaemon(true);
131                threads[i].setPriority(priority);
132                threads[i].start();
133            }
134        }
135    }
136
137    /**
138     * Waits for all working threads to return and then stops them. If the
139     * thread pool contains non-daemon threads you will have to call this method
140     * to make your program return.
141     * @throws IllegalStateException if the pool is already stopped.
142     */
143    public void stopThreads()
144    {
145        if (stopped)
146            throw new IllegalStateException("Thread pool has already been stopped");
147
148        stopped = true;
149
150        synchronized(queue)
151        {
152            // Ensure working threads return and die
153            while (threadsAlive() > 0)
154            {
155                try
156                {
157                    queue.wait(500);
158                    queue.notifyAll();
159                }
160                catch (InterruptedException ie) { }
161            }
162        }
163    }
164
165    public void waitForThreads()
166    {
167        if (stopped)
168            throw new IllegalStateException("Thread pool has been stopped");
169
170        waiting = true;
171
172        synchronized(threads)
173        {
174            // Ensure queue gets emptied and all work is done
175            while (! queue.isEmpty() || threadsWorking() > 0)
176            {
177                try
178                {
179                    threads.wait();
180                }
181                catch (InterruptedException ie) { }
182            }
183        }
184
185        waiting = false;
186    }
187
188    /**
189     * <code>threadsWorking</code> returns the number of threads
190     * currently performing work.
191     *
192     * @return an <code>int</code>.
193     */
194    public int threadsWorking()
195    {
196        int workingCount = 0;
197
198        synchronized(threads)
199        {
200            for (int i = 0; i < threads.length; i++)
201                if (threads[i].working)
202                    workingCount++;
203        }
204
205        return workingCount;
206    }
207
208    /**
209     * <code>threadsIdle</code> returns the number of threads
210     * currently waiting for work.
211     *
212     * @return an <code>int</code>.
213     */
214    public int threadsIdle()
215    {
216        return threads.length - threadsWorking();
217    }
218
219    /**
220     * <code>requestsQueued</code> returns the number of
221     * <code>Runnable</code>s currently queued.
222     *
223     * @return an <code>int</code>.
224     */
225    public int requestsQueued()
226    {
227        return queue.size();
228    }
229
230    /**
231     * <code>threadsAlive</code> returns the number of threads
232     * currently alive.
233     *
234     * @return an <code>int</code>.
235     */
236    protected int threadsAlive()
237    {
238        int aliveCount = 0;
239
240        synchronized(threads)
241        {
242            for (int i = 0; i < threads.length; i++)
243                if (threads[i].isAlive())
244                    aliveCount++;
245        }
246
247        return aliveCount;
248    }
249
250    /**
251     * <code>nextRequest</code> gets the next <code>Runnable</code>
252     * from the queue. This method blocks if the queue is empty and
253     * the pool has not stopped. If the pool has stopped it returns
254     * null.
255     *
256     * @return a <code>Runnable</code> or null if the pool has been
257     * stopped.
258     */
259    protected Runnable nextRequest()
260    {
261        synchronized(queue)
262        {
263            try
264            {
265                while (! stopped && queue.isEmpty())
266                    queue.wait();
267            }
268            catch (InterruptedException ie) { }
269
270            if (stopped)
271                return null;
272            else
273                return (Runnable) queue.removeFirst();
274        }
275    }
276
277    /**
278     * <code>PooledThread</code> is a thread class which works within
279     * the pool. It sets its boolean flag true when working,
280     * synchronizing this on the array which contains all the
281     * <code>PooledThread</code>s.
282     */
283    private class PooledThread extends Thread
284    {
285        boolean working = false;
286
287        public void run()
288        {
289            while (true)
290            {
291                Runnable task = nextRequest();
292
293                // If the pool is stopped the queue returns null and
294                // the thread dies
295                if (task == null)
296                    break;
297
298                // Synchronize on thread array to update state
299                synchronized(threads)
300                {
301                    working = true;
302                }
303
304                task.run();
305
306                synchronized(threads)
307                {
308                    working = false;
309                    threads.notify();
310                }
311            }
312        }
313    }
314}