001/* 002 * BioJava development code 003 * 004 * This code may be freely distributed and modified under the 005 * terms of the GNU Lesser General Public Licence. This should 006 * be distributed with the code. If you do not have a copy, 007 * see: 008 * 009 * http://www.gnu.org/copyleft/lesser.html 010 * 011 * Copyright for this code is held jointly by the individual 012 * authors. These should be listed in @author doc comments. 013 * 014 * For more information on the BioJava project and its aims, 015 * or to join the biojava-l mailing list, visit the home page 016 * at: 017 * 018 * http://www.biojava.org/ 019 * 020 */ 021package org.biojava.nbio.structure.align.client; 022 023import org.biojava.nbio.structure.Atom; 024import org.biojava.nbio.structure.StructureException; 025import org.biojava.nbio.structure.align.StructureAlignment; 026import org.biojava.nbio.structure.align.StructureAlignmentFactory; 027import org.biojava.nbio.structure.align.ce.CeCPMain; 028import org.biojava.nbio.structure.align.ce.CeMain; 029import org.biojava.nbio.structure.align.events.AlignmentProgressListener; 030import org.biojava.nbio.structure.align.fatcat.FatCatFlexible; 031import org.biojava.nbio.structure.align.fatcat.FatCatRigid; 032import org.biojava.nbio.structure.align.model.AFPChain; 033import org.biojava.nbio.structure.align.util.AFPChainScorer; 034import org.biojava.nbio.structure.align.util.AlignmentTools; 035import org.biojava.nbio.structure.align.util.AtomCache; 036import org.biojava.nbio.structure.align.util.ResourceManager; 037import org.biojava.nbio.structure.align.xml.AFPChainXMLConverter; 038import org.biojava.nbio.structure.align.xml.PdbPairsMessage; 039import org.biojava.nbio.structure.domain.RemotePDPProvider; 040import org.biojava.nbio.structure.io.LocalPDBDirectory.FetchBehavior; 041import org.biojava.nbio.structure.scop.RemoteScopInstallation; 042import org.biojava.nbio.structure.scop.ScopFactory; 043import org.biojava.nbio.core.util.FlatFileCache; 044import org.biojava.nbio.core.util.PrettyXMLWriter; 045import org.slf4j.Logger; 046import org.slf4j.LoggerFactory; 047 048import java.io.IOException; 049import java.io.PrintWriter; 050import java.io.StringWriter; 051import java.net.InetAddress; 052import java.net.UnknownHostException; 053import java.util.*; 054 055 056 057 058/** Contains the single thread for a job that can run multiple alignments. 059 * 060 * @author Andreas Prlic 061 * 062 */ 063public class FarmJobRunnable implements Runnable { 064 065 private static final Logger logger = LoggerFactory.getLogger(FarmJobRunnable.class); 066 067 068 //private static final int DEFAULT_PAIR_FETCH_DELAY = 30000; 069 //private static final String CONNECTION_PAIR_DELAY = "connection.pair.delay"; 070 private static final String JFATCAT_NAME = "jfatcat.name"; 071 private static final String JFATCAT_VERSION = "jfatcat.version"; 072 073 private static ResourceManager resourceManager = ResourceManager.getResourceManager("jfatcat"); 074 075 076 //private static DateFormat dateFormat = new SimpleDateFormat("MMMM dd, yyyy h:mm a",Locale.US); 077 078 FarmJobParameters params; 079 080 String prevName1; 081 Atom[] ca1 ; 082 083 084 long startTime; 085 long maxTime; 086 int maxNrAlignments; 087 int alignmentsCalculated; 088 089 boolean waitForAlignments; 090 091 private static final String randomUsername = getRandomUsername(); 092 093 boolean terminated ; 094 095 List<AlignmentProgressListener> progressListeners; 096 CountProgressListener counter ; 097 098 String userName = null; 099 protected AtomCache cache; 100 101 boolean verbose = false; // TODO dmyersturnbull: we should probably remove this in favor of SLF4J 102 String version = null; 103 104 private static final String alignURL = "/align/"; 105 public FarmJobRunnable(FarmJobParameters params){ 106 terminated = false; 107 this.params = params; 108 verbose = false; 109 110 // multiple farm jobs share the same SoftHashMap for caching coordinates 111 cache = new AtomCache( params.getPdbFilePath(), params.getCacheFilePath()); 112 113 114 if ( params.getServer()!= null && (!params.getServer().equals("") ) ) { 115 116 RemotePDPProvider pdpprovider = new RemotePDPProvider(); 117 118 String serverURL = params.getServer(); 119 if ( ! serverURL.endsWith("/")) 120 serverURL += "/"; 121 122 if ( serverURL.endsWith(alignURL)) { 123 serverURL = serverURL.substring(0,serverURL.length()-alignURL.length()); 124 } 125 126 pdpprovider.setServer(serverURL+"/domains/"); 127 128 cache.setPdpprovider(pdpprovider); 129 130 RemoteScopInstallation scop = new RemoteScopInstallation(); 131 132 scop.setServer(serverURL+"/domains/"); 133 ScopFactory.setScopDatabase(scop); 134 135 } 136 137 cache.setFetchBehavior(FetchBehavior.FETCH_FILES); 138 139 maxNrAlignments = params.getNrAlignments(); 140 progressListeners = null; 141 if (params.getUsername() == null) { 142 userName = randomUsername; 143 } else { 144 userName = params.getUsername(); 145 } 146 counter = new CountProgressListener(); 147 addAlignmentProgressListener(counter); 148 waitForAlignments = true; 149 150 if ( params.isVerbose()){ 151 verbose = true; 152 } 153 } 154 155 public void addAlignmentProgressListener(AlignmentProgressListener listener){ 156 157 if (progressListeners == null) 158 progressListeners = new ArrayList<AlignmentProgressListener>(); 159 160 progressListeners.add(listener); 161 } 162 163 public void clearListeners(){ 164 if ( progressListeners == null) 165 return; 166 progressListeners.clear(); 167 progressListeners = null; 168 } 169 170 protected static String getRandomUsername(){ 171 String name = ""; 172 try { 173 InetAddress i = InetAddress.getLocalHost(); 174 name += i.getHostAddress(); 175 name += "_"; 176 } catch (UnknownHostException e){ 177 throw new RuntimeException(e); 178 } 179 name += UUID.randomUUID(); 180 181 return name; 182 183 } 184 185 @Override 186 public void run() { 187 188 // Retrieve resource 189 String appVersion = resourceManager.getString(JFATCAT_VERSION); 190 String appName = resourceManager.getString(JFATCAT_NAME); 191 logger.info("{} version: {}", appName, appVersion); 192 193 194 startTime = System.currentTimeMillis(); 195 // -t ime is in seconds. 196 long maxSec = params.getTime(); 197 198 if ( maxSec < 5 ) 199 maxTime = Long.MAX_VALUE; 200 else 201 maxTime = startTime + params.getTime() * 1000; 202 203 terminated = false; 204 205 alignmentsCalculated = 0; 206 207 maxNrAlignments = params.getNrAlignments(); 208 209 if ( maxNrAlignments < 0 ){ 210 maxNrAlignments = Integer.MAX_VALUE; 211 } 212 213 logger.info("running job for max {} alignments", maxNrAlignments); 214 215 216 while (! terminated){ 217 218 // talk to server 219 // get list of alignments to run 220 // if maxNrAlignments > 100 we split up the calculations in chunks of 100. 221 // otherwise we request all of them at once. 222 // we request 223 PdbPairsMessage msg = getAlignmentPairsFromServer(); 224 if ( msg == null) { 225 logger.error("Got null instead of alignment pairs from server."); 226 randomSleep(); 227 continue; 228 } 229 SortedSet<PdbPair> alignmentPairs = msg.getPairs(); 230 logger.debug("{}: Server responded with {} pairs.", userName, alignmentPairs.size()); 231 List<String> results = new ArrayList<String>(); 232 233 String algorithmName = msg.getMethod(); 234 if ( version == null) { 235 setVersion(algorithmName); 236 237 } 238 for(PdbPair pair : alignmentPairs){ 239 240 if ( terminated) 241 break; 242 243 long now = System.currentTimeMillis(); 244 if ( now >= maxTime) { 245 terminated = true; 246 break; 247 } 248 249 if ( alignmentsCalculated >= maxNrAlignments) { 250 terminated = true; 251 break; 252 } 253 254 255 String name1 = pair.getName1(); 256 String name2 = pair.getName2(); 257 258 if ( progressListeners != null) 259 notifyStartAlignment(name1,name2); 260 261 262 try { 263 String resultXML = alignPair(name1, name2,algorithmName); 264 265 if ( progressListeners != null) 266 notifyEndAlignment(); 267 268 results.add(resultXML); 269 270 } catch (Exception e){ 271 logger.error("Problem aligning {} with {}", name1, name2, e); 272 273 StringWriter sw = new StringWriter(); 274 275 PrettyXMLWriter xml = new PrettyXMLWriter(new PrintWriter(sw)); 276 try { 277 xml.openTag("AFPChain"); 278 279 xml.attribute("name1", name1); 280 xml.attribute("name2", name2); 281 xml.attribute("error", e.getMessage()); 282 xml.closeTag("AFPChain"); 283 } catch(IOException ex){ 284 logger.error("Error occured converting alignment for {} and {} to XML", name1, name2, ex); 285 } 286 287 if ( progressListeners != null) 288 notifyEndAlignment(); 289 290 results.add(sw.toString()); 291 292 293 } 294 295 alignmentsCalculated++; 296 297 } 298 299 // send results back to server 300 sendResultsToServer(results); 301 302 long end = System.currentTimeMillis(); 303 if ( end >= maxTime) { 304 logger.info("OK end of job: reached maxTime {}", maxTime); 305 terminated = true; 306 307 } 308 309 if ( alignmentsCalculated >= maxNrAlignments) { 310 logger.info("OK end of job: reached maxNrAlignments", maxNrAlignments); 311 terminated = true; 312 313 } 314 315 long tdiff = (end - startTime); 316 if ( tdiff != 0) { 317 318 logger.info(userName + String.format(": job has run for : %.2f", (tdiff) / 1000.0 / 60) + " min."); 319 logger.info("{}: total nr of alignments calculated: {}", userName, alignmentsCalculated); 320 if ( alignmentsCalculated > 0) 321 logger.info(userName + String.format(": average time / alignment: %.2f", (tdiff / alignmentsCalculated / 1000.0)) + " sec."); 322 } 323 } 324 325 logger.info(userName + ": jFATCAT job result: " + counter); 326 327 // clean up in the end... 328 clearListeners(); 329 330 cache.notifyShutdown(); 331 332 } 333 334 335 private void setVersion(String algorithmName) { 336 StructureAlignment algorithm; 337 try { 338 algorithm = StructureAlignmentFactory.getAlgorithm(algorithmName); 339 version = algorithm.getVersion(); 340 } catch (StructureException e) { 341 throw new RuntimeException("Couldn't set version for algorithm \"" + algorithmName + "\"", e); 342// version = resourceManager.getString(JFATCAT_VERSION); // dmyersturnbull: was this 343 } 344 345 346 } 347 348 private void notifyStartAlignment(String name1, String name2) { 349 if ( progressListeners != null){ 350 for (AlignmentProgressListener li : progressListeners){ 351 li.alignmentStarted(name1, name2); 352 } 353 } 354 } 355 356 private void notifyEndAlignment(){ 357 if ( progressListeners != null){ 358 for (AlignmentProgressListener li : progressListeners){ 359 li.alignmentEnded(); 360 361 } 362 } 363 } 364 365 private void notifyRequestingAlignments(int nrAlignments){ 366 if ( progressListeners != null){ 367 for (AlignmentProgressListener li : progressListeners){ 368 li.requestingAlignmentsFromServer(nrAlignments); 369 370 } 371 } 372 } 373 374 private void notifySubmittingAlignments(int nrAlignments, String message){ 375 if ( progressListeners != null){ 376 for (AlignmentProgressListener li : progressListeners){ 377 li.sentResultsToServer(nrAlignments,message); 378 379 } 380 } 381 } 382 383 384 public String alignPair(String name1, String name2) 385 throws StructureException, IOException { 386 return alignPair(name1, name2, FatCatRigid.algorithmName); 387 } 388 389 public String alignPair(String name1, String name2, String algorithmName) 390 throws StructureException, IOException { 391 392 // make sure each thread has an independent instance of the algorithm object ... 393 394 StructureAlignment algorithm = getAlgorithm(algorithmName); 395 396 // we are running with default parameters 397 398 if ( verbose ) { 399 logger.debug("aligning {} against {}", name1, name2); 400 } 401 402 long startTime = System.currentTimeMillis(); 403 404 if ( prevName1 == null) 405 initMaster(name1); 406 407 if ( ! prevName1.equals(name1) ) { 408 // we need to reload the master 409 initMaster(name1); 410 } 411 412 // get a copy of the atoms, but clone them, since they will be rotated... 413 Atom[] ca2 = cache.getAtoms(name2); 414 415 AFPChain afpChain = algorithm.align(ca1, ca2); 416 417 afpChain.setName1(name1); 418 afpChain.setName2(name2); 419 420 try { 421 // add tmScore 422 double tmScore = AFPChainScorer.getTMScore(afpChain, ca1, ca2); 423 afpChain.setTMScore(tmScore); 424 } catch (RuntimeException e){ 425 logger.error("ca1 size: {} ca2 length: {} {} {}", ca1.length, ca2.length, afpChain.getName1(), afpChain.getName2(), e); 426 427 } 428 long endTime = System.currentTimeMillis(); 429 430 long calcTime = (endTime-startTime); 431 if ( verbose ){ 432 boolean isCP = !AlignmentTools.isSequentialAlignment(afpChain, false); 433 String msg = "finished alignment: " + name1 + " vs. " + name2 + " in " + (calcTime) / 1000.0 + " sec."; 434 msg += " algo: " + algorithmName + " v:" + version + " " + afpChain; 435 436 if ( isCP ) msg += "HAS A CIRCULAR PERMUTATION!!!"; 437 logger.debug(msg); 438 } 439 if (verbose){ 440 printMemory(); 441 } 442 afpChain.setCalculationTime(calcTime); 443 444 return AFPChainXMLConverter.toXML(afpChain, ca1, ca2); 445 } 446 447 448 449 450 private void printMemory() { 451 int size = 1048576; 452 long heapSize = Runtime.getRuntime().totalMemory() / size; 453 454 // Get maximum size of heap in bytes. The heap cannot grow beyond this size. 455 // Any attempt will result in an OutOfMemoryException. 456 long heapMaxSize = Runtime.getRuntime().maxMemory() / size; 457 458 // Get amount of free memory within the heap in bytes. This size will increase 459 // after garbage collection and decrease as new objects are created. 460 long heapFreeSize = Runtime.getRuntime().freeMemory() / size; 461 StringBuilder msg = new StringBuilder(); 462 msg.append(" total: ").append(heapSize).append(" M"); 463 msg.append(" max: "). append(heapMaxSize).append(" M"); 464 msg.append(" free: ").append(heapFreeSize).append(" M"); 465 466 logger.debug(msg.toString()); 467 468 } 469 470 private StructureAlignment getAlgorithm(String algorithmName) throws StructureException { 471 472 473 StructureAlignment algorithm = null; 474 475 if ( algorithmName == null){ 476 477 algorithm = new FatCatRigid(); 478 479 } else if ( algorithmName.equalsIgnoreCase(FatCatRigid.algorithmName)){ 480 481 algorithm = new FatCatRigid(); 482 483 } else if ( algorithmName.equalsIgnoreCase(CeMain.algorithmName)){ 484 485 algorithm = new CeMain(); 486 487 } else if ( algorithmName.equalsIgnoreCase(CeCPMain.algorithmName)){ 488 489 algorithm = new CeCPMain(); 490 491 } else if ( algorithmName.equalsIgnoreCase(FatCatFlexible.algorithmName)){ 492 493 algorithm = new FatCatFlexible(); 494 495 } else { 496 497 algorithm = StructureAlignmentFactory.getAlgorithm(algorithmName); 498 499 } 500 501 if ( algorithm == null) { 502 503 algorithm = new FatCatRigid(); 504 505 } 506 507 508 return algorithm; 509 } 510 511 private void initMaster(String name1) throws IOException, StructureException{ 512 513 ca1 = cache.getAtoms(name1); 514 515 prevName1 = name1; 516 517 } 518 519 520 /** talk to centralized server and fetch all alignments to run. 521 * 522 * @return a list of pairs to align. 523 */ 524 protected PdbPairsMessage getAlignmentPairsFromServer() { 525 526 527 String url = params.getServer(); 528 529 int nrPairs = params.getStepSize(); 530 531 if ( maxNrAlignments < nrPairs ) 532 nrPairs = maxNrAlignments; 533 534 SortedSet<PdbPair> allPairs = new TreeSet<PdbPair>(); 535 536 PdbPairsMessage msg = null; 537 538 539 try { 540 541 if ( progressListeners != null) 542 notifyRequestingAlignments(nrPairs); 543 544 545 546 if ( ! waitForAlignments) { 547 msg = JFatCatClient.getPdbPairs(url, nrPairs, userName); 548 allPairs = msg.getPairs(); 549 550 } else { 551 552 while (allPairs.isEmpty()) { 553 msg = JFatCatClient.getPdbPairs(url, nrPairs, userName); 554 allPairs = msg.getPairs(); 555 556 if (allPairs.isEmpty()) { 557 randomSleep(); 558 } 559 } 560 } 561 } catch ( JobKillException k ){ 562 563 logger.debug("Terminating job", k); 564 terminate(); 565 566 } catch (Exception e) { 567 logger.error("Error while requesting alignment pairs", e); 568 // an error has occured sleep 30 sec. 569 570 randomSleep(); 571 572 573 } 574 575 return msg; 576 } 577 578 private void randomSleep() { 579 try { 580 581 int delay = JFatCatClient.getRandomSleepTime(); 582 logger.debug("sleeping {} sec.", delay/1000); 583 Thread.sleep(delay); 584 } catch (InterruptedException ex){ 585 logger.trace("InterruptedException occurred while sleeping", ex); 586 } 587 588 } 589 590 protected void sendResultsToServer(List<String> results) { 591 592 String serverLocation = params.getServer(); 593 594 if ( results.size() < 1) 595 return; 596 597 String fullXml = "<alignments>"; 598 599 for (String xml: results){ 600 fullXml +=xml; 601 } 602 fullXml += "</alignments>"; 603 String msg = ""; 604 try { 605 msg = JFatCatClient.sendMultiAFPChainToServer(serverLocation,fullXml, userName, version ); 606 } catch (JobKillException e){ 607 logger.info("{} Got Job Kill message from server, terminating...", userName, e); 608 terminate(); 609 } 610 611 if ( progressListeners != null) 612 notifySubmittingAlignments(results.size(), msg); 613 logger.info("{}: Sent {} results to server. job status: {}", userName, results.size(), counter); 614 logger.info("{}: fileCache size: {}", userName, FlatFileCache.size()); 615 } 616 617 618 /** Send signal to terminate calculations 619 * 620 */ 621 public synchronized void terminate(){ 622 terminated = true; 623 } 624 625 public boolean isWaitForAlignments() { 626 return waitForAlignments; 627 } 628 629 public void setWaitForAlignments(boolean waitForAlignments) { 630 this.waitForAlignments = waitForAlignments; 631 } 632 633 634 635}