001/*
002 *                    BioJava development code
003 *
004 * This code may be freely distributed and modified under the
005 * terms of the GNU Lesser General Public Licence.  This should
006 * be distributed with the code.  If you do not have a copy,
007 * see:
008 *
009 *      http://www.gnu.org/copyleft/lesser.html
010 *
011 * Copyright for this code is held jointly by the individual
012 * authors.  These should be listed in @author doc comments.
013 *
014 * For more information on the BioJava project and its aims,
015 * or to join the biojava-l mailing list, visit the home page
016 * at:
017 *
018 *      http://www.biojava.org/
019 *
020 * Created on May 26, 2010
021 * Author: Mark Chapman
022 */
023
024package org.biojava.nbio.core.util;
025
026import org.slf4j.Logger;
027import org.slf4j.LoggerFactory;
028
029import java.util.concurrent.*;
030
031/**
032 * Static utility to easily share a thread pool for concurrent/parallel/lazy execution.  To exit cleanly,
033 * {@link #shutdown()} or {@link #shutdownAndAwaitTermination()} must be called after all tasks have been submitted.
034 *
035 * @author Mark Chapman
036 */
037public class ConcurrencyTools {
038
039        private final static Logger logger = LoggerFactory.getLogger(ConcurrencyTools.class);
040
041        private static ThreadPoolExecutor pool;
042        private static int tasks = 0;
043        private ConcurrencyTools() { }
044
045        /**
046         * Returns current shared thread pool.  Starts up a new pool, if necessary.
047         *
048         * @return shared thread pool
049         */
050        public static ThreadPoolExecutor getThreadPool() {
051                if (pool == null || pool.isShutdown()) {
052                        setThreadPoolDefault();
053                }
054                return pool;
055        }
056
057        /**
058         * Sets thread pool to reserve a given number of processor cores for foreground or other use.
059         *
060         * @param cpus number of processor cores to reserve
061         */
062        public static void setThreadPoolCPUsAvailable(int cpus) {
063                setThreadPoolSize(Math.max(1, Runtime.getRuntime().availableProcessors() - cpus));
064        }
065
066        /**
067         * Sets thread pool to a given fraction of the available processors.
068         *
069         * @param fraction portion of available processors to use in thread pool
070         */
071        public static void setThreadPoolCPUsFraction(float fraction) {
072                setThreadPoolSize(Math.max(1, Math.round(fraction * Runtime.getRuntime().availableProcessors())));
073        }
074
075        /**
076         * Sets thread pool to default of 1 background thread for each processor core.
077         */
078        public static void setThreadPoolDefault() {
079                setThreadPoolCPUsAvailable(0);
080        }
081
082        /**
083         * Sets thread pool to a single background thread.
084         */
085        public static void setThreadPoolSingle() {
086                setThreadPoolSize(1);
087        }
088
089        /**
090         * Sets thread pool to given size.
091         *
092         * @param threads number of threads in pool
093         */
094        public static void setThreadPoolSize(int threads) {
095                setThreadPool(   new ThreadPoolExecutor(threads, threads,
096                                                                          0L, TimeUnit.MILLISECONDS,
097                                                                          new LinkedBlockingQueue<Runnable>()));
098
099
100        }
101
102        /**
103         * Sets thread pool to any given {@link ThreadPoolExecutor} to allow use of an alternative execution style.
104         *
105         * @param pool thread pool to share
106         */
107        public static void setThreadPool(ThreadPoolExecutor pool) {
108                if (ConcurrencyTools.pool != pool) {
109                        shutdown();
110                        ConcurrencyTools.pool = pool;
111                }
112        }
113
114        /**
115         * Disables new tasks from being submitted and closes the thread pool cleanly.
116         */
117        public static void shutdown() {
118                if (pool != null) {
119                        pool.shutdown();
120                }
121        }
122
123        /**
124         * Closes the thread pool.  Waits 1 minute for a clean exit; if necessary, waits another minute for cancellation.
125         */
126        public static void shutdownAndAwaitTermination() {
127                shutdown();
128                if (pool != null) {
129                        try {
130                                // wait a while for existing tasks to terminate
131                                if (!pool.awaitTermination(60L, TimeUnit.SECONDS)) {
132                                        pool.shutdownNow(); // cancel currently executing tasks
133                                        // wait a while for tasks to respond to being canceled
134                                        if (!pool.awaitTermination(60L, TimeUnit.SECONDS)) {
135                                                logger.warn("BioJava ConcurrencyTools thread pool did not terminate");
136                                        }
137                                }
138                        } catch (InterruptedException ie) {
139                                pool.shutdownNow(); // (re-)cancel if current thread also interrupted
140                                Thread.currentThread().interrupt(); // preserve interrupt status
141                        }
142                }
143        }
144
145        /**
146         * Queues up a task and adds a log entry.
147         *
148         * @param <T> type returned from the submitted task
149         * @param task submitted task
150         * @param message logged message
151         * @return future on which the desired value is retrieved by calling get()
152         */
153        public static<T> Future<T> submit(Callable<T> task, String message) {
154                logger.debug("Task " + (++tasks) + " submitted to shared thread pool. " + message);
155                return getThreadPool().submit(task);
156        }
157
158        /**
159         * Queues up a task and adds a default log entry.
160         *
161         * @param <T> type returned from the submitted task
162         * @param task submitted task
163         * @return future on which the desired value is retrieved by calling get()
164         */
165        public static<T> Future<T> submit(Callable<T> task) {
166                return submit(task, "");
167        }
168
169}