(ns cljam.io.cram.reader (:require [cljam.io.cram.bit-stream :as bs] [cljam.io.cram.decode.data-series :as ds] [cljam.io.cram.decode.record :as record] [cljam.io.cram.decode.structure :as struct] [cljam.io.cram.seq-resolver.protocol :as resolver] [cljam.io.protocols :as protocols] [cljam.io.util.byte-buffer :as bb] [cljam.util.intervals :as intervals]) (:import [java.io Closeable] [java.nio Buffer ByteBuffer ByteOrder] [java.nio.channels FileChannel FileChannel$MapMode] [java.util Arrays])) | |
(declare read-alignments read-alignments-in-region) | |
(deftype CRAMReader [url channel buffer header refs offset index seq-resolver qname-generator] Closeable (close [_] (when seq-resolver (.close ^Closeable seq-resolver)) (.close ^FileChannel channel)) protocols/IReader (reader-url [_] url) (read [this] (protocols/read-alignments this)) (read [this region] (protocols/read-alignments this region)) (indexed? [_] (boolean @index)) protocols/IAlignmentReader (read-header [_] @header) (read-refs [_] @refs) (read-alignments [this] (read-alignments this)) (read-alignments [this region] (read-alignments-in-region this region)) #_(read-blocks [_]) #_(read-blocks [_ region]) #_(read-blocks [_ region option]) protocols/IRegionReader (read-in-region [this region] (protocols/read-in-region this region {})) (read-in-region [this region _] (protocols/read-alignments this region))) | |
(defn- read-to-buffer ([rdr] (read-to-buffer rdr nil)) ([^CRAMReader rdr limit] (let [^FileChannel ch (.-channel rdr) ^Buffer bb (.-buffer rdr)] (.clear bb) (.limit bb (or limit (.capacity bb))) (while (and (.hasRemaining bb) (< (.position ch) (.size ch))) (.read ch ^ByteBuffer bb)) (.flip bb)))) | |
Reads the CRAM file definition. | (defn read-file-definition [^CRAMReader rdr] (read-to-buffer rdr 26) (struct/decode-file-definition (.-buffer rdr))) |
(defn- seq-resolver-for-slice [^CRAMReader rdr slice-header blocks] (let [embedded-ref-content-id (long (:embedded-reference slice-header))] (if (>= embedded-ref-content-id 0) (let [ref-bases-block (->> blocks (filter #(= (:content-id %) embedded-ref-content-id)) first) offset (long (:start slice-header)) ^bytes bs (bb/read-bytes (:data ref-bases-block) (:raw-size ref-bases-block))] (reify resolver/ISeqResolver ;; According to the CRAM specification v3.1 ยง8.5, a slice with an embedded ;; reference must not be a multiple reference slice, so the temporary ;; sequence resolver here can safely ignore the passed chr (resolve-sequence [_ _chr start end] (Arrays/copyOfRange bs (- (long start) offset) (inc (- (long end) offset)))))) (.-seq-resolver rdr)))) | |
(defn- read-slice-records [^CRAMReader rdr bb compression-header slice-header] (let [blocks (into [] (map (fn [_] (struct/decode-block bb))) (range (:blocks slice-header))) core-block (first (filter #(zero? (long (:content-id %))) blocks)) bs-decoder (when core-block (bs/make-bit-stream-decoder (:data core-block))) ds-decoders (ds/build-data-series-decoders compression-header bs-decoder blocks) tag-decoders (ds/build-tag-decoders compression-header bs-decoder blocks)] (record/decode-slice-records (seq-resolver-for-slice rdr slice-header blocks) (.-qname-generator rdr) @(.-header rdr) compression-header slice-header ds-decoders tag-decoders))) | |
(defn- with-each-slice-header [^ByteBuffer bb f slice-offsets] (let [container-header-end (.position bb) compression-header (struct/decode-compression-header-block bb)] (mapcat (fn [^long offset] (.position ^Buffer bb (+ container-header-end offset)) (f compression-header (struct/decode-slice-header-block bb))) slice-offsets))) | |
(defn- read-container-records ([rdr bb container-header] (read-container-records rdr bb container-header nil)) ([^CRAMReader rdr bb container-header idx-entries] (->> (if (seq idx-entries) (map :slice-offset idx-entries) (:landmarks container-header)) (with-each-slice-header bb (fn [compression-header slice-header] (read-slice-records rdr bb compression-header slice-header)))))) | |
(defn- with-next-container-header [^CRAMReader rdr f] (let [^FileChannel ch (.-channel rdr) pos (.position ch) _ (read-to-buffer rdr) ^ByteBuffer bb (.-buffer rdr) container-header (struct/decode-container-header bb) container-start (+ pos (.position bb)) container-size (long (:length container-header)) ret (f container-header container-start)] (.position ch (+ container-start container-size)) ret)) | |
(defn- read-container-with [^CRAMReader rdr f] (letfn [(f' [container-header container-start] (let [container-size (long (:length container-header)) ^FileChannel ch (.-channel rdr) bb (-> ch (.map FileChannel$MapMode/READ_ONLY container-start container-size) (.order ByteOrder/LITTLE_ENDIAN))] (f container-header bb)))] (with-next-container-header rdr f'))) | |
Skips the next container. | (defn skip-container [rdr] (with-next-container-header rdr (constantly nil))) |
Reads the CRAM header from the CRAM header block. | (defn read-header [^CRAMReader rdr] (read-container-with rdr (fn [_ bb] (struct/decode-cram-header-block bb)))) |
Reads all the alignments from the CRAM file and returns them as a lazy sequence of alignment maps. | (defn read-alignments [^CRAMReader rdr] (let [^FileChannel ch (.-channel rdr)] (letfn [(read1 [container-header bb] (when-not (struct/eof-container? container-header) (read-container-records rdr bb container-header))) (step [] (when (< (.position ch) (.size ch)) (when-let [alns (read-container-with rdr read1)] (concat alns (lazy-seq (step))))))] (step)))) |
(defn- filter-overlapping-records [chr ^long start ^long end alns] (filter #(and (= (:rname %) chr) (<= (long (:pos %)) end) (<= start (long (:end %)))) alns)) | |
(defn- read-alignments-in-region-with-index [^CRAMReader rdr chr ^long start ^long end] (let [^FileChannel ch (.-channel rdr) idx @(.-index rdr) offset->entries (->> (intervals/find-overlap-intervals idx chr start end) (group-by :container-offset) (into (sorted-map)))] (letfn [(read-fn [entries] (fn [container-header bb] (read-container-records rdr bb container-header entries))) (step [[[^long offset entries] & more]] (when offset (.position ch offset) (concat (read-container-with rdr (read-fn entries)) (lazy-seq (step more)))))] (filter-overlapping-records chr start end (step offset->entries))))) | |
(defn- overlaps? [container-or-slice-header refs chr start end] (let [seq-id (long (:ref-seq-id container-or-slice-header))] (case seq-id -1 (= chr "*") ;; ref-seq-id = -2 means that the container or slice contains multiple ;; references, and the decoder can't tell in advance what reference ;; it actually has in it -2 true (let [chr' (get (nth refs seq-id) :name) start' (long (:start container-or-slice-header)) end' (+ start' (long (:span container-or-slice-header)))] (and (= chr' chr) (<= start' (long end)) (<= (long start) end')))))) | |
(defn- read-alignments-in-region-without-index [^CRAMReader rdr chr ^long start ^long end] (let [^FileChannel ch (.-channel rdr) refs @(.-refs rdr)] (letfn [(read1 [container-header bb] (when-not (struct/eof-container? container-header) (if (overlaps? container-header refs chr start end) (with-each-slice-header bb (fn [compression-header slice-header] (when (overlaps? slice-header refs chr start end) (read-slice-records rdr bb compression-header slice-header))) (:landmarks container-header)) ::skipped))) (step [] (when (< (.position ch) (.size ch)) (when-let [alns (read-container-with rdr read1)] (if (identical? alns ::skipped) (recur) (concat alns (lazy-seq (step)))))))] (.position ch (long @(.-offset rdr))) (filter-overlapping-records chr start end (step))))) | |
(defn- read-alignments-in-region [^CRAMReader rdr {:keys [chr ^long start ^long end] :or {start 0 end Long/MAX_VALUE}}] (if @(.-index rdr) (read-alignments-in-region-with-index rdr chr start end) (read-alignments-in-region-without-index rdr chr start end))) | |