Level analyzer of alignments in BAM. | (ns cljam.algo.level
(:require [clojure.java.io :as cio]
[com.climate.claypoole :as cp]
[cljam.common :refer [get-exec-n-threads *n-threads*]]
[cljam.io.protocols :as protocols]
[cljam.io.sam :as sam]
[cljam.io.util :as io-util]
[cljam.util :as util])) |
Adds level information to alignments from rdr and writes them to wtr. Level calculation process is multithreaded. | (defmulti add-level
{:arglists '([rdr wtr])}
(fn [rdr wtr]
(cond
(and (io-util/sam-reader? rdr) (io-util/sam-writer? wtr)) :sam
(and (io-util/bam-reader? rdr) (io-util/bam-writer? wtr)) :bam
:else nil))) |
(defmethod add-level :sam
[_ _]
(throw (ex-info "SAM not supported yet." {:type :sam-not-supported}))
;; TODO implement
) | |
Append level info to :options of an alignment. Requires volatile vector as a state. | (defn- add-level!
[state a]
(let [pos (long (:pos a))
end (long (:end a))
s @state
lv (loop [i 0 x (first s) xs (next s)]
(if-not x
(do (vswap! state conj end) i)
(if (< (long x) pos)
(do (vswap! state assoc i end) i)
(recur (inc i) (first xs) (next xs)))))]
(update a :options conj {:LV {:type "i" :value (int lv)}}))) |
Create a path to the cache file. | (defn- cache-path
[rdr temp-dir i]
(->> i
(format "%s_%06d.bam" (util/basename (protocols/reader-url rdr)))
(cio/file temp-dir)
(.getCanonicalPath))) |
Adds level information to alignments from rdr and write them to wtr. Level calculation process is multithreaded. | (defn- add-level-mt
[rdr wtr]
(let [hdr (sam/read-header rdr)]
(sam/write-header wtr hdr)
(sam/write-refs wtr hdr)
(cp/with-shutdown! [p (cp/threadpool (get-exec-n-threads))]
(util/with-temp-dir [temp-dir "cljam.algo.level"]
;; split and compute levels
(let [xs (cp/pfor p [[i {:keys [name]}] (map vector (range) (sam/read-refs rdr))]
(let [cache (cache-path rdr temp-dir i)]
(with-open [r (sam/reader rdr)
w (sam/writer cache)]
(sam/write-header w hdr)
(sam/write-refs w hdr)
(->> {:chr name}
(sam/read-alignments r)
(map (partial add-level! (volatile! [])))
(#(sam/write-alignments w % hdr))))
cache))]
;; merge
(doseq [cache xs]
(with-open [r (sam/reader cache)]
(sam/write-blocks wtr (sam/read-blocks r))))))))) |
(defmethod add-level :bam
[rdr wtr & {:keys [n-threads]
:or {n-threads 0}}]
(when-not (-> rdr sam/read-header :HD :SO (= "coordinate"))
(throw (ex-info "Source BAM file must be sorted by coordinate."
{:type :bam-not-sorted})))
(binding [*n-threads* n-threads]
(add-level-mt rdr wtr))) | |