(ns cljam.io.fasta.reader
  (:refer-clojure :exclude [read read-line])
  (:require [cljam.util :refer [graph?]]
            [cljam.io.fasta.util :refer [header-line? parse-header-line]]
            [cljam.io.fasta-index.core :as fasta-index]
            [cljam.io.util.bgzf.gzi :as gzi])
  (:import [java.io Closeable RandomAccessFile InputStream EOFException]
           [java.nio Buffer ByteBuffer CharBuffer]
           [java.nio.channels FileChannel$MapMode]
           [bgzf4j BGZFInputStream]))

FASTAReader

(deftype FASTAReader [reader stream url index-delay]
  Closeable
  (close [this]
    (.close ^java.io.Closeable (.reader this))
    (.close ^java.io.Closeable (.stream this))))
(defprotocol RandomAccessor
  (seek [this pos])
  (read-line [this])
  (read-buffer [this start end])
  (get-file-pointer [this]))
(extend-type RandomAccessFile
  RandomAccessor
  (seek [this pos]
    (.seek this pos))
  (read-line [this]
    (.readLine this))
  (read-buffer [this start end]
    (.. this
        getChannel
        (map FileChannel$MapMode/READ_ONLY start (- (long end) (long start)))))
  (get-file-pointer [this]
    (.getFilePointer this)))
(deftype IndexedBGZFInputStream [^BGZFInputStream is idx]
  Closeable
  (close [_]
    (.close is))
  RandomAccessor
  (seek [_ pos]
    (.seek is pos))
  (read-line [_]
    (.readLine is))
  (read-buffer [_ start end]
    (.seek is (gzi/uncomp->comp @idx start))
    (let [buf (byte-array (- (long end) (long start)))]
      (if (neg? (.read is buf))
        (throw (EOFException.))
        (ByteBuffer/wrap buf))))
  (get-file-pointer [_]
    (gzi/comp->uncomp @idx (.getFilePointer is))))

Reading

(defn- read* [line rdr]
  (loop [line line
         ret {}]
    (if-not (nil? line)
      (if (= (first line) \>)
        (if (seq ret)
          (cons (assoc ret :len (count (filter (partial not= \space) (:seq ret))))
                (lazy-seq (read* line rdr)))
          (let [ref' (subs line 1)
                offset (get-file-pointer rdr)]
            (recur (read-line rdr) (assoc ret :rname ref' :offset offset))))
        (if (:rname ret)
          (let [ret' (if (:line-len ret)
                       (update ret :seq str line)
                       (assoc ret
                              :seq line
                              :line-len (inc (count line))
                              :line-blen (count (filter graph? line))))]
            (recur (read-line rdr) ret'))
          (throw (ex-info "Missing sequence name" {:line line}))))
      (cons (assoc ret :len (count (filter (partial not= \space) (:seq ret))))
            nil))))

Reads fasta headers.

(defn load-headers
  [rdr]
  (seek rdr 0)
  (loop [line (read-line rdr), headers []]
    (if line
      (if (header-line? line)
        (let [offset (get-file-pointer rdr)]
          (recur (read-line rdr) (conj headers (merge (parse-header-line line)
                                                      {:offset offset}))))
        (recur (read-line rdr) headers))
      headers)))
(defn- read-sequence*
  [^FASTAReader rdr name']
  (when-let [line (read-line (.reader rdr))]
    (if-not (header-line? line)
      (if name'
        {:name name', :sequence line}
        (throw (ex-info "Missing sequence name" {})))
      (:name (parse-header-line line)))))

Reads sequences by line, returning the line-separated sequences as lazy sequence.

(defn read-sequences
  [^FASTAReader rdr]
  (seek (.reader rdr) 0)
  (letfn [(read-fn [rdr name']
            (let [s (read-sequence* rdr name')]
              (cond
                (string? s) (read-fn rdr s)
                (map? s) (cons s (lazy-seq (read-fn rdr name'))))))]
    (read-fn rdr nil)))

Reads the specified sequence range. start and end can each be set to nil, in which case, the first and the last element of the sequence are specified, respectively. rdr must be indexed.

(defn read-sequence
  [^FASTAReader rdr name' start end {:keys [mask?]}]
  (let [fai @(.index-delay rdr)]
    (when-let [len (:len (fasta-index/get-header fai name'))]
      (let [start' (max 1 (long (or start 1)))
            end' (min (long len) (long (or end len)))]
        (when (<= start' end')
          (let [buf (CharBuffer/allocate (inc (- end' start')))]
            (when-let [[s e] (fasta-index/get-span fai name' (dec start') end')]
              (let [bb ^ByteBuffer (read-buffer (.reader rdr) s e)]
                (if mask?
                  (while (.hasRemaining bb)
                    (let [c (unchecked-char (.get bb))]
                      (when-not (or (= \newline c) (= \return c))
                        (.put buf c))))
                  (while (.hasRemaining bb)
                    (let [c (unchecked-long (.get bb))]
                      (when-not (or (= 10 c) (= 13 c))
                        ;; toUpperCase works only for ASCII chars.
                        (.put buf (unchecked-char (bit-and c 0x5f))))))))
              (.flip ^Buffer buf)
              (.toString buf))))))))

Reads FASTA sequence data, returning its information as a lazy sequence.

(defn read
  [^FASTAReader rdr]
  (let [r (.reader rdr)]
    (read* (read-line r) r)))

Moves the file pointer of the given FASTA reader rdr to the beginning.

(defn reset
  [^FASTAReader rdr]
  (seek (.reader rdr) 0))

Copies bytes in [0, position) of the given buffer.

(definline create-ba
  [^ByteBuffer buffer]
  `(when (pos? (.position ~buffer))
     (let [ba# (byte-array (.position ~buffer))]
       (.clear ~(with-meta buffer {:tag `Buffer}))
       (.get ~buffer ba#)
       (.clear ~(with-meta buffer {:tag `Buffer}))
       ba#)))
(def ^:private ^:const gt-byte (byte (int \>)))
(def ^:private ^:const newline-byte (byte (int \newline)))
(defn- read-buffer!
  [^bytes buf ^long size buffers ^bytes byte-map]
  (let [{:keys [^ByteBuffer name-buf ^ByteBuffer seq-buf ^ByteBuffer rest-buf]} buffers]
    (loop [i 0, name-line? false]
      (if (< i size)
        (let [b (aget buf i)]
          (cond
            (= b gt-byte) (if (pos? (.position seq-buf))
                            (do (.put rest-buf buf i (- size i))
                                true)
                            (recur (inc i) true))
            (= b newline-byte) (if name-line?
                                 (recur (inc i) false)
                                 (recur (inc i) name-line?))
            :else (do (if name-line?
                        (.put name-buf b)
                        (.put seq-buf (aget byte-map b)))
                      (recur (inc i) name-line?))))
        false))))
(defn- sequential-read1!
  [^InputStream stream buf buffers byte-map loaded-bytes]
  (let [{:keys [^ByteBuffer name-buf ^ByteBuffer seq-buf ^ByteBuffer rest-buf]} buffers
        read-preload? (atom (some? (seq loaded-bytes)))]
    (loop [new-ref? false]
      (if-not new-ref?
        (if @read-preload?
          (let [new-ref*? (read-buffer! loaded-bytes (count loaded-bytes) buffers byte-map)]
            (reset! read-preload? false)
            (recur new-ref*?))
          (let [n (.read stream buf)]
            (if (pos? n)
              (recur (read-buffer! buf n buffers byte-map))
              {:name (create-ba name-buf) :sequence (create-ba seq-buf) :rest-bytes (create-ba rest-buf) :eof? true})))
        {:name (create-ba name-buf) :sequence (create-ba seq-buf) :rest-bytes (create-ba rest-buf) :eof? false}))))
(defn- sequential-read!
  [stream buf buffers byte-map loaded-bytes eof?]
  (when (or (not eof?) (seq loaded-bytes))
    (lazy-seq
     (let [m (sequential-read1! stream buf buffers byte-map loaded-bytes)]
       (cons (select-keys m [:name :sequence])
             (sequential-read! stream buf buffers byte-map (:rest-bytes m) (:eof? m)))))))
(defn- sequential-read
  [stream page-size seq-buf-size byte-map]
  (let [buf (byte-array page-size)
        name-buf (ByteBuffer/allocate 1024)
        seq-buf (ByteBuffer/allocate seq-buf-size)
        rest-buf (ByteBuffer/allocate page-size)]
    (sequential-read! stream buf
                      {:name-buf name-buf :seq-buf seq-buf :rest-buf rest-buf}
                      byte-map (byte-array 0) false)))

Returns list of maps containing sequence as byte-array. Bases ACGTN are encoded as 1-5.

(defn sequential-read-byte-array
  [stream page-size seq-buf-size]
  (let [byte-map (byte-array (range 128))]
    (doseq [[i v] [[\a 1] [\A 1] [\c 2] [\C 2] [\g 3] [\G 3] [\t 4] [\T 4] [\n 5] [\N 5]]]
      (aset byte-map (int i) (byte v)))
    (sequential-read stream page-size seq-buf-size byte-map)))

Returns list of maps containing sequence as upper-case string.

(defn sequential-read-string
  [stream page-size seq-buf-size {:keys [mask?]}]
  (let [byte-map (byte-array (range 128))]
    (when-not mask?
      (doseq [[i v] [[\a \A] [\c \C] [\g \G] [\t \T] [\n \N]]]
        (aset byte-map (int i) (byte (int v)))))
    (map (fn [{^bytes name' :name
               ^bytes sequence' :sequence}]
           {:name (String. name') :sequence (String. sequence')})
         (sequential-read stream page-size seq-buf-size byte-map))))