001/* 002 * BioJava development code 003 * 004 * This code may be freely distributed and modified under the 005 * terms of the GNU Lesser General Public Licence. This should 006 * be distributed with the code. If you do not have a copy, 007 * see: 008 * 009 * http://www.gnu.org/copyleft/lesser.html 010 * 011 * Copyright for this code is held jointly by the individual 012 * authors. These should be listed in @author doc comments. 013 * 014 * For more information on the BioJava project and its aims, 015 * or to join the biojava-l mailing list, visit the home page 016 * at: 017 * 018 * http://www.biojava.org/ 019 * 020 * Created on May 26, 2010 021 * Author: Mark Chapman 022 */ 023 024package org.biojava.nbio.core.util; 025 026import org.slf4j.Logger; 027import org.slf4j.LoggerFactory; 028 029import java.util.concurrent.*; 030 031/** 032 * Static utility to easily share a thread pool for concurrent/parallel/lazy execution. To exit cleanly, 033 * {@link #shutdown()} or {@link #shutdownAndAwaitTermination()} must be called after all tasks have been submitted. 034 * 035 * @author Mark Chapman 036 */ 037public class ConcurrencyTools { 038 039 private final static Logger logger = LoggerFactory.getLogger(ConcurrencyTools.class); 040 041 private static ThreadPoolExecutor pool; 042 private static int tasks = 0; 043 private ConcurrencyTools() { } 044 045 /** 046 * Returns current shared thread pool. Starts up a new pool, if necessary. 047 * 048 * @return shared thread pool 049 */ 050 public static ThreadPoolExecutor getThreadPool() { 051 if (pool == null || pool.isShutdown()) { 052 setThreadPoolDefault(); 053 } 054 return pool; 055 } 056 057 /** 058 * Sets thread pool to reserve a given number of processor cores for foreground or other use. 059 * 060 * @param cpus number of processor cores to reserve 061 */ 062 public static void setThreadPoolCPUsAvailable(int cpus) { 063 setThreadPoolSize(Math.max(1, Runtime.getRuntime().availableProcessors() - cpus)); 064 } 065 066 /** 067 * Sets thread pool to a given fraction of the available processors. 068 * 069 * @param fraction portion of available processors to use in thread pool 070 */ 071 public static void setThreadPoolCPUsFraction(float fraction) { 072 setThreadPoolSize(Math.max(1, Math.round(fraction * Runtime.getRuntime().availableProcessors()))); 073 } 074 075 /** 076 * Sets thread pool to default of 1 background thread for each processor core. 077 */ 078 public static void setThreadPoolDefault() { 079 setThreadPoolCPUsAvailable(0); 080 } 081 082 /** 083 * Sets thread pool to a single background thread. 084 */ 085 public static void setThreadPoolSingle() { 086 setThreadPoolSize(1); 087 } 088 089 /** 090 * Sets thread pool to given size. 091 * 092 * @param threads number of threads in pool 093 */ 094 public static void setThreadPoolSize(int threads) { 095 setThreadPool( new ThreadPoolExecutor(threads, threads, 096 0L, TimeUnit.MILLISECONDS, 097 new LinkedBlockingQueue<Runnable>())); 098 099 100 } 101 102 /** 103 * Sets thread pool to any given {@link ThreadPoolExecutor} to allow use of an alternative execution style. 104 * 105 * @param pool thread pool to share 106 */ 107 public static void setThreadPool(ThreadPoolExecutor pool) { 108 if (ConcurrencyTools.pool != pool) { 109 shutdown(); 110 ConcurrencyTools.pool = pool; 111 } 112 } 113 114 /** 115 * Disables new tasks from being submitted and closes the thread pool cleanly. 116 */ 117 public static void shutdown() { 118 if (pool != null) { 119 pool.shutdown(); 120 } 121 } 122 123 /** 124 * Closes the thread pool. Waits 1 minute for a clean exit; if necessary, waits another minute for cancellation. 125 */ 126 public static void shutdownAndAwaitTermination() { 127 shutdown(); 128 if (pool != null) { 129 try { 130 // wait a while for existing tasks to terminate 131 if (!pool.awaitTermination(60L, TimeUnit.SECONDS)) { 132 pool.shutdownNow(); // cancel currently executing tasks 133 // wait a while for tasks to respond to being canceled 134 if (!pool.awaitTermination(60L, TimeUnit.SECONDS)) { 135 logger.warn("BioJava ConcurrencyTools thread pool did not terminate"); 136 } 137 } 138 } catch (InterruptedException ie) { 139 pool.shutdownNow(); // (re-)cancel if current thread also interrupted 140 Thread.currentThread().interrupt(); // preserve interrupt status 141 } 142 } 143 } 144 145 /** 146 * Queues up a task and adds a log entry. 147 * 148 * @param <T> type returned from the submitted task 149 * @param task submitted task 150 * @param message logged message 151 * @return future on which the desired value is retrieved by calling get() 152 */ 153 public static<T> Future<T> submit(Callable<T> task, String message) { 154 logger.debug("Task " + (++tasks) + " submitted to shared thread pool. " + message); 155 return getThreadPool().submit(task); 156 } 157 158 /** 159 * Queues up a task and adds a default log entry. 160 * 161 * @param <T> type returned from the submitted task 162 * @param task submitted task 163 * @return future on which the desired value is retrieved by calling get() 164 */ 165 public static<T> Future<T> submit(Callable<T> task) { 166 return submit(task, ""); 167 } 168 169}