Level analyzer of alignments in BAM.

(ns cljam.algo.level
  (:require [clojure.java.io :as cio]
            [com.climate.claypoole :as cp]
            [cljam.common :refer [get-exec-n-threads *n-threads*]]
            [cljam.io.protocols :as protocols]
            [cljam.io.sam :as sam]
            [cljam.io.util :as io-util]
            [cljam.util :as util]))

Adds level information to alignments from rdr and writes them to wtr. Level calculation process is multithreaded.

(defmulti add-level
  {:arglists '([rdr wtr])}
  (fn [rdr wtr]
    (cond
      (and (io-util/sam-reader? rdr) (io-util/sam-writer? wtr)) :sam
      (and (io-util/bam-reader? rdr) (io-util/bam-writer? wtr)) :bam
      :else nil)))
(defmethod add-level :sam
  [_ _]
  (throw (ex-info "SAM not supported yet." {:type :sam-not-supported}))
  ;; TODO implement
  )

Append level info to :options of an alignment. Requires volatile vector as a state.

(defn- add-level!
  [state a]
  (let [pos (long (:pos a))
        end (long (:end a))
        s @state
        lv (loop [i 0 x (first s) xs (next s)]
             (if-not x
               (do (vswap! state conj end) i)
               (if (< (long x) pos)
                 (do (vswap! state assoc i end) i)
                 (recur (inc i) (first xs) (next xs)))))]
    (update a :options conj {:LV {:type "i" :value (int lv)}})))

Create a path to the cache file.

(defn- cache-path
  [rdr temp-dir i]
  (->> i
       (format "%s_%06d.bam" (util/basename (protocols/reader-url rdr)))
       (cio/file temp-dir)
       (.getCanonicalPath)))

Adds level information to alignments from rdr and write them to wtr. Level calculation process is multithreaded.

(defn- add-level-mt
  [rdr wtr]
  (let [hdr (sam/read-header rdr)]
    (sam/write-header wtr hdr)
    (sam/write-refs wtr hdr)
    (cp/with-shutdown! [p (cp/threadpool (get-exec-n-threads))]
      (util/with-temp-dir [temp-dir "cljam.algo.level"]
        ;; split and compute levels
        (let [xs (cp/pfor p [[i {:keys [name]}] (map vector (range) (sam/read-refs rdr))]
                          (let [cache (cache-path rdr temp-dir i)]
                            (with-open [r (sam/reader rdr)
                                        w (sam/writer cache)]
                              (sam/write-header w hdr)
                              (sam/write-refs w hdr)
                              (->> {:chr name}
                                   (sam/read-alignments r)
                                   (map (partial add-level! (volatile! [])))
                                   (#(sam/write-alignments w % hdr))))
                            cache))]
          ;; merge
          (doseq [cache xs]
            (with-open [r (sam/reader cache)]
              (sam/write-blocks wtr (sam/read-blocks r)))))))))
(defmethod add-level :bam
  [rdr wtr & {:keys [n-threads]
              :or {n-threads 0}}]
  (when-not (-> rdr sam/read-header :HD :SO (= "coordinate"))
    (throw (ex-info "Source BAM file must be sorted by coordinate."
                    {:type :bam-not-sorted})))
  (binding [*n-threads* n-threads]
    (add-level-mt rdr wtr)))