Sorter of the SAM/BAM format alignments. | (ns cljam.algo.sorter (:refer-clojure :exclude [sorted?]) (:require [clojure.tools.logging :as logging] [com.climate.claypoole :as cp] [cljam.io.protocols :as protocols] [cljam.io.sam :as sam] [cljam.io.util :as io-util] [cljam.util :as util] [cljam.io.sam.util.header :as header]) (:import [java.io Closeable] [java.util PriorityQueue])) |
Default number of alignments to split when sorting. | (def ^:const default-chunk-size 1500000) |
(defn- refmap [refs] (into {} (map-indexed (fn [i x] [(:name x) i])) refs)) | |
comparators | |
(defn- coordinate-key [rname->id {:keys [rname ref-id ^int pos ^int flag]}] (unchecked-add Long/MIN_VALUE (bit-or (bit-shift-left (int (or ref-id (rname->id rname -1))) 32) (bit-shift-left pos 1) (bit-and 1 (unsigned-bit-shift-right flag 4))))) | |
(defn- queryname-key [{:keys [qname ^int flag]}] (str qname (bit-and 3 (unsigned-bit-shift-right flag 6)))) | |
Same as sort-by but caching results of keyfn. | (defn- sort-by-index [keyfn coll] (sort-by :index (map (fn [x] (assoc x :index (keyfn x))) coll))) |
split/merge | |
Returns true if given files are the same type. | (defn- same-type? [f & files] (apply = (io-util/file-type f) (map io-util/file-type files))) |
Splits SAM/BAM file into multiple files each containing alignments up to chunk-size. name-fn must be a function taking an int value i and returning a path string for i-th output. read-fn must produce a sequence of alignments and write-fn must consume the splitted sequence. | (defn- split** [rdr mode chunk-size name-fn read-fn write-fn] (let [hdr (header/sorted-by mode (header/update-version (sam/read-header rdr)))] (logging/info "Splitting...") (cp/with-shutdown! [p (cp/threadpool (cp/ncpus))] (->> (read-fn rdr) (sequence (comp (partition-all (or chunk-size default-chunk-size)) (map-indexed (fn [i xs] [(name-fn i) xs])))) (cp/pmap p (fn [[f xs]] (logging/info (str "Sorting " (count xs) " alignments...")) (with-open [wtr (sam/writer f)] (sam/write-header wtr hdr) (sam/write-refs wtr hdr) (write-fn wtr xs hdr)) f)) doall)))) |
Splits SAM/BAM files with appropriate reader/writer functions. | (defn- split* [rdr chunk-size name-fn mode sort-fn] (let [block? (same-type? (protocols/reader-url rdr) (name-fn 0)) read-fn (if block? (fn [r] (sam/read-blocks r {} {:mode mode})) sam/read-alignments) write-fn (if block? (fn [w xs _] (sam/write-blocks w (sort-fn xs))) (fn [w xs hdr] (sam/write-alignments w (sort-fn xs) hdr)))] (split** rdr mode chunk-size name-fn read-fn write-fn))) |
Take the smallest element from sequences in priority queue. | (defn- head-pq [^PriorityQueue q] (when-not (.isEmpty q) (let [x (.poll q)] (when-let [nx (next x)] (.add q nx)) (first x)))) |
Returns a lazy sequence from pre-sorted sequences. Each sequences must be sorted by key-fn. Returns first sequence if only 1 sequence is given. | (defn merge-sorted-seqs-by [key-fn seqs] (if (= (count seqs) 1) (first seqs) (let [pq (PriorityQueue. (int (count seqs)) (fn pq-cmp [x y] (compare (key-fn (first x)) (key-fn (first y)))))] (doseq [s seqs] (.add pq s)) (take-while some? (repeatedly (fn repeat-head-pq [] (head-pq pq))))))) |
Merges multiple SAM/BAM files into single SAM/BAM file. | (defn- merge** [wtr hdr files key-fn read-fn write-fn] (let [rdrs (map sam/reader files) alns (map (comp seq read-fn) rdrs)] (sam/write-header wtr hdr) (sam/write-refs wtr hdr) (write-fn wtr (merge-sorted-seqs-by key-fn alns) hdr) (doseq [rdr rdrs] (.close ^Closeable rdr)))) |
Merges multiple SAM/BAM files with appropriate reader/writer functions. | (defn- merge* [wtr hdr files mode key-fn] (let [block? (apply same-type? (protocols/writer-url wtr) files) read-fn (if block? (fn [r] (sam/read-blocks r {} {:mode mode})) sam/read-alignments) write-fn (if block? (fn [w xs _] (sam/write-blocks w xs)) sam/write-alignments)] (merge** wtr hdr files key-fn read-fn write-fn))) |
Sorter | |
Generates i-th cache filename. | (defn- gen-cache-filename [fmt temp-dir prefix i] (format "%s/%s_%05d.%s" temp-dir prefix i fmt)) |
Sorts alignments of rdr by mode and writes them to wtr. :coordinate and :queryname are available for mode. | (defn sort! [rdr wtr {:keys [mode chunk-size cache-fmt] :or {cache-fmt :bam}}] (util/with-temp-dir [temp-dir "cljam.algo.sorter"] (let [name-fn (->> rdr protocols/reader-url util/basename (partial gen-cache-filename (name cache-fmt) temp-dir)) key-fn ({header/order-coordinate (partial coordinate-key (refmap (sam/read-refs rdr))) header/order-queryname queryname-key} mode) splitted-files (split* rdr chunk-size name-fn mode #(sort-by-index key-fn %)) hdr (header/sorted-by mode (header/update-version (sam/read-header rdr)))] (logging/info (str "Merging from " (count splitted-files) " files...")) (merge* wtr hdr splitted-files mode key-fn) (logging/info "Deleting cache files...")))) |
Sorts alignments by chromosomal position. | (defn sort-by-pos [rdr wtr & [option]] (sort! rdr wtr (into (or option {}) {:mode header/order-coordinate}))) |
Sort alignments by query name. | (defn sort-by-qname [rdr wtr & [option]] (sort! rdr wtr (into (or option {}) {:mode header/order-queryname}))) |
Returns true if the sam is sorted, false if not. It is detected by
| (defn sorted? [rdr] (header/sorted? (sam/read-header rdr))) |
Returns sorting order of the sam as Keyword. Returning order is one of the following: :queryname, :coordinate, :unsorted, :unknown. | (defn sort-order [rdr] (header/sort-order (sam/read-header rdr))) |