Basic I/O of CSI:Coordinate Sorted Index files. | (ns cljam.io.csi (:require [cljam.io.util.bgzf :as bgzf] [cljam.io.util.bin :as util-bin] [cljam.io.util.chunk :as chunk] [cljam.io.util.lsb.data-io :as lsb.data] [cljam.io.util.lsb.io-stream :as lsb.stream] [clojure.string :as cstr]) (:import [java.io DataInputStream DataOutputStream IOException] [java.nio ByteBuffer ByteOrder] java.util.Arrays)) |
(def ^:const ^:private csi-magic "CSI\1") | |
(def ^:private ^:const default-vcf-csi-aux {:format 2, :col-seq 1, :col-beg 2, :col-end 0, :meta-char \#, :skip 0}) | |
(def ^:private ^:const min-large-chunk-size 0x10000) | |
4byte x 7 (format,seq,beg,end,meta,skip,l_nm) | (def ^:private ^:const tabix-field-size (* 4 7)) |
(deftype CSI [n-ref min-shift depth bidx loffset aux] util-bin/IBinningIndex (get-chunks [_ ref-idx bins] (vec (mapcat (get bidx ref-idx) bins))) (get-min-offset [_ ref-idx beg] (if-let [min-offsets (rsubseq (get loffset ref-idx) <= beg)] (second (first min-offsets)) 0)) (get-depth [_] depth) (get-min-shift [_] min-shift) (get-chr-names [_] (:chrs aux))) | |
(defn- parse-tabix-aux [^bytes ba] (when (<= tabix-field-size (alength ba)) (let [bb (doto (ByteBuffer/wrap ba) (.order ByteOrder/LITTLE_ENDIAN)) format' (.getInt bb) col-seq (.getInt bb) col-beg (.getInt bb) col-end (.getInt bb) meta-char (char (.getInt bb)) skip (.getInt bb) l-nm (.getInt bb)] (when-not (= l-nm (.remaining bb)) (throw (ex-info "l-nm does not match" {:l-nm l-nm, :remaining (.remaining bb)}))) {:format format', :col-seq col-seq, :col-beg col-beg, :col-end col-end, :meta-char meta-char, :skip skip, :chrs (cstr/split (String. ba (.position bb) (.remaining bb)) #"\00")}))) | |
(defn- create-tabix-aux [aux] (let [contig-bytes (.getBytes (str (cstr/join (char 0) (:chrs aux)) (char 0))) bb (doto (ByteBuffer/wrap (byte-array (+ (alength contig-bytes) tabix-field-size))) (.order ByteOrder/LITTLE_ENDIAN))] (.putInt bb (:format aux)) (.putInt bb (:col-seq aux)) (.putInt bb (:col-beg aux)) (.putInt bb (:col-end aux)) (.putInt bb (int (:meta-char aux))) (.putInt bb (:skip aux)) (.putInt bb (alength contig-bytes)) (.put bb contig-bytes) (.array bb))) | |
(defn- read-chunks! [rdr] (let [n-chunk (lsb.data/read-int rdr)] (->> #(let [beg (lsb.data/read-long rdr) end (lsb.data/read-long rdr)] (chunk/->Chunk beg end)) (repeatedly n-chunk) vec))) | |
(defn- read-bin-index [rdr] (let [n-ref (lsb.data/read-int rdr)] (->> #(let [bin (lsb.data/read-int rdr) loffset (lsb.data/read-long rdr) chunks (read-chunks! rdr)] {:bin (long bin), :loffset loffset, :chunks chunks}) (repeatedly n-ref) vec))) | |
(defn- read-index* ^CSI [^DataInputStream rdr] (when-not (Arrays/equals ^bytes (lsb.data/read-bytes rdr 4) (.getBytes csi-magic)) (throw (IOException. "Invalid CSI file"))) (let [min-shift (lsb.data/read-int rdr) depth (lsb.data/read-int rdr) l-aux (lsb.data/read-int rdr) aux (lsb.data/read-bytes rdr l-aux) tabix-aux (try (parse-tabix-aux aux) (catch Throwable _ nil)) n-ref (lsb.data/read-int rdr) bins (vec (repeatedly n-ref #(read-bin-index rdr))) max-bin (util-bin/max-bin depth) bidx (mapv #(into {} (map (juxt :bin :chunks)) %) bins) loffset (mapv #(into (sorted-map) (keep (fn [{:keys [^long bin loffset]}] (when (<= bin max-bin) [(util-bin/bin-beg bin min-shift depth) loffset]))) %) bins)] (->CSI n-ref min-shift depth bidx loffset tabix-aux))) | |
Reads a CSI file | (defn read-index ^CSI [f] (with-open [r (DataInputStream. (bgzf/bgzf-input-stream f))] (read-index* r))) |
(defn- concatenate-offsets [offsets] (reduce (fn [res chunk'] (if (= (:file-end (peek res)) (:file-beg chunk')) (assoc-in res [(dec (count res)) :file-end] (:file-end chunk')) (conj res chunk'))) [] (sort-by :file-beg offsets))) | |
(defn- small-chunks? [chunks] (< (- (bgzf/get-block-address (:file-end (last chunks))) (bgzf/get-block-address (:file-beg (first chunks)))) min-large-chunk-size)) | |
(defn- compress-bidx [^long depth bidx] (letfn [(f [^long level bidx] (reduce (fn [ret [bin offsets]] (let [parent-bin (util-bin/parent-bin bin)] (if (and (= (util-bin/bin-level bin) level) (contains? bidx parent-bin) (small-chunks? offsets)) (update ret parent-bin concat offsets) (assoc ret bin offsets)))) {} (sort-by key bidx)))] (reduce (fn [b level] (f level b)) bidx (range depth 0 -1)))) | |
(defn- calc-bidx [file-offsets ^long shift ^long depth] (->> file-offsets (group-by #(util-bin/reg->bin (:beg %) (:end %) shift depth)) (compress-bidx depth) (map (fn [[bin offsets]] [bin (->> (concatenate-offsets offsets) (mapv #(chunk/->Chunk (:file-beg %) (:file-end %))))])) (into (sorted-map)))) | |
(defn- calc-loffsets [begs file-offsets] (->> begs (map (fn [^long beg] [beg (->> file-offsets (drop-while #(< (long (:end %)) beg)) (map :file-beg) first)])) (into (sorted-map)))) | |
(defn- intmap->vec [xs] (reduce (fn [r [k v]] (if (contains? r k) (assoc r k v) (conj (into r (repeat (- (long k) (count r)) nil)) v))) [] xs)) | |
Calculates loffsets and bidx
from offsets {:file-beg :file-end :beg :end :chr :chr-index}.
| (defn offsets->index ^CSI [offsets shift depth {:keys [variant-file-type] :or {variant-file-type :bcf}}] (let [pseudo-bin (+ 2 (util-bin/max-bin depth)) xs (->> offsets (partition-by :chr-index) (map (fn [[{:keys [chr-index chr]} :as offsets]] (let [b (calc-bidx offsets shift depth) l (calc-loffsets (into #{} (comp (filter (fn [^long x] (<= x (util-bin/max-bin depth)))) (map #(util-bin/bin-beg % shift depth))) (keys b)) offsets) pseudo-chunks [(chunk/->Chunk (:file-beg (first offsets)) (:file-end (last offsets))) (chunk/->Chunk (count offsets) 0)]] [chr-index {:bidx (assoc b pseudo-bin pseudo-chunks), :loffset l, :chr chr}]))) (intmap->vec) ((if (= variant-file-type :vcf) (partial filterv identity) identity))) aux (when (= variant-file-type :vcf) (assoc default-vcf-csi-aux :chrs (mapv :chr xs)))] (->CSI (count xs) shift depth (mapv :bidx xs) (mapv :loffset xs) aux))) |
Writes CSI file from CSI data. | (defn write-index [f ^CSI csi] (let [max-bin (util-bin/max-bin (.depth csi))] (with-open [w (DataOutputStream. (bgzf/bgzf-output-stream f))] (lsb.stream/write-bytes w (.getBytes ^String csi-magic)) (lsb.stream/write-int w (.min-shift csi)) (lsb.stream/write-int w (.depth csi)) (let [tabix-aux (some-> (.aux csi) create-tabix-aux)] (lsb.stream/write-int w (count tabix-aux)) (when tabix-aux (lsb.stream/write-bytes w tabix-aux))) (lsb.stream/write-int w (count (.bidx csi))) (doseq [[offsets loffset] (map vector (.bidx csi) (.loffset csi))] (lsb.stream/write-int w (count offsets)) (doseq [[bin chunks] offsets] (lsb.stream/write-int w bin) (lsb.stream/write-long w (if (<= (long bin) max-bin) (get loffset (util-bin/bin-beg bin (.min-shift csi) (.depth csi))) 0)) (lsb.stream/write-int w (count chunks)) (doseq [chunk' chunks] (lsb.stream/write-long w (:beg chunk')) (lsb.stream/write-long w (:end chunk')))))))) |