(ns cljam.io.cram.reader
  (:require [cljam.io.cram.bit-stream :as bs]
            [cljam.io.cram.decode.data-series :as ds]
            [cljam.io.cram.decode.record :as record]
            [cljam.io.cram.decode.structure :as struct]
            [cljam.io.cram.seq-resolver.protocol :as resolver]
            [cljam.io.protocols :as protocols]
            [cljam.io.util.byte-buffer :as bb]
            [cljam.util.intervals :as intervals])
  (:import [java.io Closeable]
           [java.nio Buffer ByteBuffer ByteOrder]
           [java.nio.channels FileChannel FileChannel$MapMode]
           [java.util Arrays]))
(declare read-alignments read-alignments-in-region)
(deftype CRAMReader [url channel buffer header refs offset index seq-resolver qname-generator]
  Closeable
  (close [_]
    (when seq-resolver
      (.close ^Closeable seq-resolver))
    (.close ^FileChannel channel))
  protocols/IReader
  (reader-url [_] url)
  (read [this]
    (protocols/read-alignments this))
  (read [this region]
    (protocols/read-alignments this region))
  (indexed? [_]
    (boolean @index))
  protocols/IAlignmentReader
  (read-header [_] @header)
  (read-refs [_] @refs)
  (read-alignments [this]
    (read-alignments this))
  (read-alignments [this region]
    (read-alignments-in-region this region))
  #_(read-blocks [_])
  #_(read-blocks [_ region])
  #_(read-blocks [_ region option])
  protocols/IRegionReader
  (read-in-region [this region]
    (protocols/read-in-region this region {}))
  (read-in-region [this region _]
    (protocols/read-alignments this region)))
(defn- read-to-buffer
  ([rdr] (read-to-buffer rdr nil))
  ([^CRAMReader rdr limit]
   (let [^FileChannel ch (.-channel rdr)
         ^Buffer bb (.-buffer rdr)]
     (.clear bb)
     (.limit bb (or limit (.capacity bb)))
     (while (and (.hasRemaining bb)
                 (< (.position ch) (.size ch)))
       (.read ch ^ByteBuffer bb))
     (.flip bb))))

Reads the CRAM file definition.

(defn read-file-definition
  [^CRAMReader rdr]
  (read-to-buffer rdr 26)
  (struct/decode-file-definition (.-buffer rdr)))
(defn- seq-resolver-for-slice [^CRAMReader rdr slice-header blocks]
  (let [embedded-ref-content-id (long (:embedded-reference slice-header))]
    (if (>= embedded-ref-content-id 0)
      (let [ref-bases-block (->> blocks
                                 (filter #(= (:content-id %) embedded-ref-content-id))
                                 first)
            offset (long (:start slice-header))
            ^bytes bs (bb/read-bytes (:data ref-bases-block)
                                     (:raw-size ref-bases-block))]
        (reify resolver/ISeqResolver
          ;; According to the CRAM specification v3.1 ยง8.5, a slice with an embedded
          ;; reference must not be a multiple reference slice, so the temporary
          ;; sequence resolver here can safely ignore the passed chr
          (resolve-sequence [_ _chr start end]
            (Arrays/copyOfRange bs
                                (- (long start) offset)
                                (inc (- (long end) offset))))))
      (.-seq-resolver rdr))))
(defn- read-slice-records [^CRAMReader rdr bb compression-header slice-header]
  (let [blocks (into [] (map (fn [_] (struct/decode-block bb)))
                     (range (:blocks slice-header)))
        core-block (first (filter #(zero? (long (:content-id %))) blocks))
        bs-decoder (when core-block
                     (bs/make-bit-stream-decoder (:data core-block)))
        ds-decoders (ds/build-data-series-decoders compression-header bs-decoder blocks)
        tag-decoders (ds/build-tag-decoders compression-header bs-decoder blocks)]
    (record/decode-slice-records (seq-resolver-for-slice rdr slice-header blocks)
                                 (.-qname-generator rdr)
                                 @(.-header rdr)
                                 compression-header
                                 slice-header
                                 ds-decoders
                                 tag-decoders)))
(defn- with-each-slice-header [^ByteBuffer bb f slice-offsets]
  (let [container-header-end (.position bb)
        compression-header (struct/decode-compression-header-block bb)]
    (mapcat (fn [^long offset]
              (.position ^Buffer bb (+ container-header-end offset))
              (f compression-header (struct/decode-slice-header-block bb)))
            slice-offsets)))
(defn- read-container-records
  ([rdr bb container-header]
   (read-container-records rdr bb container-header nil))
  ([^CRAMReader rdr bb container-header idx-entries]
   (->> (if (seq idx-entries)
          (map :slice-offset idx-entries)
          (:landmarks container-header))
        (with-each-slice-header bb
          (fn [compression-header slice-header]
            (read-slice-records rdr bb compression-header slice-header))))))
(defn- with-next-container-header [^CRAMReader rdr f]
  (let [^FileChannel ch (.-channel rdr)
        pos (.position ch)
        _ (read-to-buffer rdr)
        ^ByteBuffer bb (.-buffer rdr)
        container-header (struct/decode-container-header bb)
        container-start (+ pos (.position bb))
        container-size (long (:length container-header))
        ret (f container-header container-start)]
    (.position ch (+ container-start container-size))
    ret))
(defn- read-container-with [^CRAMReader rdr f]
  (letfn [(f' [container-header container-start]
            (let [container-size (long (:length container-header))
                  ^FileChannel ch (.-channel rdr)
                  bb (-> ch
                         (.map FileChannel$MapMode/READ_ONLY container-start container-size)
                         (.order ByteOrder/LITTLE_ENDIAN))]
              (f container-header bb)))]
    (with-next-container-header rdr f')))

Skips the next container.

(defn skip-container
  [rdr]
  (with-next-container-header rdr (constantly nil)))

Reads the CRAM header from the CRAM header block.

(defn read-header
  [^CRAMReader rdr]
  (read-container-with rdr (fn [_ bb] (struct/decode-cram-header-block bb))))

Reads all the alignments from the CRAM file and returns them as a lazy sequence of alignment maps.

(defn read-alignments
  [^CRAMReader rdr]
  (let [^FileChannel ch (.-channel rdr)]
    (letfn [(read1 [container-header bb]
              (when-not (struct/eof-container? container-header)
                (read-container-records rdr bb container-header)))
            (step []
              (when (< (.position ch) (.size ch))
                (when-let [alns (read-container-with rdr read1)]
                  (concat alns (lazy-seq (step))))))]
      (step))))
(defn- filter-overlapping-records [chr ^long start ^long end alns]
  (filter #(and (= (:rname %) chr)
                (<= (long (:pos %)) end)
                (<= start (long (:end %))))
          alns))
(defn- read-alignments-in-region-with-index [^CRAMReader rdr chr ^long start ^long end]
  (let [^FileChannel ch (.-channel rdr)
        idx @(.-index rdr)
        offset->entries (->> (intervals/find-overlap-intervals idx chr start end)
                             (group-by :container-offset)
                             (into (sorted-map)))]
    (letfn [(read-fn [entries]
              (fn [container-header bb]
                (read-container-records rdr bb container-header entries)))
            (step [[[^long offset entries] & more]]
              (when offset
                (.position ch offset)
                (concat (read-container-with rdr (read-fn entries))
                        (lazy-seq (step more)))))]
      (filter-overlapping-records chr start end (step offset->entries)))))
(defn- overlaps? [container-or-slice-header refs chr start end]
  (let [seq-id (long (:ref-seq-id container-or-slice-header))]
    (case seq-id
      -1 (= chr "*")
      ;; ref-seq-id = -2 means that the container or slice contains multiple
      ;; references, and the decoder can't tell in advance what reference
      ;; it actually has in it
      -2 true
      (let [chr' (get (nth refs seq-id) :name)
            start' (long (:start container-or-slice-header))
            end' (+ start' (long (:span container-or-slice-header)))]
        (and (= chr' chr)
             (<= start' (long end))
             (<= (long start) end'))))))
(defn- read-alignments-in-region-without-index [^CRAMReader rdr chr ^long start ^long end]
  (let [^FileChannel ch (.-channel rdr)
        refs @(.-refs rdr)]
    (letfn [(read1 [container-header bb]
              (when-not (struct/eof-container? container-header)
                (if (overlaps? container-header refs chr start end)
                  (with-each-slice-header bb
                    (fn [compression-header slice-header]
                      (when (overlaps? slice-header refs chr start end)
                        (read-slice-records rdr bb compression-header slice-header)))
                    (:landmarks container-header))
                  ::skipped)))
            (step []
              (when (< (.position ch) (.size ch))
                (when-let [alns (read-container-with rdr read1)]
                  (if (identical? alns ::skipped)
                    (recur)
                    (concat alns (lazy-seq (step)))))))]
      (.position ch (long @(.-offset rdr)))
      (filter-overlapping-records chr start end (step)))))
(defn- read-alignments-in-region
  [^CRAMReader rdr {:keys [chr ^long start ^long end] :or {start 0 end Long/MAX_VALUE}}]
  (if @(.-index rdr)
    (read-alignments-in-region-with-index rdr chr start end)
    (read-alignments-in-region-without-index rdr chr start end)))