package org.biojava.nbio.core.sequence.io;

import org.biojava.nbio.core.sequence.ProteinSequence;
import org.biojava.nbio.core.sequence.compound.AminoAcidCompound;
import org.biojava.nbio.core.sequence.compound.AminoAcidCompoundSet;
import org.biojava.nbio.core.sequence.io.template.SequenceCreatorInterface;
import org.biojava.nbio.core.sequence.io.template.SequenceHeaderParserInterface;
import org.biojava.nbio.core.util.InputStreamProvider;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/**
 * Read from a FASTA file (or gzipped FASTA file) and create a Java stream of {@link ProteinSequence} objects
 * for use in a functional programming paradigm.
 * <p>
 * Every call to {@link #stream()} opens the file afresh and keeps its own read position, so a single
 * instance may be streamed or iterated any number of times. A stream that is not consumed to the end
 * should be closed (for example with try-with-resources) so that the underlying file is released:
 * <pre>
 * try (Stream&lt;ProteinSequence&gt; proteins = FastaStreamer.from(path).stream()) {
 *     proteins.limit(10).forEach(...);
 * }
 * </pre>
 *
 * @author Gary Murphy
 * @since 7.1.0
 */
public class FastaStreamer {

    private final Path path;
    private int batchSize = 1_000;
    private SequenceHeaderParserInterface<ProteinSequence, AminoAcidCompound> headerParser;
    private SequenceCreatorInterface<AminoAcidCompound> sequenceCreator;

    /**
     * The constructor is private. Instances are created via the {@code from(...)} static factory methods.
     *
     * @param path the path to the file containing the FASTA content (possibly GZipped)
     * @throws NullPointerException if {@code path} is {@code null}
     */
    private FastaStreamer(final Path path) {
        this.path = Objects.requireNonNull(path, "path");
    }

    /**
     * Create a streamer for the FASTA content at the given path.
     *
     * @param path the path to the file containing the FASTA content (possibly GZipped)
     * @return a new streamer using the default header parser, sequence creator and batch size
     * @throws NullPointerException if {@code path} is {@code null}
     */
    public static FastaStreamer from(final Path path) {
        return new FastaStreamer(path);
    }

    /**
     * Create a streamer for the FASTA content in the given file.
     *
     * @param file the file containing the FASTA content (possibly GZipped)
     * @return a new streamer using the default header parser, sequence creator and batch size
     * @throws NullPointerException if {@code file} is {@code null}
     */
    public static FastaStreamer from(File file) {
        return from(file.toPath());
    }

    /**
     * Use the given header parser instead of the default {@link GenericFastaHeaderParser}.
     *
     * @param headerParser the parser to use; {@code null} restores the default
     * @return this streamer, for call chaining
     */
    public FastaStreamer withHeaderParser(SequenceHeaderParserInterface<ProteinSequence, AminoAcidCompound> headerParser) {
        this.headerParser = headerParser;
        return this;
    }

    /**
     * Use the given sequence creator instead of the default {@link ProteinSequenceCreator}.
     *
     * @param sequenceCreator the creator to use; {@code null} restores the default
     * @return this streamer, for call chaining
     */
    public FastaStreamer withSequenceCreator(SequenceCreatorInterface<AminoAcidCompound> sequenceCreator) {
        this.sequenceCreator = sequenceCreator;
        return this;
    }

    /**
     * Set the number of sequences requested from the reader each time the internal cache runs dry.
     * The value is handed unchanged to {@code FastaReader.process(int)}; the default is 1,000.
     *
     * @param size the batch size
     * @return this streamer, for call chaining
     */
    public FastaStreamer batchSize(int size) {
        this.batchSize = size;
        return this;
    }

    /**
     * Enable iteration through the proteins in the file using syntax such as:
     * <pre>
     * for(ProteinSequence sequence : FastaStreamer.from(path).each()) {
     *     .
     *     .
     *     .
     * }
     * </pre>
     * Each call to {@code iterator()} on the returned iterable opens a new stream over the file. The file
     * is released when the iteration reaches the end of the content; a loop that exits early has no
     * handle with which to close it, so prefer {@link #stream()} with try-with-resources in that case.
     *
     * @return an iterable suitable for an iteration loop
     */
    public Iterable<ProteinSequence> each() {
        return () -> stream().iterator();
    }

    /**
     * Create a stream of protein sequences from the contents of the path.
     * <p>
     * The returned stream is sequential and owns the open file. The file is closed when the content is
     * exhausted, when a read fails, or when the stream itself is closed, whichever happens first.
     *
     * @return the stream
     * @throws UncheckedIOException if the file cannot be opened
     */
    public Stream<ProteinSequence> stream() {
        final InputStream input;
        try {
            input = new InputStreamProvider().getInputStream(getPath().toFile());
        } catch (IOException exception) {
            throw new UncheckedIOException(String.format("I/O error opening the FASTA file '%s'", getPath()), exception);
        }

        final FastaReader<ProteinSequence, AminoAcidCompound> reader;
        try {
            reader = new FastaReader<>(input, getHeaderParser(), getSequenceCreator());
        } catch (RuntimeException failure) {
            // Nothing owns the raw input yet, so release it here before propagating the failure.
            try {
                input.close();
            } catch (IOException closeFailure) {
                failure.addSuppressed(closeFailure);
            }
            throw failure;
        }

        final SequenceSpliterator source = new SequenceSpliterator(reader);
        // onClose lets callers that stop early (limit, findFirst, exceptions) release the file.
        return StreamSupport.stream(source, false).onClose(source::close);
    }

    /**
     * Create the sequence with the information from the header. This implementation returns the sequence as-is, but
     * this is an opportunity for the implementer to build specific information into the user collection space
     * of the sequence.
     *
     * @param sequence the protein sequence
     * @return the sequence
     */
    protected ProteinSequence createSequence(ProteinSequence sequence) {
        return sequence;
    }

    /**
     * @return the path of the FASTA file this streamer reads
     */
    protected Path getPath() {
        return path;
    }

    /**
     * @return the number of sequences requested from the reader per batch
     */
    protected int getBatchSize() {
        return batchSize;
    }

    /**
     * @return the configured header parser, or a new {@link GenericFastaHeaderParser} when none was configured
     */
    protected SequenceHeaderParserInterface<ProteinSequence, AminoAcidCompound> getHeaderParser() {
        // orElseGet: only build the default when no parser has been configured.
        return Optional.ofNullable(headerParser).orElseGet(() -> new GenericFastaHeaderParser<>());
    }

    /**
     * @return the configured sequence creator, or a new {@link ProteinSequenceCreator} over the standard
     *         amino acid compound set when none was configured
     */
    public SequenceCreatorInterface<AminoAcidCompound> getSequenceCreator() {
        // orElseGet: only build the default when no creator has been configured.
        return Optional.ofNullable(sequenceCreator)
                .orElseGet(() -> new ProteinSequenceCreator(AminoAcidCompoundSet.getAminoAcidCompoundSet()));
    }

    /**
     * Spliterator over the sequences of one open reader. All iteration state (current batch, closed flag)
     * lives here rather than on the enclosing streamer, so every stream is independent of the others.
     */
    private final class SequenceSpliterator extends Spliterators.AbstractSpliterator<ProteinSequence> {

        private final FastaReader<ProteinSequence, AminoAcidCompound> reader;
        private Iterator<ProteinSequence> batch = Collections.emptyIterator();
        private boolean closed = false;

        SequenceSpliterator(FastaReader<ProteinSequence, AminoAcidCompound> reader) {
            // Long.MAX_VALUE is the documented estimate for a source of unknown size.
            super(Long.MAX_VALUE, Spliterator.IMMUTABLE | Spliterator.NONNULL);
            this.reader = reader;
        }

        @Override
        public boolean tryAdvance(Consumer<? super ProteinSequence> action) {
            if (closed) {
                return false;
            }
            ProteinSequence protein;
            try {
                protein = next();
                if (null == protein) {
                    // Reporting exhaustion: nothing further will be read, so release the file now.
                    release();
                    return false;
                }
            } catch (IOException exception) {
                UncheckedIOException failure = new UncheckedIOException(
                        String.format("I/O error reading the FASTA file from '%s'", getPath()), exception);
                try {
                    release();
                } catch (IOException closeFailure) {
                    failure.addSuppressed(closeFailure);
                }
                throw failure;
            }
            action.accept(protein);
            return true;
        }

        /**
         * Fetch the next protein from the cache. If the cache is empty, fetch another
         * batch from the source file.
         *
         * @return the protein sequence, or {@code null} when the reader has no more content
         * @throws IOException if reading the next batch fails
         */
        private ProteinSequence next() throws IOException {
            if (!batch.hasNext()) {
                Map<String, ProteinSequence> chunk = reader.process(getBatchSize());
                if (null == chunk) {
                    return null;
                }
                batch = chunk.values().iterator();
                if (!batch.hasNext()) {
                    return null;
                }
            }
            return createSequence(batch.next());
        }

        /**
         * Close the reader exactly once and drop any cached sequences.
         *
         * @throws IOException if closing the reader fails
         */
        private void release() throws IOException {
            if (closed) {
                return;
            }
            closed = true;
            batch = Collections.emptyIterator();
            reader.close();
        }

        /**
         * Close handler for the stream; safe to call after the content has already been exhausted.
         */
        void close() {
            try {
                release();
            } catch (IOException exception) {
                throw new UncheckedIOException(String.format("I/O error closing the FASTA file '%s'", getPath()), exception);
            }
        }
    }
}