001package org.biojava.nbio.structure.align;
002
003/*
004 *                    BioJava development code
005 *
006 * This code may be freely distributed and modified under the
007 * terms of the GNU Lesser General Public Licence.  This should
008 * be distributed with the code.  If you do not have a copy,
009 * see:
010 *
011 *      http://www.gnu.org/copyleft/lesser.html
012 *
013 * Copyright for this code is held jointly by the individual
014 * authors.  These should be listed in @author doc comments.
015 *
016 * For more information on the BioJava project and its aims,
017 * or to join the biojava-l mailing list, visit the home page
018 * at:
019 *
020 *      http://www.biojava.org/
021 *
022 * Created on Feb 11, 2013
023 * Author: Andreas Prlic
024 *
025 */
026
027import org.biojava.nbio.structure.Atom;
028import org.biojava.nbio.structure.Structure;
029import org.biojava.nbio.structure.StructureException;
030import org.biojava.nbio.structure.StructureTools;
031import org.biojava.nbio.structure.align.ce.*;
032import org.biojava.nbio.structure.align.client.FarmJobParameters;
033import org.biojava.nbio.structure.align.client.JFatCatClient;
034import org.biojava.nbio.structure.align.client.PdbPair;
035import org.biojava.nbio.structure.align.client.StructureName;
036import org.biojava.nbio.structure.align.model.AFPChain;
037import org.biojava.nbio.structure.align.util.AtomCache;
038import org.biojava.nbio.structure.align.util.SynchronizedOutFile;
039import org.biojava.nbio.structure.domain.DomainProvider;
040import org.biojava.nbio.structure.domain.DomainProviderFactory;
041import org.biojava.nbio.structure.domain.RemoteDomainProvider;
042import org.biojava.nbio.structure.io.LocalPDBDirectory.FetchBehavior;
043import org.biojava.nbio.structure.io.PDBFileReader;
044import org.biojava.nbio.core.util.ConcurrencyTools;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
048import java.io.File;
049import java.io.IOException;
050import java.util.SortedSet;
051import java.util.concurrent.ExecutorService;
052import java.util.concurrent.ThreadPoolExecutor;
053import java.util.concurrent.atomic.AtomicBoolean;
054
055
/**
 * Performs a multi-threaded database search for an input protein structure.
 *
 * @author Andreas Prlic
 *
 */
061
062public class MultiThreadedDBSearch {
063
064        private final static Logger logger = LoggerFactory.getLogger(MultiThreadedDBSearch.class);
065
066        AtomicBoolean interrupted  ;
067
068        StructureAlignment algorithm;
069
070        String outFile;
071
072        String name1;
073
074        int nrCPUs;
075
076        AtomCache cache;
077        File resultList;
078        SortedSet<String> representatives;
079
080        boolean domainSplit;
081
082        Structure structure1;
083
084        String customFile1;
085        String customChain1;
086
087        public MultiThreadedDBSearch(String name, Structure structure,
088                        String outFile,
089                        StructureAlignment algorithm,
090                        int nrCPUs, boolean domainSplit){
091
092                interrupted = new AtomicBoolean(false);
093                this.name1= name;
094                this.structure1 = structure;
095                this.outFile = outFile;
096                this.algorithm = algorithm;
097                this.nrCPUs = nrCPUs;
098                this.domainSplit = domainSplit;
099                cache  = new AtomCache();
100
101                String serverLocation = FarmJobParameters.DEFAULT_SERVER_URL;
102                if ( representatives == null){
103                        SortedSet<String> repre = JFatCatClient.getRepresentatives(serverLocation,40);
104                        logger.info("got {} representatives for comparison", repre.size());
105                        representatives = repre;
106                }
107        }
108
109
110        public String getCustomFile1() {
111                return customFile1;
112        }
113
114
115        /** set the file path for a custom, user provided file, not a standard PDB file.
116         *
117         * @param customFile1
118         */
119        public void setCustomFile1(String customFile1) {
120                this.customFile1 = customFile1;
121        }
122
123
124
125        public String getCustomChain1() {
126                return customChain1;
127        }
128
129        /** sets a chain in a custom, user provided file
130         *
131         * @param customChain1
132         */
133        public void setCustomChain1(String customChain1) {
134                this.customChain1 = customChain1;
135        }
136
137
138        public AtomCache getAtomCache() {
139                return cache;
140        }
141
142        public void setAtomCache(AtomCache cache) {
143                this.cache = cache;
144        }
145
146
147
148        public StructureAlignment getAlgorithm() {
149                return algorithm;
150        }
151
152        public void setAlgorithm(StructureAlignment algo) {
153                this.algorithm = algo;
154        }
155
156
157        public String getOutFile() {
158                return outFile;
159        }
160
161
162        public void setOutFile(String outFile) {
163                this.outFile = outFile;
164        }
165
166
167        public static String getLegend(String algorithmName){
168
169                if ( algorithmName.equalsIgnoreCase(CeMain.algorithmName) ||
170                                algorithmName.equalsIgnoreCase(CeSideChainMain.algorithmName) ||
171                                algorithmName.equalsIgnoreCase(CeCPMain.algorithmName)) {
172                        return "# name1\tname2\tscore\tz-score\trmsd\tlen1\tlen2\tcov1\tcov2\t%ID\tDescription\t " ;
173                }
174
175                // looks like a FATCAT alignment
176
177                return "# name1\tname2\tscore\tprobability\trmsd\tlen1\tlen2\tcov1\tcov2\t%ID\tDescription\t " ;
178
179        }
180
181
182
183        public File getResultFile() {
184                return resultList;
185        }
186
187
188        public void setResultFile(File resultList) {
189                this.resultList = resultList;
190        }
191
192
193        public void run(){
194
195                File outFileF = null;
196                SynchronizedOutFile out ;
197
198                try {
199                        checkLocalFiles();
200
201                        if ( interrupted.get())
202                                return;
203
204                        String header = "# algorithm:" + algorithm.getAlgorithmName();
205                        String legend = getLegend(algorithm.getAlgorithmName());
206
207
208
209                        outFileF = new File(outFile);
210                        if ( ! outFileF.isDirectory()){
211                                logger.error("{} is not a directory, can't create result files in there...", outFileF.getAbsolutePath());
212                                interrupt();
213                                cleanup();
214                        }
215
216                        if ( name1 == null)
217                                name1 = "CUSTOM";
218
219
220                        resultList = new File(outFileF,"results_" + name1 + ".out");
221
222                        logger.info("writing results to {}", resultList.getAbsolutePath());
223
224
225
226                        out = new SynchronizedOutFile(resultList);
227
228                        out.write(header);
229                        out.write(AFPChain.newline);
230                        out.write(legend);
231                        out.write(AFPChain.newline);
232
233                        if ( name1.equals("CUSTOM")) {
234
235                                String config1 = "#param:file1=" + customFile1;
236                                out.write(config1);
237                                out.write(AFPChain.newline);
238
239                                if ( customChain1 != null) {
240                                String config2 = "#param:chain1=" + customChain1;
241                                out.write(config2);
242                                out.write(AFPChain.newline);
243                                }
244
245                        }
246
247                        if ( algorithm.getAlgorithmName().startsWith("jCE")){
248                                ConfigStrucAligParams params = algorithm.getParameters();
249                                if ( params instanceof CeParameters){
250                                        CeParameters ceParams = (CeParameters) params;
251                                        if ( ceParams.getScoringStrategy() != CeParameters.ScoringStrategy.DEFAULT_SCORING_STRATEGY) {
252                                                String scoring = "#param:scoring=" + ceParams.getScoringStrategy();
253                                                out.write(scoring);
254                                                out.write(AFPChain.newline);
255                                        }
256                                }
257                        }
258
259                        out.flush();
260                } catch (IOException e){
261                        logger.error("Error while loading representative structure {}", name1, e);
262                        interrupt();
263                        cleanup();
264                        return;
265                } catch (StructureException e) {
266                        logger.error("Error while loading representative structure {}", name1, e);
267                        interrupt();
268                        cleanup();
269                        return;
270                }
271
272
273                int nrJobs = 0;
274                DomainProvider domainProvider;
275                try {
276                        domainProvider = DomainProviderFactory.getDomainProvider();
277
278                        ConcurrencyTools.setThreadPoolSize(nrCPUs);
279
280                        Atom[] ca1 = StructureTools.getRepresentativeAtomArray(structure1);
281
282                        for (String repre : representatives){
283
284                                if( domainSplit ) {
285                                        SortedSet<String> domainNames = domainProvider.getDomainNames(repre);
286                                        //logger.debug(repre +" got domains: " +domainNames);
287                                        if( domainNames == null || domainNames.size()==0){
288                                                // no domains found, use whole chain.
289                                                submit(name1, repre, ca1, algorithm, outFileF, out, cache);
290                                                nrJobs++;
291                                                continue;
292                                        }
293                                        //logger.debug("got " + domainNames.size() + " for " + repre);
294                                        for( String domain : domainNames){
295                                                submit(name1, domain, ca1, algorithm, outFileF, out, cache);
296                                                nrJobs++;
297                                        }
298                                } else {
299                                        submit(name1, repre, ca1, algorithm, outFileF, out, cache);
300                                        nrJobs++;
301                                }
302
303                        }
304                } catch(IOException e) {
305                        logger.error("Error while fetching representative domains", e);
306                        interrupt();
307                        cleanup();
308                        return;
309                } catch (StructureException e) {
310                        logger.error("Error while fetching representative domains", e);
311                        interrupt();
312                        cleanup();
313                        return;
314                }
315
316
317                ThreadPoolExecutor  pool = ConcurrencyTools.getThreadPool();
318                logger.info("{}", pool.getPoolSize());
319
320                long startTime = System.currentTimeMillis();
321
322                try {
323                        while ( pool.getCompletedTaskCount() < nrJobs-1  ) {
324                                //long now = System.currentTimeMillis();
325                                //logger.debug( pool.getCompletedTaskCount() + " " + (now-startTime)/1000 + " " + pool.getPoolSize() + " " + pool.getActiveCount()  + " " + pool.getTaskCount()  );
326                                //                              if ((now-startTime)/1000 > 60) {
327                                //
328                                //                                      interrupt();
329                                //                                      logger.debug("completed: " + pool.getCompletedTaskCount());
330                                //                              }
331
332                                if ( interrupted.get())
333                                        break;
334
335                                Thread.sleep(2000);
336
337                        }
338                        out.close();
339                }
340                catch (Exception e){
341                        logger.error("Exception: ", e);
342                        interrupt();
343                        cleanup();
344                }
345
346                if (domainProvider instanceof RemoteDomainProvider){
347                        RemoteDomainProvider remote = (RemoteDomainProvider) domainProvider;
348                        remote.flushCache();
349                }
350                long now = System.currentTimeMillis();
351                logger.info("Calculation took : {} sec.", (now-startTime)/1000);
352                logger.info("{} {} {} {}", pool.getCompletedTaskCount(), pool.getPoolSize(), pool.getActiveCount(), pool.getTaskCount());
353        }
354
355
356
357        private void checkLocalFiles() throws IOException, StructureException {
358
359                logger.info("Checking local PDB installation in directory: {}", cache.getPath());
360
361                File f = new File(cache.getPath());
362                if ( ! f.isDirectory()) {
363                        logger.error("The path {} should point to a directory!", f.getAbsolutePath());
364                }
365
366                if ( ! f.canWrite()) {
367                        logger.error("You do not have permission to write to {}. There could be a problem if the PDB installation is not up-to-date with fetching missing PDB files.", f.getAbsolutePath());
368                }
369
370                DomainProvider domainProvider = DomainProviderFactory.getDomainProvider();
371
372
373
374                for (String repre : representatives){
375
376                        if ( interrupted.get())
377                                return;
378
379                        if( domainSplit ) {
380                                SortedSet<String> domainNames = domainProvider.getDomainNames(repre);
381                                //logger.debug(repre +" got domains: " +domainNames);
382                                if( domainNames == null || domainNames.size()==0){
383                                        // no domains found, use whole chain.
384                                        //submit(name1, repre, ca1, algorithm, outFileF, out, cache);
385                                        checkFile(repre);
386                                        continue;
387                                }
388                                //logger.debug("got " + domainNames.size() + " for " + repre);
389                                for( String domain : domainNames){
390                                        //submit(name1, domain, ca1, algorithm, outFileF, out, cache);
391                                        checkFile(domain);
392                                }
393                        } else {
394                                //submit(name1, repre, ca1, algorithm, outFileF, out, cache);
395                                checkFile(repre);
396                        }
397
398                }
399
400                if ( domainProvider instanceof RemoteDomainProvider ) {
401                        RemoteDomainProvider remoteP = (RemoteDomainProvider) domainProvider;
402                        remoteP.flushCache();
403                }
404
405                logger.info("done checking local files...");
406
407        }
408
409
410        private void checkFile(String repre) throws IOException, StructureException {
411
412                StructureName name = new StructureName(repre);
413
414                PDBFileReader reader = new PDBFileReader();
415                reader.setFetchBehavior(FetchBehavior.FETCH_REMEDIATED);
416                reader.setPath(cache.getPath());
417                reader.setFileParsingParameters(cache.getFileParsingParams());
418                reader.prefetchStructure(name.getPdbId());
419        }
420
421
422        private void submit(String name12, String repre, Atom[] ca1, StructureAlignment algorithm , File outFileF , SynchronizedOutFile out , AtomCache cache ) {
423                CallableStructureAlignment ali = new CallableStructureAlignment();
424
425                PdbPair pair = new PdbPair(name1, repre);
426                try {
427                        ali.setCa1(ca1);
428                } catch (Exception e){
429                        logger.error("Exception: ", e);
430                        ConcurrencyTools.shutdown();
431                        return;
432                }
433                ali.setAlgorithmName(algorithm.getAlgorithmName());
434                ali.setParameters(algorithm.getParameters());
435                ali.setPair(pair);
436                ali.setOutFile(out);
437                ali.setOutputDir(outFileF);
438                ali.setCache(cache);
439
440                ConcurrencyTools.submit(ali);
441
442        }
443
444
445        /** stops what is currently happening and does not continue
446         *
447         *
448         */
449        public void interrupt() {
450                interrupted.set(true);
451                ExecutorService pool = ConcurrencyTools.getThreadPool();
452                pool.shutdownNow();
453                try {
454                        DomainProvider domainProvider = DomainProviderFactory.getDomainProvider();
455                        if (domainProvider instanceof RemoteDomainProvider){
456                                RemoteDomainProvider remote = (RemoteDomainProvider) domainProvider;
457                                remote.flushCache();
458                        }
459                } catch (IOException e) {
460                        // If errors occur, the cache should be empty anyways
461                }
462
463        }
464
465        public void cleanup()
466        {
467
468                structure1 = null;
469
470
471        }
472
473}