(ns cljam.io.fasta.reader
(:refer-clojure :exclude [read read-line])
(:require [cljam.util :refer [graph?]]
[cljam.io.fasta.util :refer [header-line? parse-header-line]]
[cljam.io.fasta-index.core :as fasta-index]
[cljam.io.util.bgzf.gzi :as gzi])
(:import [java.io Closeable RandomAccessFile InputStream EOFException]
[java.nio Buffer ByteBuffer CharBuffer]
[java.nio.channels FileChannel$MapMode]
[bgzf4j BGZFInputStream])) | |
FASTAReader | |
(deftype FASTAReader [reader stream url index-delay]
Closeable
(close [this]
(.close ^java.io.Closeable (.reader this))
(.close ^java.io.Closeable (.stream this)))) | |
(defprotocol RandomAccessor (seek [this pos]) (read-line [this]) (read-buffer [this start end]) (get-file-pointer [this])) | |
(extend-type RandomAccessFile
RandomAccessor
(seek [this pos]
(.seek this pos))
(read-line [this]
(.readLine this))
(read-buffer [this start end]
(.. this
getChannel
(map FileChannel$MapMode/READ_ONLY start (- (long end) (long start)))))
(get-file-pointer [this]
(.getFilePointer this))) | |
(deftype IndexedBGZFInputStream [^BGZFInputStream is idx]
Closeable
(close [_]
(.close is))
RandomAccessor
(seek [_ pos]
(.seek is pos))
(read-line [_]
(.readLine is))
(read-buffer [_ start end]
(.seek is (gzi/uncomp->comp @idx start))
(let [buf (byte-array (- (long end) (long start)))]
(if (neg? (.read is buf))
(throw (EOFException.))
(ByteBuffer/wrap buf))))
(get-file-pointer [_]
(gzi/comp->uncomp @idx (.getFilePointer is)))) | |
Reading | |
(defn- read* [line rdr]
(loop [line line
ret {}]
(if-not (nil? line)
(if (= (first line) \>)
(if (seq ret)
(cons (assoc ret :len (count (filter (partial not= \space) (:seq ret))))
(lazy-seq (read* line rdr)))
(let [ref' (subs line 1)
offset (get-file-pointer rdr)]
(recur (read-line rdr) (assoc ret :rname ref' :offset offset))))
(if (:rname ret)
(let [ret' (if (:line-len ret)
(update ret :seq str line)
(assoc ret
:seq line
:line-len (inc (count line))
:line-blen (count (filter graph? line))))]
(recur (read-line rdr) ret'))
(throw (ex-info "Missing sequence name" {:line line}))))
(cons (assoc ret :len (count (filter (partial not= \space) (:seq ret))))
nil)))) | |
Reads fasta headers. | (defn load-headers
[rdr]
(seek rdr 0)
(loop [line (read-line rdr), headers []]
(if line
(if (header-line? line)
(let [offset (get-file-pointer rdr)]
(recur (read-line rdr) (conj headers (merge (parse-header-line line)
{:offset offset}))))
(recur (read-line rdr) headers))
headers))) |
(defn- read-sequence*
[^FASTAReader rdr name']
(when-let [line (read-line (.reader rdr))]
(if-not (header-line? line)
(if name'
{:name name', :sequence line}
(throw (ex-info "Missing sequence name" {})))
(:name (parse-header-line line))))) | |
Reads sequences by line, returning the line-separated sequences as lazy sequence. | (defn read-sequences
[^FASTAReader rdr]
(seek (.reader rdr) 0)
(letfn [(read-fn [rdr name']
(let [s (read-sequence* rdr name')]
(cond
(string? s) (read-fn rdr s)
(map? s) (cons s (lazy-seq (read-fn rdr name'))))))]
(read-fn rdr nil))) |
Reads the specified sequence range.
| (defn read-sequence
[^FASTAReader rdr name' start end {:keys [mask?]}]
(let [fai @(.index-delay rdr)]
(when-let [len (:len (fasta-index/get-header fai name'))]
(let [start' (max 1 (long (or start 1)))
end' (min (long len) (long (or end len)))]
(when (<= start' end')
(let [buf (CharBuffer/allocate (inc (- end' start')))]
(when-let [[s e] (fasta-index/get-span fai name' (dec start') end')]
(let [bb ^ByteBuffer (read-buffer (.reader rdr) s e)]
(if mask?
(while (.hasRemaining bb)
(let [c (unchecked-char (.get bb))]
(when-not (or (= \newline c) (= \return c))
(.put buf c))))
(while (.hasRemaining bb)
(let [c (unchecked-long (.get bb))]
(when-not (or (= 10 c) (= 13 c))
;; toUpperCase works only for ASCII chars.
(.put buf (unchecked-char (bit-and c 0x5f))))))))
(.flip ^Buffer buf)
(.toString buf)))))))) |
Reads FASTA sequence data, returning its information as a lazy sequence. | (defn read
[^FASTAReader rdr]
(let [r (.reader rdr)]
(read* (read-line r) r))) |
Moves the file pointer of the given FASTA reader | (defn reset [^FASTAReader rdr] (seek (.reader rdr) 0)) |
Copies bytes in [0, position) of the given | (definline create-ba
[^ByteBuffer buffer]
`(when (pos? (.position ~buffer))
(let [ba# (byte-array (.position ~buffer))]
(.clear ~(with-meta buffer {:tag `Buffer}))
(.get ~buffer ba#)
(.clear ~(with-meta buffer {:tag `Buffer}))
ba#))) |
(def ^:private ^:const gt-byte (byte (int \>))) (def ^:private ^:const newline-byte (byte (int \newline))) | |
(defn- read-buffer!
[^bytes buf ^long size buffers ^bytes byte-map]
(let [{:keys [^ByteBuffer name-buf ^ByteBuffer seq-buf ^ByteBuffer rest-buf]} buffers]
(loop [i 0, name-line? false]
(if (< i size)
(let [b (aget buf i)]
(cond
(= b gt-byte) (if (pos? (.position seq-buf))
(do (.put rest-buf buf i (- size i))
true)
(recur (inc i) true))
(= b newline-byte) (if name-line?
(recur (inc i) false)
(recur (inc i) name-line?))
:else (do (if name-line?
(.put name-buf b)
(.put seq-buf (aget byte-map b)))
(recur (inc i) name-line?))))
false)))) | |
(defn- sequential-read1!
[^InputStream stream buf buffers byte-map loaded-bytes]
(let [{:keys [^ByteBuffer name-buf ^ByteBuffer seq-buf ^ByteBuffer rest-buf]} buffers
read-preload? (atom (some? (seq loaded-bytes)))]
(loop [new-ref? false]
(if-not new-ref?
(if @read-preload?
(let [new-ref*? (read-buffer! loaded-bytes (count loaded-bytes) buffers byte-map)]
(reset! read-preload? false)
(recur new-ref*?))
(let [n (.read stream buf)]
(if (pos? n)
(recur (read-buffer! buf n buffers byte-map))
{:name (create-ba name-buf) :sequence (create-ba seq-buf) :rest-bytes (create-ba rest-buf) :eof? true})))
{:name (create-ba name-buf) :sequence (create-ba seq-buf) :rest-bytes (create-ba rest-buf) :eof? false})))) | |
(defn- sequential-read!
[stream buf buffers byte-map loaded-bytes eof?]
(when (or (not eof?) (seq loaded-bytes))
(lazy-seq
(let [m (sequential-read1! stream buf buffers byte-map loaded-bytes)]
(cons (select-keys m [:name :sequence])
(sequential-read! stream buf buffers byte-map (:rest-bytes m) (:eof? m))))))) | |
(defn- sequential-read
[stream page-size seq-buf-size byte-map]
(let [buf (byte-array page-size)
name-buf (ByteBuffer/allocate 1024)
seq-buf (ByteBuffer/allocate seq-buf-size)
rest-buf (ByteBuffer/allocate page-size)]
(sequential-read! stream buf
{:name-buf name-buf :seq-buf seq-buf :rest-buf rest-buf}
byte-map (byte-array 0) false))) | |
Returns list of maps containing sequence as byte-array. Bases ACGTN are encoded as 1-5. | (defn sequential-read-byte-array
[stream page-size seq-buf-size]
(let [byte-map (byte-array (range 128))]
(doseq [[i v] [[\a 1] [\A 1] [\c 2] [\C 2] [\g 3] [\G 3] [\t 4] [\T 4] [\n 5] [\N 5]]]
(aset byte-map (int i) (byte v)))
(sequential-read stream page-size seq-buf-size byte-map))) |
Returns list of maps containing sequence as upper-case string. | (defn sequential-read-string
[stream page-size seq-buf-size {:keys [mask?]}]
(let [byte-map (byte-array (range 128))]
(when-not mask?
(doseq [[i v] [[\a \A] [\c \C] [\g \G] [\t \T] [\n \N]]]
(aset byte-map (int i) (byte (int v)))))
(map (fn [{^bytes name' :name
^bytes sequence' :sequence}]
{:name (String. name') :sequence (String. sequence')})
(sequential-read stream page-size seq-buf-size byte-map)))) |