001/*
002 *                    BioJava development code
003 *
004 * This code may be freely distributed and modified under the
005 * terms of the GNU Lesser General Public Licence.  This should
006 * be distributed with the code.  If you do not have a copy,
007 * see:
008 *
009 *      http://www.gnu.org/copyleft/lesser.html
010 *
011 * Copyright for this code is held jointly by the individual
012 * authors.  These should be listed in @author doc comments.
013 *
014 * For more information on the BioJava project and its aims,
015 * or to join the biojava-l mailing list, visit the home page
016 * at:
017 *
018 *      http://www.biojava.org/
019 *
020 */
021package org.biojava.nbio.structure.align.client;
022
023import org.biojava.nbio.structure.Atom;
024import org.biojava.nbio.structure.StructureException;
025import org.biojava.nbio.structure.align.StructureAlignment;
026import org.biojava.nbio.structure.align.StructureAlignmentFactory;
027import org.biojava.nbio.structure.align.ce.CeCPMain;
028import org.biojava.nbio.structure.align.ce.CeMain;
029import org.biojava.nbio.structure.align.events.AlignmentProgressListener;
030import org.biojava.nbio.structure.align.fatcat.FatCatFlexible;
031import org.biojava.nbio.structure.align.fatcat.FatCatRigid;
032import org.biojava.nbio.structure.align.model.AFPChain;
033import org.biojava.nbio.structure.align.util.AFPChainScorer;
034import org.biojava.nbio.structure.align.util.AlignmentTools;
035import org.biojava.nbio.structure.align.util.AtomCache;
036import org.biojava.nbio.structure.align.util.ResourceManager;
037import org.biojava.nbio.structure.align.xml.AFPChainXMLConverter;
038import org.biojava.nbio.structure.align.xml.PdbPairsMessage;
039import org.biojava.nbio.structure.domain.RemotePDPProvider;
040import org.biojava.nbio.structure.io.LocalPDBDirectory.FetchBehavior;
041import org.biojava.nbio.structure.scop.RemoteScopInstallation;
042import org.biojava.nbio.structure.scop.ScopFactory;
043import org.biojava.nbio.core.util.FlatFileCache;
044import org.biojava.nbio.core.util.PrettyXMLWriter;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
048import java.io.IOException;
049import java.io.PrintWriter;
050import java.io.StringWriter;
051import java.net.InetAddress;
052import java.net.UnknownHostException;
053import java.util.*;
054
055
056
057
058/** Contains the single thread for a job that can run multiple alignments.
059 *
060 * @author Andreas Prlic
061 *
062 */
063public class FarmJobRunnable implements Runnable {
064
065        private static final Logger logger = LoggerFactory.getLogger(FarmJobRunnable.class);
066
067
068        //private static final int DEFAULT_PAIR_FETCH_DELAY   = 30000;
069        //private static final String CONNECTION_PAIR_DELAY   = "connection.pair.delay";
070        private static final String JFATCAT_NAME            = "jfatcat.name";
071        private static final String JFATCAT_VERSION         = "jfatcat.version";
072
073        private static ResourceManager resourceManager = ResourceManager.getResourceManager("jfatcat");
074
075
076        //private static DateFormat dateFormat = new SimpleDateFormat("MMMM dd, yyyy h:mm a",Locale.US);
077
078        FarmJobParameters params;
079
080        String prevName1;
081        Atom[] ca1 ;
082
083
084        long startTime;
085        long maxTime;
086        int maxNrAlignments;
087        int alignmentsCalculated;
088
089        boolean waitForAlignments;
090
091        private static final String randomUsername = getRandomUsername();
092
093        boolean terminated ;
094
095        List<AlignmentProgressListener> progressListeners;
096        CountProgressListener counter ;
097
098        String userName = null;
099        protected AtomCache cache;
100
101        boolean verbose = false; // TODO dmyersturnbull: we should probably remove this in favor of SLF4J
102        String version = null;
103
104        private static final String alignURL = "/align/";
105        public FarmJobRunnable(FarmJobParameters params){
106                terminated = false;
107                this.params = params;
108                verbose = false;
109
110                // multiple farm jobs share the same SoftHashMap for caching coordinates
111                cache = new AtomCache( params.getPdbFilePath(), params.getCacheFilePath());
112
113
114                if ( params.getServer()!= null && (!params.getServer().equals("") ) ) {
115
116                        RemotePDPProvider pdpprovider = new RemotePDPProvider();
117
118                        String serverURL = params.getServer();
119                        if ( ! serverURL.endsWith("/"))
120                                serverURL += "/";
121
122                        if (  serverURL.endsWith(alignURL)) {
123                                serverURL = serverURL.substring(0,serverURL.length()-alignURL.length());
124                        }
125
126                        pdpprovider.setServer(serverURL+"/domains/");
127
128                        cache.setPdpprovider(pdpprovider);
129
130                        RemoteScopInstallation scop = new RemoteScopInstallation();
131
132                        scop.setServer(serverURL+"/domains/");
133                        ScopFactory.setScopDatabase(scop);
134
135                }
136
137                cache.setFetchBehavior(FetchBehavior.FETCH_FILES);
138
139                maxNrAlignments = params.getNrAlignments();
140                progressListeners = null;
141                if (params.getUsername() == null) {
142                        userName = randomUsername;
143                } else {
144                        userName = params.getUsername();
145                }
146                counter  = new CountProgressListener();
147                addAlignmentProgressListener(counter);
148                waitForAlignments = true;
149
150                if ( params.isVerbose()){
151                        verbose = true;
152                }
153        }
154
155        public void addAlignmentProgressListener(AlignmentProgressListener listener){
156
157                if (progressListeners == null)
158                        progressListeners = new ArrayList<AlignmentProgressListener>();
159
160                progressListeners.add(listener);
161        }
162
163        public void clearListeners(){
164                if ( progressListeners == null)
165                        return;
166                progressListeners.clear();
167                progressListeners = null;
168        }
169
170        protected static String getRandomUsername(){
171                String name = "";
172                try {
173                        InetAddress i = InetAddress.getLocalHost();
174                        name += i.getHostAddress();
175                        name += "_";
176                } catch (UnknownHostException e){
177                        throw new RuntimeException(e);
178                }
179                name +=  UUID.randomUUID();
180
181                return name;
182
183        }
184
185        @Override
186        public void run() {
187
188                // Retrieve resource
189                String appVersion = resourceManager.getString(JFATCAT_VERSION);
190                String appName    = resourceManager.getString(JFATCAT_NAME);
191                logger.info("{} version: {}", appName, appVersion);
192
193
194                startTime = System.currentTimeMillis();
195                // -t ime is in seconds.
196                long maxSec = params.getTime();
197
198                if ( maxSec < 5 )
199                        maxTime = Long.MAX_VALUE;
200                else
201                        maxTime = startTime + params.getTime() * 1000;
202
203                terminated = false;
204
205                alignmentsCalculated = 0;
206
207                maxNrAlignments = params.getNrAlignments();
208
209                if ( maxNrAlignments < 0 ){
210                        maxNrAlignments = Integer.MAX_VALUE;
211                }
212
213                logger.info("running job for max {} alignments", maxNrAlignments);
214
215
216                while (! terminated){
217
218                        // talk to server
219                        // get list of alignments to run
220                        // if maxNrAlignments > 100 we split up the calculations in chunks of 100.
221                        // otherwise we request all of them at once.
222                        // we request
223                        PdbPairsMessage msg = getAlignmentPairsFromServer();
224                        if ( msg == null) {
225                                logger.error("Got null instead of alignment pairs from server.");
226                                randomSleep();
227                                continue;
228                        }
229                        SortedSet<PdbPair> alignmentPairs = msg.getPairs();
230                        logger.debug("{}: Server responded with {} pairs.", userName, alignmentPairs.size());
231                        List<String> results = new ArrayList<String>();
232
233                        String algorithmName = msg.getMethod();
234                        if ( version == null) {
235                                setVersion(algorithmName);
236
237                        }
238                        for(PdbPair pair : alignmentPairs){
239
240                                if ( terminated)
241                                        break;
242
243                                long now = System.currentTimeMillis();
244                                if ( now >= maxTime)  {
245                                        terminated = true;
246                                        break;
247                                }
248
249                                if ( alignmentsCalculated >= maxNrAlignments) {
250                                        terminated = true;
251                                        break;
252                                }
253
254
255                                String name1 = pair.getName1();
256                                String name2 = pair.getName2();
257
258                                if ( progressListeners != null)
259                                        notifyStartAlignment(name1,name2);
260
261
262                                try {
263                                        String resultXML = alignPair(name1, name2,algorithmName);
264
265                                        if ( progressListeners != null)
266                                                notifyEndAlignment();
267
268                                        results.add(resultXML);
269
270                                } catch (Exception e){
271                                        logger.error("Problem aligning {} with {}", name1, name2, e);
272
273                                        StringWriter sw = new StringWriter();
274
275                                        PrettyXMLWriter xml = new PrettyXMLWriter(new PrintWriter(sw));
276                                        try {
277                                                xml.openTag("AFPChain");
278
279                                                xml.attribute("name1", name1);
280                                                xml.attribute("name2", name2);
281                                                xml.attribute("error", e.getMessage());
282                                                xml.closeTag("AFPChain");
283                                        } catch(IOException ex){
284                                                logger.error("Error occured converting alignment for {} and {} to XML", name1, name2, ex);
285                                        }
286
287                                        if ( progressListeners != null)
288                                                notifyEndAlignment();
289
290                                        results.add(sw.toString());
291
292
293                                }
294
295                                alignmentsCalculated++;
296
297                        }
298
299                        // send results back to server
300                        sendResultsToServer(results);
301
302                        long end = System.currentTimeMillis();
303                        if ( end >= maxTime)  {
304                                logger.info("OK end of job: reached maxTime {}", maxTime);
305                                terminated = true;
306
307                        }
308
309                        if ( alignmentsCalculated >= maxNrAlignments) {
310                                logger.info("OK end of job: reached maxNrAlignments", maxNrAlignments);
311                                terminated = true;
312
313                        }
314
315                        long tdiff = (end - startTime);
316                        if ( tdiff != 0) {
317
318                                logger.info(userName + String.format(": job has run for :  %.2f", (tdiff) / 1000.0 / 60) + " min.");
319                                logger.info("{}: total nr of alignments calculated: {}", userName, alignmentsCalculated);
320                                if ( alignmentsCalculated > 0)
321                                        logger.info(userName + String.format(": average time / alignment: %.2f", (tdiff / alignmentsCalculated / 1000.0)) + " sec.");
322                        }
323                }
324
325                logger.info(userName + ": jFATCAT job result: " + counter);
326
327                // clean up in the end...
328                clearListeners();
329
330                cache.notifyShutdown();
331
332        }
333
334
335        private void setVersion(String algorithmName) {
336                StructureAlignment algorithm;
337                try {
338                        algorithm = StructureAlignmentFactory.getAlgorithm(algorithmName);
339                        version = algorithm.getVersion();
340                } catch (StructureException e) {
341                        throw new RuntimeException("Couldn't set version for algorithm \"" + algorithmName + "\"", e);
342//                      version = resourceManager.getString(JFATCAT_VERSION); // dmyersturnbull: was this
343                }
344
345
346        }
347
348        private void notifyStartAlignment(String name1, String name2) {
349                if ( progressListeners != null){
350                        for (AlignmentProgressListener li : progressListeners){
351                                li.alignmentStarted(name1, name2);
352                        }
353                }
354        }
355
356        private void notifyEndAlignment(){
357                if ( progressListeners != null){
358                        for (AlignmentProgressListener li : progressListeners){
359                                li.alignmentEnded();
360
361                        }
362                }
363        }
364
365        private void notifyRequestingAlignments(int nrAlignments){
366                if ( progressListeners != null){
367                        for (AlignmentProgressListener li : progressListeners){
368                                li.requestingAlignmentsFromServer(nrAlignments);
369
370                        }
371                }
372        }
373
374        private void notifySubmittingAlignments(int nrAlignments, String message){
375                if ( progressListeners != null){
376                        for (AlignmentProgressListener li : progressListeners){
377                                li.sentResultsToServer(nrAlignments,message);
378
379                        }
380                }
381        }
382
383
384        public String alignPair(String name1, String name2)
385                throws StructureException, IOException {
386                return alignPair(name1, name2, FatCatRigid.algorithmName);
387        }
388
389        public String alignPair(String name1, String name2, String algorithmName)
390                throws StructureException, IOException {
391
392                //      make sure each thread has an independent instance of the algorithm object ...
393
394                StructureAlignment algorithm = getAlgorithm(algorithmName);
395
396                // we are running with default parameters
397
398                if ( verbose ) {
399                        logger.debug("aligning {} against {}", name1, name2);
400                }
401
402                long startTime = System.currentTimeMillis();
403
404                if ( prevName1 == null)
405                        initMaster(name1);
406
407                if ( ! prevName1.equals(name1) ) {
408                        // we need to reload the master
409                        initMaster(name1);
410                }
411
412                // get a copy of the atoms, but clone them, since they will be rotated...
413                Atom[] ca2 =  cache.getAtoms(name2);
414
415                AFPChain afpChain = algorithm.align(ca1, ca2);
416
417                afpChain.setName1(name1);
418                afpChain.setName2(name2);
419
420                try {
421                        // add tmScore
422                        double tmScore = AFPChainScorer.getTMScore(afpChain, ca1, ca2);
423                        afpChain.setTMScore(tmScore);
424                } catch (RuntimeException e){
425                        logger.error("ca1 size: {} ca2 length: {} {}  {}", ca1.length, ca2.length, afpChain.getName1(), afpChain.getName2(), e);
426
427                }
428                long endTime = System.currentTimeMillis();
429
430                long calcTime = (endTime-startTime);
431                if ( verbose ){
432                        boolean isCP = !AlignmentTools.isSequentialAlignment(afpChain, false);
433                        String msg = "finished alignment: " + name1 + " vs. " + name2 + " in " + (calcTime) / 1000.0 + " sec.";
434                        msg += " algo: " + algorithmName + " v:" + version + " " + afpChain;
435
436                        if ( isCP ) msg += "HAS A CIRCULAR PERMUTATION!!!";
437                        logger.debug(msg);
438                }
439                if (verbose){
440                        printMemory();
441                }
442                afpChain.setCalculationTime(calcTime);
443
444                return AFPChainXMLConverter.toXML(afpChain, ca1, ca2);
445        }
446
447
448
449
450        private void printMemory() {
451                int size = 1048576;
452                long heapSize = Runtime.getRuntime().totalMemory() / size;
453
454                // Get maximum size of heap in bytes. The heap cannot grow beyond this size.
455                // Any attempt will result in an OutOfMemoryException.
456                long heapMaxSize = Runtime.getRuntime().maxMemory() / size;
457
458                // Get amount of free memory within the heap in bytes. This size will increase
459                // after garbage collection and decrease as new objects are created.
460                long heapFreeSize = Runtime.getRuntime().freeMemory() / size;
461                StringBuilder msg = new StringBuilder();
462                msg.append("  total: ").append(heapSize).append(" M");
463                msg.append(" max: "). append(heapMaxSize).append(" M");
464                msg.append(" free: ").append(heapFreeSize).append(" M");
465
466                logger.debug(msg.toString());
467
468        }
469
470        private StructureAlignment getAlgorithm(String algorithmName) throws StructureException {
471
472
473                StructureAlignment algorithm    = null;
474
475                if ( algorithmName == null){
476
477                        algorithm = new FatCatRigid();
478
479                } else if ( algorithmName.equalsIgnoreCase(FatCatRigid.algorithmName)){
480
481                                algorithm = new FatCatRigid();
482
483                } else if ( algorithmName.equalsIgnoreCase(CeMain.algorithmName)){
484
485                        algorithm = new CeMain();
486
487                } else if ( algorithmName.equalsIgnoreCase(CeCPMain.algorithmName)){
488
489                        algorithm = new CeCPMain();
490
491                } else if ( algorithmName.equalsIgnoreCase(FatCatFlexible.algorithmName)){
492
493                        algorithm = new FatCatFlexible();
494
495                } else {
496
497                        algorithm = StructureAlignmentFactory.getAlgorithm(algorithmName);
498
499                }
500
501                if ( algorithm == null) {
502
503                        algorithm = new FatCatRigid();
504
505                }
506
507
508                return algorithm;
509        }
510
511        private void initMaster(String name1) throws IOException, StructureException{
512
513                ca1 = cache.getAtoms(name1);
514
515                prevName1 = name1;
516
517        }
518
519
520        /** talk to centralized server and fetch all alignments to run.
521         *
522         * @return a list of pairs to align.
523         */
524        protected PdbPairsMessage getAlignmentPairsFromServer() {
525
526
527                String url = params.getServer();
528
529                int nrPairs = params.getStepSize();
530
531                if ( maxNrAlignments < nrPairs )
532                        nrPairs = maxNrAlignments;
533
534                SortedSet<PdbPair> allPairs = new TreeSet<PdbPair>();
535
536                PdbPairsMessage msg = null;
537
538
539                try {
540
541                        if ( progressListeners != null)
542                                notifyRequestingAlignments(nrPairs);
543
544
545
546                        if ( ! waitForAlignments) {
547                                msg = JFatCatClient.getPdbPairs(url, nrPairs, userName);
548                                allPairs = msg.getPairs();
549
550                        } else {
551
552                                while (allPairs.isEmpty()) {
553                                        msg = JFatCatClient.getPdbPairs(url, nrPairs, userName);
554                                        allPairs = msg.getPairs();
555
556                                        if (allPairs.isEmpty()) {
557                                                randomSleep();
558                                        }
559                                }
560                        }
561                } catch ( JobKillException k ){
562
563                        logger.debug("Terminating job", k);
564                        terminate();
565
566                } catch (Exception e) {
567                        logger.error("Error while requesting alignment pairs", e);
568                        // an error has occured sleep 30 sec.
569
570                        randomSleep();
571
572
573                }
574
575                return msg;
576        }
577
578        private void randomSleep() {
579                try {
580
581                        int delay = JFatCatClient.getRandomSleepTime();
582                        logger.debug("sleeping {} sec.", delay/1000);
583                        Thread.sleep(delay);
584                } catch (InterruptedException ex){
585                        logger.trace("InterruptedException occurred while sleeping", ex);
586                }
587
588        }
589
590        protected void sendResultsToServer(List<String> results) {
591
592                String serverLocation = params.getServer();
593
594                if ( results.size() < 1)
595                        return;
596
597                String fullXml = "<alignments>";
598
599                for (String xml: results){
600                        fullXml +=xml;
601                }
602                fullXml += "</alignments>";
603                String msg = "";
604                try {
605                        msg = JFatCatClient.sendMultiAFPChainToServer(serverLocation,fullXml, userName, version );
606                } catch (JobKillException e){
607                        logger.info("{} Got Job Kill message from server, terminating...", userName, e);
608                        terminate();
609                }
610
611                if ( progressListeners != null)
612                        notifySubmittingAlignments(results.size(), msg);
613                logger.info("{}: Sent {} results to server. job status: {}", userName, results.size(), counter);
614                logger.info("{}: fileCache size: {}", userName, FlatFileCache.getInstance().size());
615        }
616
617
618        /** Send signal to terminate calculations
619         *
620         */
621        public synchronized void terminate(){
622                terminated = true;
623        }
624
625        public boolean isWaitForAlignments() {
626                return waitForAlignments;
627        }
628
629        public void setWaitForAlignments(boolean waitForAlignments) {
630                this.waitForAlignments = waitForAlignments;
631        }
632
633
634
635}