001package org.biojava.nbio.core.sequence.io;
002
003import org.biojava.nbio.core.sequence.ProteinSequence;
004import org.biojava.nbio.core.sequence.compound.AminoAcidCompound;
005import org.biojava.nbio.core.sequence.compound.AminoAcidCompoundSet;
006import org.biojava.nbio.core.sequence.io.template.SequenceCreatorInterface;
007import org.biojava.nbio.core.sequence.io.template.SequenceHeaderParserInterface;
008import org.biojava.nbio.core.util.InputStreamProvider;
009
010import java.io.File;
011import java.io.IOException;
012import java.io.InputStream;
013import java.io.UncheckedIOException;
014import java.nio.file.Path;
015import java.util.Collections;
016import java.util.Iterator;
017import java.util.LinkedHashMap;
018import java.util.Map;
019import java.util.Optional;
020import java.util.Spliterator;
021import java.util.Spliterators;
022import java.util.function.Consumer;
023import java.util.stream.Stream;
024import java.util.stream.StreamSupport;
025
026/**
027 * Read from a FASTA file (or gzipped FASTA file) and create a Java stream of {@link ProteinSequence} objects
028 * for use in a functional programming paradigm.
029 *
030 * @author Gary Murphy
031 * @since 7.1.0
032 */
033public class FastaStreamer {
034
035        private final Path path;
036        private int batchSize = 1_000;
037        private SequenceHeaderParserInterface<ProteinSequence, AminoAcidCompound> headerParser;
038        private SequenceCreatorInterface<AminoAcidCompound> sequenceCreator;
039        private Map<String, ProteinSequence> chunk = new LinkedHashMap<>();
040        private Iterator<Map.Entry<String, ProteinSequence>> iterator = Collections.emptyIterator();
041        private boolean closed = false;
042
043        /**
044         * The constructor is private.  Created via the <tt>from(...)</tt> static factory method
045         *
046         * @param path the path to the file containing the FASTA content (possibly GZipped)
047         */
048        private FastaStreamer(final Path path) {
049                this.path = path;
050        }
051
052        public static FastaStreamer from(final Path path) {
053                return new FastaStreamer(path);
054        }
055
056        public static FastaStreamer from(File file) {
057                return from(file.toPath());
058        }
059
060        public FastaStreamer withHeaderParser(SequenceHeaderParserInterface<ProteinSequence, AminoAcidCompound> headerParser) {
061                this.headerParser = headerParser;
062                return this;
063        }
064
065        public FastaStreamer withSequenceCreator(SequenceCreatorInterface<AminoAcidCompound> sequenceCreator) {
066                this.sequenceCreator = sequenceCreator;
067                return this;
068        }
069
070        public FastaStreamer batchSize(int size) {
071                this.batchSize = size;
072                return this;
073        }
074
075        /**
076         * Enable iteration through the proteins in the file using syntax such as:
077         * <pre>
078         *     for(ProteinSequence sequence : FastaStreamer.from(path).each()) {
079         *         .
080         *         .
081         *         .
082         *     }
083         * </pre>
084         *
085         * @return an iterable suitable for an iteration loop
086         */
087        public Iterable<ProteinSequence> each() {
088                return () -> stream().iterator();
089        }
090
091        /**
092         * Create a stream of protein sequences from the contents of the path
093         * @return the stream
094         */
095        public Stream<ProteinSequence> stream() {
096                InputStreamProvider provider = new InputStreamProvider();
097                InputStream input;
098                try {
099                        input = provider.getInputStream(getPath().toFile());
100                } catch (IOException exception) {
101                        throw new UncheckedIOException(exception);
102                }
103                FastaReader<ProteinSequence, AminoAcidCompound> reader = new FastaReader<>(input, getHeaderParser(), getSequenceCreator());
104                Spliterator<ProteinSequence> source = new Spliterators.AbstractSpliterator<>(Integer.MAX_VALUE, Spliterator.IMMUTABLE | Spliterator.NONNULL) {
105                        @Override
106                        public boolean tryAdvance(Consumer<? super ProteinSequence> action) {
107                                if (closed) {
108                                        return false;
109                                }
110                                ProteinSequence protein = next(reader);
111                                if (null == protein) {
112                                        return false;
113                                }
114                                action.accept(protein);
115                                return true;
116                        }
117
118                        /**
119                         * Fetch the next header/protein tuple from the cache.  If the cache is empty, fetch another
120                         * batch from the source file
121                         *
122                         * @param reader
123                         *              the input stream from which the FASTA content is read
124                         * @return the protein sequence
125                         */
126                        private ProteinSequence next(FastaReader<ProteinSequence, AminoAcidCompound> reader) {
127                                try {
128                                        if (!iterator.hasNext()) {
129                                                chunk = reader.process(getBatchSize());
130                                                if (null == chunk) {
131                                                        closed = true;
132                                                        reader.close();
133                                                        return null;
134                                                }
135                                                iterator = chunk.entrySet().iterator();
136                                        }
137                                        if (iterator.hasNext()) {
138                                                Map.Entry<String, ProteinSequence> entry = iterator.next();
139                                                return createSequence(entry.getValue());
140                                        }
141                                        closed = true;
142                                        reader.close();
143                                } catch (IOException exception) {
144                                        throw new UncheckedIOException(String.format("I/O error reading the FASTA file from '%s'", getPath()), exception);
145                                }
146                                return null;
147                        }
148                }; // Spliterator
149                return StreamSupport.stream(source, false);
150        }
151
152        /**
153         * Create the sequence with the information from the header.  This implementation return the sequence as-is, but
154         * this is an opportunity for the implementer to build specific information into the user collection space
155         * of the sequence
156         *
157         * @param sequence the protein sequence
158         * @return the sequence
159         */
160        protected ProteinSequence createSequence(ProteinSequence sequence) {
161                return sequence;
162        }
163
164        protected Path getPath() {
165                return path;
166        }
167
168        protected int getBatchSize() {
169                return batchSize;
170        }
171
172        protected SequenceHeaderParserInterface<ProteinSequence, AminoAcidCompound> getHeaderParser() {
173                return Optional.ofNullable(headerParser).orElse(new GenericFastaHeaderParser<>());
174        }
175
176        public SequenceCreatorInterface<AminoAcidCompound> getSequenceCreator() {
177                return Optional.ofNullable(sequenceCreator).orElse(new ProteinSequenceCreator(AminoAcidCompoundSet.getAminoAcidCompoundSet()));
178        }
179}