package org.biojava.nbio.structure.align;

/*
 *                    BioJava development code
 *
 * This code may be freely distributed and modified under the
 * terms of the GNU Lesser General Public Licence.  This should
 * be distributed with the code.  If you do not have a copy,
 * see:
 *
 *      http://www.gnu.org/copyleft/lesser.html
 *
 * Copyright for this code is held jointly by the individual
 * authors.  These should be listed in @author doc comments.
 *
 * For more information on the BioJava project and its aims,
 * or to join the biojava-l mailing list, visit the home page
 * at:
 *
 *      http://www.biojava.org/
 *
 * Created on Feb 11, 2013
 * Author: Andreas Prlic
 *
 */

import org.biojava.nbio.structure.Atom;
import org.biojava.nbio.structure.Structure;
import org.biojava.nbio.structure.StructureException;
import org.biojava.nbio.structure.StructureTools;
import org.biojava.nbio.structure.align.ce.*;
import org.biojava.nbio.structure.align.client.FarmJobParameters;
import org.biojava.nbio.structure.align.client.JFatCatClient;
import org.biojava.nbio.structure.align.client.PdbPair;
import org.biojava.nbio.structure.align.client.StructureName;
import org.biojava.nbio.structure.align.model.AFPChain;
import org.biojava.nbio.structure.align.util.AtomCache;
import org.biojava.nbio.structure.align.util.SynchronizedOutFile;
import org.biojava.nbio.structure.domain.DomainProvider;
import org.biojava.nbio.structure.domain.DomainProviderFactory;
import org.biojava.nbio.structure.domain.RemoteDomainProvider;
import org.biojava.nbio.structure.io.LocalPDBDirectory.FetchBehavior;
import org.biojava.nbio.structure.io.PDBFileReader;
import org.biojava.nbio.core.util.ConcurrencyTools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;


/**
 * Performs a multi threaded database search for an input protein structure.
 *
 * <p>The query structure is aligned against every entry of a set of
 * representatives (optionally expanded into the domains reported by the
 * configured {@link DomainProvider}). One {@link CallableStructureAlignment}
 * is submitted to the {@link ConcurrencyTools} thread pool per target, and all
 * jobs write to a single result file {@code results_<name>.out} located in the
 * output directory.
 *
 * @author Andreas Prlic
 *
 */
public class MultiThreadedDBSearch {

	private static final Logger logger = LoggerFactory.getLogger(MultiThreadedDBSearch.class);

	/** Name used for the query when no name was provided. */
	private static final String CUSTOM_NAME = "CUSTOM";

	/** Set to true once {@link #interrupt()} has been called. */
	AtomicBoolean interrupted ;

	StructureAlignment algorithm;

	/** Path of the directory the result file is created in (see {@link #run()}). */
	String outFile;

	String name1;

	int nrCPUs;

	AtomCache cache;
	File resultList;
	SortedSet<String> representatives;

	boolean domainSplit;

	Structure structure1;

	String customFile1;
	String customChain1;

	/**
	 * Creates a new search and fetches the set of representatives to compare against.
	 *
	 * <p>The representatives are requested from
	 * {@link FarmJobParameters#DEFAULT_SERVER_URL} while the object is being
	 * constructed, and a default {@link AtomCache} is created.
	 *
	 * @param name name of the query; if null, {@link #run()} uses "CUSTOM"
	 * @param structure the query structure
	 * @param outFile path of an existing directory the result file is written into
	 * @param algorithm the alignment algorithm to run for every pair
	 * @param nrCPUs thread pool size passed to {@link ConcurrencyTools#setThreadPoolSize(int)}
	 * @param domainSplit if true, every representative is expanded into its domains
	 */
	public MultiThreadedDBSearch(String name, Structure structure,
			String outFile,
			StructureAlignment algorithm,
			int nrCPUs, boolean domainSplit){

		interrupted = new AtomicBoolean(false);
		this.name1 = name;
		this.structure1 = structure;
		this.outFile = outFile;
		this.algorithm = algorithm;
		this.nrCPUs = nrCPUs;
		this.domainSplit = domainSplit;
		cache = new AtomCache();

		String serverLocation = FarmJobParameters.DEFAULT_SERVER_URL;
		// 40 is presumably the sequence identity cutoff of the representative set - TODO confirm
		SortedSet<String> repre = JFatCatClient.getRepresentatives(serverLocation,40);
		if ( repre == null) {
			// NOTE(review): defensive guard, confirm whether getRepresentatives can return null.
			// An empty set lets run() finish with a header-only result file instead of failing with a NullPointerException.
			logger.error("could not get representatives from {}", serverLocation);
			repre = new TreeSet<String>();
		}
		logger.info("got {} representatives for comparison", repre.size());
		representatives = repre;
	}


	/**
	 * @return the path of the custom, user provided file, or null if none was set
	 */
	public String getCustomFile1() {
		return customFile1;
	}


	/** set the file path for a custom, user provided file, not a standard PDB file.
	 *
	 * @param customFile1 path that is recorded in the result file header when the query is "CUSTOM"
	 */
	public void setCustomFile1(String customFile1) {
		this.customFile1 = customFile1;
	}


	/**
	 * @return the chain in the custom, user provided file, or null if none was set
	 */
	public String getCustomChain1() {
		return customChain1;
	}

	/** sets a chain in a custom, user provided file
	 *
	 * @param customChain1 chain that is recorded in the result file header when the query is "CUSTOM"
	 */
	public void setCustomChain1(String customChain1) {
		this.customChain1 = customChain1;
	}


	/**
	 * @return the cache whose path and file parsing parameters are used for the local PDB files
	 */
	public AtomCache getAtomCache() {
		return cache;
	}

	/**
	 * @param cache the cache to use instead of the one created by the constructor
	 */
	public void setAtomCache(AtomCache cache) {
		this.cache = cache;
	}


	/**
	 * @return the alignment algorithm used for every pair
	 */
	public StructureAlignment getAlgorithm() {
		return algorithm;
	}

	/**
	 * @param algo the alignment algorithm to use for every pair
	 */
	public void setAlgorithm(StructureAlignment algo) {
		this.algorithm = algo;
	}


	/**
	 * @return the path of the output directory
	 */
	public String getOutFile() {
		return outFile;
	}


	/**
	 * @param outFile path of the output directory; {@link #run()} aborts if it is not a directory
	 */
	public void setOutFile(String outFile) {
		this.outFile = outFile;
	}


	/**
	 * Returns the column legend written as second line of the result file.
	 *
	 * <p>The CE variants ({@link CeMain}, {@link CeSideChainMain},
	 * {@link CeCPMain}) get a "z-score" column, every other algorithm a
	 * "probability" column.
	 *
	 * @param algorithmName name of the algorithm, compared case-insensitively; must not be null
	 * @return the tab separated legend line
	 */
	public static String getLegend(String algorithmName){

		boolean isCe = algorithmName.equalsIgnoreCase(CeMain.algorithmName) ||
				algorithmName.equalsIgnoreCase(CeSideChainMain.algorithmName) ||
				algorithmName.equalsIgnoreCase(CeCPMain.algorithmName);

		if ( isCe) {
			return "# name1\tname2\tscore\tz-score\trmsd\tlen1\tlen2\tcov1\tcov2\t%ID\tDescription\t " ;
		}

		// looks like a FATCAT alignment
		return "# name1\tname2\tscore\tprobability\trmsd\tlen1\tlen2\tcov1\tcov2\t%ID\tDescription\t " ;

	}


	/**
	 * @return the result file; null until {@link #run()} created it or {@link #setResultFile(File)} was called
	 */
	public File getResultFile() {
		return resultList;
	}


	/**
	 * @param resultList the result file; overwritten by {@link #run()}
	 */
	public void setResultFile(File resultList) {
		this.resultList = resultList;
	}


	/**
	 * Runs the search: checks the local files, writes the result file header,
	 * submits one alignment job per target and waits for the jobs to complete.
	 *
	 * <p>Errors are logged and lead to {@link #interrupt()} and
	 * {@link #cleanup()} being called; no exception is propagated for
	 * {@link IOException} or {@link StructureException}. If the calling thread
	 * is interrupted while waiting, its interrupt status is restored before
	 * this method returns.
	 */
	public void run(){

		File outFileF;
		SynchronizedOutFile out = null;

		try {
			checkLocalFiles();

			if ( interrupted.get()) {
				return;
			}

			outFileF = new File(outFile);
			if ( ! outFileF.isDirectory()){
				logger.error("{} is not a directory, can't create result files in there...", outFileF.getAbsolutePath());
				abort();
				// nothing can be written without an output directory
				return;
			}

			if ( name1 == null) {
				name1 = CUSTOM_NAME;
			}

			resultList = new File(outFileF,"results_" + name1 + ".out");

			logger.info("writing results to {}", resultList.getAbsolutePath());

			out = new SynchronizedOutFile(resultList);
			writeHeader(out);

		} catch (IOException e){
			logger.error("Error while loading representative structure {}", name1, e);
			abort();
			closeOutput(out);
			return;
		} catch (StructureException e) {
			logger.error("Error while loading representative structure {}", name1, e);
			abort();
			closeOutput(out);
			return;
		}


		int nrJobs = 0;
		DomainProvider domainProvider;
		try {
			domainProvider = DomainProviderFactory.getDomainProvider();

			ConcurrencyTools.setThreadPoolSize(nrCPUs);

			Atom[] ca1 = StructureTools.getRepresentativeAtomArray(structure1);

			for (String repre : representatives){
				for (String target : getTargetNames(domainProvider, repre)){
					submit(name1, target, ca1, algorithm, outFileF, out, cache);
					nrJobs++;
				}
			}
		} catch(IOException e) {
			logger.error("Error while fetching representative domains", e);
			abort();
			closeOutput(out);
			return;
		} catch (StructureException e) {
			logger.error("Error while fetching representative domains", e);
			abort();
			closeOutput(out);
			return;
		}


		ThreadPoolExecutor pool = ConcurrencyTools.getThreadPool();
		logger.info("{}", pool.getPoolSize());

		long startTime = System.currentTimeMillis();

		boolean threadInterrupted = false;
		try {
			// NOTE(review): the loop ends once nrJobs-1 tasks are completed, so the
			// output may be closed while the last job is still running. Kept as is,
			// because waiting for all nrJobs could block forever when a submission
			// failed - confirm the intended threshold.
			while ( pool.getCompletedTaskCount() < nrJobs-1 ) {

				if ( interrupted.get()) {
					break;
				}

				Thread.sleep(2000);

			}
		} catch (InterruptedException e){
			threadInterrupted = true;
			logger.error("Exception: ", e);
			abort();
		} finally {
			// close the result file on every path, not only after a regular wait
			if ( ! closeOutput(out)) {
				abort();
			}
		}

		flushDomainCache(domainProvider);

		long now = System.currentTimeMillis();
		logger.info("Calculation took : {} sec.", (now-startTime)/1000);
		logger.info("{} {} {} {}", pool.getCompletedTaskCount(), pool.getPoolSize(), pool.getActiveCount(), pool.getTaskCount());

		if ( threadInterrupted) {
			// restore the interrupt status only after the teardown above is done
			Thread.currentThread().interrupt();
		}
	}


	/** Stops all running work and releases the query structure. */
	private void abort() {
		interrupt();
		cleanup();
	}


	/**
	 * Closes the result file and logs a failure instead of propagating it.
	 *
	 * @param out the result file, may be null
	 * @return false if closing failed, true otherwise
	 */
	private boolean closeOutput(SynchronizedOutFile out) {
		if ( out == null) {
			return true;
		}
		try {
			out.close();
			return true;
		} catch (Exception e) {
			logger.error("Exception: ", e);
			return false;
		}
	}


	/**
	 * Writes the algorithm line, the legend and the "#param:" lines to the result file and flushes it.
	 *
	 * @param out the freshly opened result file
	 * @throws IOException if writing fails
	 */
	private void writeHeader(SynchronizedOutFile out) throws IOException {

		String algorithmName = algorithm.getAlgorithmName();

		out.write("# algorithm:" + algorithmName);
		out.write(AFPChain.newline);
		out.write(getLegend(algorithmName));
		out.write(AFPChain.newline);

		if ( name1.equals(CUSTOM_NAME)) {

			out.write("#param:file1=" + customFile1);
			out.write(AFPChain.newline);

			if ( customChain1 != null) {
				out.write("#param:chain1=" + customChain1);
				out.write(AFPChain.newline);
			}

		}

		if ( algorithmName.startsWith("jCE")){
			ConfigStrucAligParams params = algorithm.getParameters();
			if ( params instanceof CeParameters){
				CeParameters ceParams = (CeParameters) params;
				// only a non-default scoring strategy is recorded
				if ( ceParams.getScoringStrategy() != CeParameters.ScoringStrategy.DEFAULT_SCORING_STRATEGY) {
					out.write("#param:scoring=" + ceParams.getScoringStrategy());
					out.write(AFPChain.newline);
				}
			}
		}

		out.flush();
	}


	/**
	 * Returns the names to process for one representative.
	 *
	 * @param domainProvider provider that is asked for the domains if domainSplit is set
	 * @param repre name of the representative
	 * @return the domain names of the representative, or only the representative
	 * itself if domainSplit is off or no domains were found
	 * @throws IOException if the domain provider fails
	 * @throws StructureException if the domain provider fails
	 */
	private Set<String> getTargetNames(DomainProvider domainProvider, String repre) throws IOException, StructureException {

		if ( ! domainSplit) {
			return Collections.singleton(repre);
		}

		SortedSet<String> domainNames = domainProvider.getDomainNames(repre);
		if ( domainNames == null || domainNames.isEmpty()){
			// no domains found, use whole chain.
			return Collections.singleton(repre);
		}
		return domainNames;
	}


	/** Flushes the cache of the provider if it is a {@link RemoteDomainProvider}. */
	private void flushDomainCache(DomainProvider domainProvider) {
		if ( domainProvider instanceof RemoteDomainProvider ) {
			RemoteDomainProvider remote = (RemoteDomainProvider) domainProvider;
			remote.flushCache();
		}
	}


	/**
	 * Logs problems with the local PDB directory and prefetches the file of
	 * every target that is going to be aligned.
	 *
	 * <p>Returns early, without flushing the domain cache, as soon as the
	 * search has been interrupted.
	 *
	 * @throws IOException if the domain provider or a prefetch fails
	 * @throws StructureException if the domain provider or a prefetch fails
	 */
	private void checkLocalFiles() throws IOException, StructureException {

		logger.info("Checking local PDB installation in directory: {}", cache.getPath());

		File f = new File(cache.getPath());
		// both problems are only reported, the check continues afterwards
		if ( ! f.isDirectory()) {
			logger.error("The path {} should point to a directory!", f.getAbsolutePath());
		}

		if ( ! f.canWrite()) {
			logger.error("You do not have permission to write to {}. There could be a problem if the PDB installation is not up-to-date with fetching missing PDB files.", f.getAbsolutePath());
		}

		DomainProvider domainProvider = DomainProviderFactory.getDomainProvider();

		for (String repre : representatives){

			if ( interrupted.get()) {
				return;
			}

			for (String target : getTargetNames(domainProvider, repre)){
				checkFile(target);
			}

		}

		flushDomainCache(domainProvider);

		logger.info("done checking local files...");

	}


	/**
	 * Prefetches the structure file for the PDB ID of the given name into the cache path.
	 *
	 * @param repre name of a representative or domain
	 * @throws IOException if the prefetch fails
	 * @throws StructureException if the name cannot be resolved
	 */
	private void checkFile(String repre) throws IOException, StructureException {

		StructureName name = new StructureName(repre);

		PDBFileReader reader = new PDBFileReader();
		reader.setFetchBehavior(FetchBehavior.FETCH_REMEDIATED);
		reader.setPath(cache.getPath());
		reader.setFileParsingParameters(cache.getFileParsingParams());
		reader.prefetchStructure(name.getPdbId());
	}


	/**
	 * Configures one alignment job for the pair (queryName, repre) and submits it to the thread pool.
	 *
	 * <p>If the query atoms cannot be set, the thread pool is shut down and nothing is submitted.
	 *
	 * @param queryName name of the query
	 * @param repre name of the target
	 * @param ca1 representative atoms of the query
	 * @param algorithm algorithm whose name and parameters are used by the job
	 * @param outFileF output directory of the job
	 * @param out result file shared by all jobs
	 * @param cache cache used by the job
	 */
	private void submit(String queryName, String repre, Atom[] ca1, StructureAlignment algorithm , File outFileF , SynchronizedOutFile out , AtomCache cache ) {
		CallableStructureAlignment ali = new CallableStructureAlignment();

		PdbPair pair = new PdbPair(queryName, repre);
		try {
			ali.setCa1(ca1);
		} catch (Exception e){
			logger.error("Exception: ", e);
			ConcurrencyTools.shutdown();
			return;
		}
		ali.setAlgorithmName(algorithm.getAlgorithmName());
		ali.setParameters(algorithm.getParameters());
		ali.setPair(pair);
		ali.setOutFile(out);
		ali.setOutputDir(outFileF);
		ali.setCache(cache);

		ConcurrencyTools.submit(ali);

	}


	/** stops what is currently happening and does not continue
	 *
	 * <p>Sets the interrupted flag, calls {@code shutdownNow()} on the thread
	 * pool and flushes the cache of a {@link RemoteDomainProvider}.
	 */
	public void interrupt() {
		interrupted.set(true);
		ExecutorService pool = ConcurrencyTools.getThreadPool();
		pool.shutdownNow();
		try {
			flushDomainCache(DomainProviderFactory.getDomainProvider());
		} catch (IOException e) {
			// If errors occur, the cache should be empty anyways
			logger.debug("Could not get the domain provider while interrupting", e);
		}

	}

	/** Releases the reference to the query structure. */
	public void cleanup()
	{

		structure1 = null;

	}

}