Level analyzer of alignments in BAM. | (ns cljam.algo.level (:require [clojure.java.io :as cio] [com.climate.claypoole :as cp] [cljam.common :refer [get-exec-n-threads *n-threads*]] [cljam.io.protocols :as protocols] [cljam.io.sam :as sam] [cljam.io.util :as io-util] [cljam.util :as util])) |
Adds level information to alignments from rdr and writes them to wtr. Level calculation process is multithreaded. | (defmulti add-level {:arglists '([rdr wtr])} (fn [rdr wtr] (cond (and (io-util/sam-reader? rdr) (io-util/sam-writer? wtr)) :sam (and (io-util/bam-reader? rdr) (io-util/bam-writer? wtr)) :bam :else nil))) |
(defmethod add-level :sam [_ _] (throw (ex-info "SAM not supported yet." {:type :sam-not-supported})) ;; TODO implement ) | |
Append level info to :options of an alignment. Requires volatile vector as a state. | (defn- add-level! [state a] (let [pos (long (:pos a)) end (long (:end a)) s @state lv (loop [i 0 x (first s) xs (next s)] (if-not x (do (vswap! state conj end) i) (if (< (long x) pos) (do (vswap! state assoc i end) i) (recur (inc i) (first xs) (next xs)))))] (update a :options conj {:LV {:type "i" :value (int lv)}}))) |
Create a path to the cache file. | (defn- cache-path [rdr temp-dir i] (->> i (format "%s_%06d.bam" (util/basename (protocols/reader-url rdr))) (cio/file temp-dir) (.getCanonicalPath))) |
Adds level information to alignments from rdr and write them to wtr. Level calculation process is multithreaded. | (defn- add-level-mt [rdr wtr] (let [hdr (sam/read-header rdr)] (sam/write-header wtr hdr) (sam/write-refs wtr hdr) (cp/with-shutdown! [p (cp/threadpool (get-exec-n-threads))] (util/with-temp-dir [temp-dir "cljam.algo.level"] ;; split and compute levels (let [xs (cp/pfor p [[i {:keys [name]}] (map vector (range) (sam/read-refs rdr))] (let [cache (cache-path rdr temp-dir i)] (with-open [r (sam/reader rdr) w (sam/writer cache)] (sam/write-header w hdr) (sam/write-refs w hdr) (->> {:chr name} (sam/read-alignments r) (map (partial add-level! (volatile! []))) (#(sam/write-alignments w % hdr)))) cache))] ;; merge (doseq [cache xs] (with-open [r (sam/reader cache)] (sam/write-blocks wtr (sam/read-blocks r))))))))) |
(defmethod add-level :bam [rdr wtr & {:keys [n-threads] :or {n-threads 0}}] (when-not (-> rdr sam/read-header :HD :SO (= "coordinate")) (throw (ex-info "Source BAM file must be sorted by coordinate." {:type :bam-not-sorted}))) (binding [*n-threads* n-threads] (add-level-mt rdr wtr))) | |