(ns cljam.io.bam-index.writer
(:require [com.climate.claypoole :as cp]
[cljam.common :refer [get-exec-n-threads]]
[cljam.io.util.bgzf :as bgzf]
[cljam.io.util.lsb.io-stream :as lsb]
[cljam.io.util.bin :as util-bin]
[cljam.io.bam-index.common :refer [linear-index-shift
linear-index-depth
max-bins
bai-magic]]
[cljam.io.util.chunk :as chunk]
[cljam.io.bam.decoder :as bam-decoder])
(:import [java.io DataOutputStream Closeable]
[cljam.io.bam.decoder BAMPointerBlock]
[cljam.io.util.chunk Chunk])) | |
BAIWriter | |
(deftype BAIWriter [^DataOutputStream writer refs url]
Closeable
(close [_]
(.close writer))) | |
Intermediate data definitionsUse record for performance. Record is faster than map for retrieving elements. | |
(defrecord MetaData [^long first-offset ^long last-offset ^long aligned-alns ^long unaligned-alns]) | |
(defrecord IndexStatus [^MetaData meta-data bin-index linear-index]) | |
Initializing | |
Returns initialized index status. This data structure is intermediate. Must
be passed | (defn- init-index-status
[]
(IndexStatus. (MetaData. -1 0 0 0)
;; Intermediate bin-index -> {bin1 chunks1, bin2 chunks2, ...}
;; e.g. {4681 [{:beg 97, :end 555} ...], 37450 [...] ...}
{}
;; Intermediate linear-index -> {pos1 value1, pos2 value2, ...}
;; e.g. {5415 4474732776, 14827 5955073327, ...}
{})) |
Updating | |
(defn- update-meta-data
[^MetaData meta-data ^BAMPointerBlock aln]
(let [first-offset (.first-offset meta-data)
last-offset (.last-offset meta-data)
aligned-alns (.aligned-alns meta-data)
unaligned-alns (.unaligned-alns meta-data)
beg (.pointer-beg aln)
end (.pointer-end aln)
aligned? (zero? (bit-and (.flag aln) 4))]
(MetaData.
;; first-offset
(if (or (< (bgzf/compare beg first-offset) 1)
(= first-offset -1))
beg first-offset)
;; last-offset
(if (< (bgzf/compare last-offset end) 1)
end last-offset)
;; aligned-alns
(if aligned? (inc aligned-alns) aligned-alns)
;; unaligned-alns
(if-not aligned? (inc unaligned-alns) unaligned-alns)))) | |
(defn- update-bin-index
[bin-index ^BAMPointerBlock aln]
(let [bin (util-bin/reg->bin
(.pos aln) (.end aln) linear-index-shift linear-index-depth)
beg (.pointer-beg aln)
end (.pointer-end aln)]
(assoc bin-index bin
(if-let [chunks (get bin-index bin)]
(let [last-chunk ^Chunk (peek chunks)]
(if (bgzf/same-or-adjacent-blocks? (.end last-chunk) beg)
(conj (pop chunks) (Chunk. (.beg last-chunk) end))
(conj chunks (Chunk. beg end))))
[(Chunk. beg end)])))) | |
(defn- update-linear-index
[linear-index ^BAMPointerBlock aln]
(let [beg (.pointer-beg aln)
aln-beg (.pos aln)
aln-end (.end aln)
win-beg (if (zero? aln-end)
(util-bin/pos->lidx-offset (dec aln-beg) linear-index-shift)
(util-bin/pos->lidx-offset aln-beg linear-index-shift))
win-end (if (zero? aln-end)
win-beg
(util-bin/pos->lidx-offset aln-end linear-index-shift))
min* (fn ^long [x]
(if x (min (long x) beg) beg))]
(loop [i win-beg, ret linear-index]
(if (<= i win-end)
(recur (inc i) (assoc ret i (min* (get ret i))))
ret)))) | |
(defn- update-index-status
[^IndexStatus index-status aln]
(IndexStatus. (update-meta-data (.meta-data index-status) aln)
(update-bin-index (.bin-index index-status) aln)
(update-linear-index (.linear-index index-status) aln))) | |
Merging indices | |
(defn- merge-meta-data
[^MetaData meta1 ^MetaData meta2]
(MetaData. (let [f1 (.first-offset meta1)
f2 (.first-offset meta2)]
(cond
(= f1 -1) f2
(= f2 -1) f1
:else (min f1 f2)))
(max (.last-offset meta1) (.last-offset meta2))
(+ (.aligned-alns meta1) (.aligned-alns meta2))
(+ (.unaligned-alns meta1) (.unaligned-alns meta2)))) | |
(defn- merge-chunks
[chunks1 chunks2]
(loop [[^Chunk f & r] (sort chunk/compare (concat chunks1 chunks2))
chunks' []]
(if f
(if-let [last-chunk ^Chunk (peek chunks')]
(if (bgzf/same-or-adjacent-blocks? (.end last-chunk) (.beg f))
(let [l (assoc last-chunk :end (.end f))]
(recur r (assoc chunks' (dec (count chunks')) l)))
(recur r (conj chunks' f)))
(recur r (conj chunks' f)))
chunks'))) | |
(defn- merge-bin-index [bin-map1 bin-map2] (merge-with merge-chunks bin-map1 bin-map2)) | |
(defn- merge-linear-index [lidx1 lidx2] (merge-with min lidx1 lidx2)) | |
Finalizing index | |
(defn- finalize-bin-index
[bin-index]
(->> bin-index
(seq)
(sort-by first)
(map (partial zipmap [:bin :chunks])))) | |
Complements a linear index. e.g. ([1 10] [3 30]) -> ([0 0] [1 10] [2 10] [3 30]) | (defn- complement-linear-index
[linear-index]
(loop [[f & r] (if (zero? (long (ffirst linear-index)))
linear-index
(conj linear-index [0 0]))
ret []]
(if (seq r)
(recur r (apply conj ret (map #(conj (vector %) (second f)) (range (first f) (ffirst r)))))
(conj ret f)))) |
(defn- finalize-linear-index
[linear-index]
(->> linear-index
(seq)
(sort-by first)
(complement-linear-index)
(map second))) | |
Writing index | |
(defn- write-bin
[w ^long bin chunks]
(lsb/write-int w bin)
;; chunks
(lsb/write-int w (count chunks))
(doseq [^Chunk chunk' chunks]
(lsb/write-long w (.beg chunk'))
(lsb/write-long w (.end chunk')))) | |
(defn- write-meta-data [w meta-data] (lsb/write-int w max-bins) (lsb/write-int w 2) (lsb/write-long w (:first-offset meta-data)) (lsb/write-long w (:last-offset meta-data)) (lsb/write-long w (:aligned-alns meta-data)) (lsb/write-long w (:unaligned-alns meta-data))) | |
Public | |
The number of alignments that is loaded each indexing process. This has an effect on performance of concurrent indexing. The default value is 10,000. | (def ^:dynamic *alignments-partition-size* 10000) |
Merging indices | |
Merges two intermediate indices, returning the merged intermediate index. | (defn merge-index
[idx1 idx2]
(let [no-coordinate-alns (+ (long (:no-coordinate-alns idx1))
(long (:no-coordinate-alns idx2)))
idx1 (dissoc idx1 :no-coordinate-alns)
idx2 (dissoc idx2 :no-coordinate-alns)]
(-> (merge-with
(fn [^IndexStatus v1 ^IndexStatus v2]
(IndexStatus. (merge-meta-data (.meta-data v1) (.meta-data v2))
(merge-bin-index (.bin-index v1) (.bin-index v2))
(merge-linear-index (.linear-index v1) (.linear-index v2))))
idx1 idx2)
(assoc :no-coordinate-alns no-coordinate-alns)))) |
Making index | |
Converts intermediate BAM index data structure into final one. Must be called in the final stage. | (defn finalize-index
[^long nrefs index]
(loop [i 0
index index]
(if (< i nrefs)
(if (get index i)
(recur (inc i) (-> index
(update-in [i :bin-index] finalize-bin-index)
(update-in [i :linear-index] finalize-linear-index)))
(recur (inc i) index))
index))) |
Calculates index from the references and alignments, returning it as a map. Returned index is still intermediate. It must be passed to finalize function in the final stage. | (defn make-index*
[alns]
(loop [[^BAMPointerBlock aln & rest'] alns
rid (.ref-id aln)
idx-status (init-index-status)
no-coordinate-alns 0
indices {}]
(if aln
(let [rid' (.ref-id aln)
new-ref? (not= rid' rid)
idx-status' (update-index-status
(if new-ref? (init-index-status) idx-status) aln)
no-coordinate-alns' (if (zero? (.pos aln))
(inc no-coordinate-alns)
no-coordinate-alns)
indices' (if new-ref?
(assoc indices rid idx-status)
indices)]
(recur rest' rid' idx-status' no-coordinate-alns' indices'))
(assoc indices rid idx-status
:no-coordinate-alns no-coordinate-alns)))) |
Calculates a BAM index from provided references and alignment blocks. Optionally, you can do this process concurrently. | (defn make-index-from-blocks
[^long nrefs blocks]
(let [n-threads (get-exec-n-threads)
make-index-fn (fn [blocks]
(if (= n-threads 1)
(->> blocks
(eduction (map bam-decoder/decode-pointer-block))
make-index*)
(cp/with-shutdown! [pool (cp/threadpool (dec n-threads))]
(->> blocks
(eduction (partition-all *alignments-partition-size*))
(cp/upmap pool (fn [sub-blocks]
(->> sub-blocks
(eduction (map bam-decoder/decode-pointer-block))
make-index*)))
(reduce merge-index {:no-coordinate-alns 0})))))]
(->> blocks
make-index-fn
(finalize-index nrefs)))) |
Update the last pointer of the index to the given value. | (defn update-last-pointer
[index eof-ptr]
(if (or (= (keys index) [:no-coordinate-alns])
(pos? (long (get index :no-coordinate-alns 0))))
index
(let [last-ref (apply max (keys (dissoc index :no-coordinate-alns)))
last-key (->> (for [[bin chunks] (get-in index [last-ref :bin-index])
[i {:keys [end]}] (map-indexed vector chunks)]
[end [last-ref :bin-index bin i :end]])
(apply max-key first)
last)]
(-> index
(assoc-in [last-ref :meta-data :last-offset] eof-ptr)
(assoc-in last-key eof-ptr))))) |
Writing index | |
Writes the index to a file. | (defn write-index*!
[wtr ^long nrefs indices]
;; magic
(lsb/write-bytes wtr (.getBytes ^String bai-magic))
;; n_ref
(lsb/write-int wtr nrefs)
(dotimes [i nrefs]
(let [index (get indices i)
n-bin (count (:bin-index index))]
;; bins
(if (zero? n-bin)
(lsb/write-int wtr 0)
(do
;; # of bins
(lsb/write-int wtr (inc n-bin))
(doseq [bin (:bin-index index)]
(write-bin wtr (:bin bin) (:chunks bin)))
;; meta data
(write-meta-data wtr (:meta-data index))))
;; linear index
(lsb/write-int wtr (count (:linear-index index)))
(doseq [l (:linear-index index)]
(lsb/write-long wtr l))))
;; no coordinate alignments
(lsb/write-long wtr (:no-coordinate-alns indices))) |
Calculates a BAM index from alns, writing the index to a file. | (defn write-index!
[^BAIWriter wtr alns]
(let [nrefs (count (.refs wtr))
indices (make-index-from-blocks nrefs alns)]
(write-index*! (.writer wtr) nrefs indices))) |