Basic I/O of CSI:Coordinate Sorted Index files. | (ns cljam.io.csi
(:require [cljam.io.util.bgzf :as bgzf]
[cljam.io.util.bin :as util-bin]
[cljam.io.util.chunk :as chunk]
[cljam.io.util.lsb.data-io :as lsb.data]
[cljam.io.util.lsb.io-stream :as lsb.stream]
[clojure.string :as cstr])
(:import [java.io DataInputStream DataOutputStream IOException]
[java.nio ByteBuffer ByteOrder]
java.util.Arrays)) |
(def ^:const ^:private csi-magic "CSI\1") | |
(def ^:private ^:const default-vcf-csi-aux
{:format 2, :col-seq 1, :col-beg 2, :col-end 0, :meta-char \#, :skip 0}) | |
(def ^:private ^:const min-large-chunk-size 0x10000) | |
4byte x 7 (format,seq,beg,end,meta,skip,l_nm) | (def ^:private ^:const tabix-field-size (* 4 7)) |
(deftype CSI [n-ref min-shift depth bidx loffset aux]
util-bin/IBinningIndex
(get-chunks [_ ref-idx bins]
(vec (mapcat (get bidx ref-idx) bins)))
(get-min-offset [_ ref-idx beg]
(if-let [min-offsets (rsubseq (get loffset ref-idx) <= beg)]
(second (first min-offsets))
0))
(get-depth [_]
depth)
(get-min-shift [_]
min-shift)
(get-chr-names [_]
(:chrs aux))) | |
(defn- parse-tabix-aux [^bytes ba]
(when (<= tabix-field-size (alength ba))
(let [bb (doto (ByteBuffer/wrap ba)
(.order ByteOrder/LITTLE_ENDIAN))
format' (.getInt bb)
col-seq (.getInt bb)
col-beg (.getInt bb)
col-end (.getInt bb)
meta-char (char (.getInt bb))
skip (.getInt bb)
l-nm (.getInt bb)]
(when-not (= l-nm (.remaining bb))
(throw (ex-info "l-nm does not match"
{:l-nm l-nm, :remaining (.remaining bb)})))
{:format format', :col-seq col-seq, :col-beg col-beg, :col-end col-end,
:meta-char meta-char, :skip skip,
:chrs (cstr/split (String. ba (.position bb) (.remaining bb)) #"\00")}))) | |
(defn- create-tabix-aux [aux]
(let [contig-bytes (.getBytes (str (cstr/join (char 0) (:chrs aux)) (char 0)))
bb (doto (ByteBuffer/wrap
(byte-array (+ (alength contig-bytes) tabix-field-size)))
(.order ByteOrder/LITTLE_ENDIAN))]
(.putInt bb (:format aux))
(.putInt bb (:col-seq aux))
(.putInt bb (:col-beg aux))
(.putInt bb (:col-end aux))
(.putInt bb (int (:meta-char aux)))
(.putInt bb (:skip aux))
(.putInt bb (alength contig-bytes))
(.put bb contig-bytes)
(.array bb))) | |
(defn- read-chunks!
[rdr]
(let [n-chunk (lsb.data/read-int rdr)]
(->> #(let [beg (lsb.data/read-long rdr) end (lsb.data/read-long rdr)]
(chunk/->Chunk beg end))
(repeatedly n-chunk)
vec))) | |
(defn- read-bin-index
[rdr]
(let [n-ref (lsb.data/read-int rdr)]
(->> #(let [bin (lsb.data/read-int rdr)
loffset (lsb.data/read-long rdr)
chunks (read-chunks! rdr)]
{:bin (long bin), :loffset loffset, :chunks chunks})
(repeatedly n-ref)
vec))) | |
(defn- read-index*
^CSI [^DataInputStream rdr]
(when-not (Arrays/equals ^bytes (lsb.data/read-bytes rdr 4) (.getBytes csi-magic))
(throw (IOException. "Invalid CSI file")))
(let [min-shift (lsb.data/read-int rdr)
depth (lsb.data/read-int rdr)
l-aux (lsb.data/read-int rdr)
aux (lsb.data/read-bytes rdr l-aux)
tabix-aux (try (parse-tabix-aux aux) (catch Throwable _ nil))
n-ref (lsb.data/read-int rdr)
bins (vec (repeatedly n-ref #(read-bin-index rdr)))
max-bin (util-bin/max-bin depth)
bidx (mapv #(into {} (map (juxt :bin :chunks)) %) bins)
loffset (mapv
#(into
(sorted-map)
(keep
(fn [{:keys [^long bin loffset]}]
(when (<= bin max-bin)
[(util-bin/bin-beg bin min-shift depth)
loffset]))) %)
bins)]
(->CSI n-ref min-shift depth bidx loffset tabix-aux))) | |
Reads a CSI file | (defn read-index
^CSI [f]
(with-open [r (DataInputStream. (bgzf/bgzf-input-stream f))]
(read-index* r))) |
(defn- concatenate-offsets [offsets]
(reduce
(fn [res chunk']
(if (= (:file-end (peek res)) (:file-beg chunk'))
(assoc-in res [(dec (count res)) :file-end] (:file-end chunk'))
(conj res chunk')))
[]
(sort-by :file-beg offsets))) | |
(defn- small-chunks? [chunks]
(< (- (bgzf/get-block-address (:file-end (last chunks)))
(bgzf/get-block-address (:file-beg (first chunks))))
min-large-chunk-size)) | |
(defn- compress-bidx [^long depth bidx]
(letfn [(f [^long level bidx]
(reduce
(fn [ret [bin offsets]]
(let [parent-bin (util-bin/parent-bin bin)]
(if (and (= (util-bin/bin-level bin) level)
(contains? bidx parent-bin)
(small-chunks? offsets))
(update ret parent-bin concat offsets)
(assoc ret bin offsets))))
{}
(sort-by key bidx)))]
(reduce (fn [b level] (f level b)) bidx (range depth 0 -1)))) | |
(defn- calc-bidx [file-offsets ^long shift ^long depth]
(->> file-offsets
(group-by #(util-bin/reg->bin (:beg %) (:end %) shift depth))
(compress-bidx depth)
(map (fn [[bin offsets]]
[bin (->> (concatenate-offsets offsets)
(mapv #(chunk/->Chunk (:file-beg %) (:file-end %))))]))
(into (sorted-map)))) | |
(defn- calc-loffsets [begs file-offsets]
(->> begs
(map (fn [^long beg]
[beg (->> file-offsets
(drop-while #(< (long (:end %)) beg))
(map :file-beg)
first)]))
(into (sorted-map)))) | |
(defn- intmap->vec [xs]
(reduce
(fn [r [k v]]
(if (contains? r k)
(assoc r k v)
(conj (into r (repeat (- (long k) (count r)) nil)) v)))
[]
xs)) | |
Calculates loffsets and bidx
from offsets {:file-beg :file-end :beg :end :chr :chr-index}.
| (defn offsets->index
^CSI [offsets shift depth
{:keys [variant-file-type] :or {variant-file-type :bcf}}]
(let [pseudo-bin (+ 2 (util-bin/max-bin depth))
xs (->> offsets
(partition-by :chr-index)
(map
(fn [[{:keys [chr-index chr]} :as offsets]]
(let [b (calc-bidx offsets shift depth)
l (calc-loffsets
(into #{}
(comp
(filter (fn [^long x]
(<= x (util-bin/max-bin depth))))
(map #(util-bin/bin-beg % shift depth)))
(keys b))
offsets)
pseudo-chunks [(chunk/->Chunk
(:file-beg (first offsets))
(:file-end (last offsets)))
(chunk/->Chunk (count offsets) 0)]]
[chr-index
{:bidx (assoc b pseudo-bin pseudo-chunks),
:loffset l, :chr chr}])))
(intmap->vec)
((if (= variant-file-type :vcf)
(partial filterv identity)
identity)))
aux (when (= variant-file-type :vcf)
(assoc default-vcf-csi-aux :chrs (mapv :chr xs)))]
(->CSI (count xs) shift depth (mapv :bidx xs) (mapv :loffset xs) aux))) |
Writes CSI file from CSI data. | (defn write-index
[f ^CSI csi]
(let [max-bin (util-bin/max-bin (.depth csi))]
(with-open [w (DataOutputStream. (bgzf/bgzf-output-stream f))]
(lsb.stream/write-bytes w (.getBytes ^String csi-magic))
(lsb.stream/write-int w (.min-shift csi))
(lsb.stream/write-int w (.depth csi))
(let [tabix-aux (some-> (.aux csi) create-tabix-aux)]
(lsb.stream/write-int w (count tabix-aux))
(when tabix-aux
(lsb.stream/write-bytes w tabix-aux)))
(lsb.stream/write-int w (count (.bidx csi)))
(doseq [[offsets loffset] (map vector (.bidx csi) (.loffset csi))]
(lsb.stream/write-int w (count offsets))
(doseq [[bin chunks] offsets]
(lsb.stream/write-int w bin)
(lsb.stream/write-long
w
(if (<= (long bin) max-bin)
(get loffset (util-bin/bin-beg bin (.min-shift csi) (.depth csi)))
0))
(lsb.stream/write-int w (count chunks))
(doseq [chunk' chunks]
(lsb.stream/write-long w (:beg chunk'))
(lsb.stream/write-long w (:end chunk')))))))) |