001/* 002 * BioJava development code 003 * 004 * This code may be freely distributed and modified under the 005 * terms of the GNU Lesser General Public Licence. This should 006 * be distributed with the code. If you do not have a copy, 007 * see: 008 * 009 * http://www.gnu.org/copyleft/lesser.html 010 * 011 * Copyright for this code is held jointly by the individual 012 * authors. These should be listed in @author doc comments. 013 * 014 * For more information on the BioJava project and its aims, 015 * or to join the biojava-l mailing list, visit the home page 016 * at: 017 * 018 * http://www.biojava.org/ 019 * 020 */ 021 022package org.biojava.utils; 023 024import java.util.LinkedList; 025 026/** 027 * <p><code>SimpleThreadPool</code> is a basic implementation of 028 * <code>ThreadPool</code> for use where we don't wish to introduce a 029 * dependency on a 3rd-party pool. In general, objects which require a 030 * pool should only use the interface and parameterize such that other 031 * implementations may be dropped in in place of this one, possibly 032 * using this one as a fallback.</p> 033 * 034 * <p>This class offers a service for running <code>Runnable</code>s 035 * using multiple threads, the number of which is specified in the 036 * constructor. <code>Runnable</code>s are queued in a simple FIFO 037 * queue. The worker threads wait on the queue when it is empty and 038 * are notified when a new <code>Runnable</code> is submitted.</p> 039 * 040 * <p>This implementation will prevent an application from exiting 041 * until <code>stopThreads()</code> is called unless the pool contains 042 * daemon threads.</p> 043 * 044 * @author Keith James 045 * @since 1.3 046 */ 047public class SimpleThreadPool implements ThreadPool 048{ 049 protected PooledThread [] threads; 050 protected int priority; 051 052 private LinkedList queue; 053 private boolean daemon; 054 private boolean waiting; 055 private boolean stopped; 056 057 /** 058 * Creates a new <code>SimpleThreadPool</code> containing 4 059 * non-daemon threads and starts them. The threads have priority 060 * Thread.NORM_PRIORITY. Because threads are non-deamon you will need 061 * to call stopThreads() to terminate them. 062 */ 063 public SimpleThreadPool() 064 { 065 this(4, false); 066 } 067 068 /** 069 * Creates a new <code>SimpleThreadPool</code> containing the 070 * specified number of threads and starts them. The threads have 071 * priority Thread.NORM_PRIORITY. 072 * 073 * @param threadCount an <code>int</code> thread count. 074 * @param daemon a <code>boolean</code> indicating whether the 075 * threads should be daemons. If threads are non-deamon you will need 076 * to call stopThreads() to terminate them. 077 */ 078 public SimpleThreadPool(int threadCount, boolean daemon) 079 { 080 this(threadCount, daemon, Thread.NORM_PRIORITY); 081 } 082 083 /** 084 * Creates a new <code>SimpleThreadPool</code> containing the 085 * specified number of threads and starts them. 086 * 087 * @param threadCount an <code>int</code> thread count. 088 * @param daemon a <code>boolean</code> indicating whether the 089 * threads should be daemons. If threads are non-deamon you will need 090 * to call stopThreads() to terminate them. 091 * @param priority an <code>int</code> priority for the threads. 092 */ 093 public SimpleThreadPool(int threadCount, boolean daemon, int priority) 094 { 095 this.daemon = daemon; 096 this.priority = priority; 097 queue = new LinkedList(); 098 threads = new PooledThread[threadCount]; 099 stopped = true; 100 waiting = false; 101 startThreads(); 102 } 103 104 public void addRequest(Runnable task) 105 { 106 if (waiting || stopped) 107 throw new IllegalStateException("Thread pool has been closed to new requests"); 108 109 synchronized(queue) 110 { 111 queue.add(task); 112 // Notify threads blocked in nextRequest() 113 queue.notifyAll(); 114 } 115 } 116 117 public void startThreads() 118 { 119 if (! stopped) 120 throw new IllegalStateException("Thread pool is already started"); 121 122 stopped = false; 123 124 synchronized(threads) 125 { 126 for (int i = 0; i < threads.length; i++) 127 { 128 threads[i] = new PooledThread(); 129 if (daemon) 130 threads[i].setDaemon(true); 131 threads[i].setPriority(priority); 132 threads[i].start(); 133 } 134 } 135 } 136 137 /** 138 * Waits for all working threads to return and then stops them. If the 139 * thread pool contains non-daemon threads you will have to call this method 140 * to make your program return. 141 * @throws IllegalStateException if the pool is already stopped. 142 */ 143 public void stopThreads() 144 { 145 if (stopped) 146 throw new IllegalStateException("Thread pool has already been stopped"); 147 148 stopped = true; 149 150 synchronized(queue) 151 { 152 // Ensure working threads return and die 153 while (threadsAlive() > 0) 154 { 155 try 156 { 157 queue.wait(500); 158 queue.notifyAll(); 159 } 160 catch (InterruptedException ie) { } 161 } 162 } 163 } 164 165 public void waitForThreads() 166 { 167 if (stopped) 168 throw new IllegalStateException("Thread pool has been stopped"); 169 170 waiting = true; 171 172 synchronized(threads) 173 { 174 // Ensure queue gets emptied and all work is done 175 while (! queue.isEmpty() || threadsWorking() > 0) 176 { 177 try 178 { 179 threads.wait(); 180 } 181 catch (InterruptedException ie) { } 182 } 183 } 184 185 waiting = false; 186 } 187 188 /** 189 * <code>threadsWorking</code> returns the number of threads 190 * currently performing work. 191 * 192 * @return an <code>int</code>. 193 */ 194 public int threadsWorking() 195 { 196 int workingCount = 0; 197 198 synchronized(threads) 199 { 200 for (int i = 0; i < threads.length; i++) 201 if (threads[i].working) 202 workingCount++; 203 } 204 205 return workingCount; 206 } 207 208 /** 209 * <code>threadsIdle</code> returns the number of threads 210 * currently waiting for work. 211 * 212 * @return an <code>int</code>. 213 */ 214 public int threadsIdle() 215 { 216 return threads.length - threadsWorking(); 217 } 218 219 /** 220 * <code>requestsQueued</code> returns the number of 221 * <code>Runnable</code>s currently queued. 222 * 223 * @return an <code>int</code>. 224 */ 225 public int requestsQueued() 226 { 227 return queue.size(); 228 } 229 230 /** 231 * <code>threadsAlive</code> returns the number of threads 232 * currently alive. 233 * 234 * @return an <code>int</code>. 235 */ 236 protected int threadsAlive() 237 { 238 int aliveCount = 0; 239 240 synchronized(threads) 241 { 242 for (int i = 0; i < threads.length; i++) 243 if (threads[i].isAlive()) 244 aliveCount++; 245 } 246 247 return aliveCount; 248 } 249 250 /** 251 * <code>nextRequest</code> gets the next <code>Runnable</code> 252 * from the queue. This method blocks if the queue is empty and 253 * the pool has not stopped. If the pool has stopped it returns 254 * null. 255 * 256 * @return a <code>Runnable</code> or null if the pool has been 257 * stopped. 258 */ 259 protected Runnable nextRequest() 260 { 261 synchronized(queue) 262 { 263 try 264 { 265 while (! stopped && queue.isEmpty()) 266 queue.wait(); 267 } 268 catch (InterruptedException ie) { } 269 270 if (stopped) 271 return null; 272 else 273 return (Runnable) queue.removeFirst(); 274 } 275 } 276 277 /** 278 * <code>PooledThread</code> is a thread class which works within 279 * the pool. It sets its boolean flag true when working, 280 * synchronizing this on the array which contains all the 281 * <code>PooledThread</code>s. 282 */ 283 private class PooledThread extends Thread 284 { 285 boolean working = false; 286 287 public void run() 288 { 289 while (true) 290 { 291 Runnable task = nextRequest(); 292 293 // If the pool is stopped the queue returns null and 294 // the thread dies 295 if (task == null) 296 break; 297 298 // Synchronize on thread array to update state 299 synchronized(threads) 300 { 301 working = true; 302 } 303 304 task.run(); 305 306 synchronized(threads) 307 { 308 working = false; 309 threads.notify(); 310 } 311 } 312 } 313 } 314}