Sorter of the SAM/BAM format alignments. | (ns cljam.algo.sorter
(:refer-clojure :exclude [sorted?])
(:require [clojure.tools.logging :as logging]
[com.climate.claypoole :as cp]
[cljam.io.protocols :as protocols]
[cljam.io.sam :as sam]
[cljam.io.util :as io-util]
[cljam.util :as util]
[cljam.io.sam.util.header :as header])
(:import [java.io Closeable]
[java.util PriorityQueue])) |
Default number of alignments to split when sorting. | (def ^:const default-chunk-size 1500000) |
(defn- refmap
[refs]
(into {} (map-indexed (fn [i x] [(:name x) i])) refs)) | |
comparators | |
(defn- coordinate-key
[rname->id {:keys [rname ref-id ^int pos ^int flag]}]
(unchecked-add
Long/MIN_VALUE
(bit-or
(bit-shift-left (int (or ref-id (rname->id rname -1))) 32)
(bit-shift-left pos 1)
(bit-and 1 (unsigned-bit-shift-right flag 4))))) | |
(defn- queryname-key
[{:keys [qname ^int flag]}]
(str
qname
(bit-and 3 (unsigned-bit-shift-right flag 6)))) | |
Same as sort-by but caching results of keyfn. | (defn- sort-by-index [keyfn coll] (sort-by :index (map (fn [x] (assoc x :index (keyfn x))) coll))) |
split/merge | |
Returns true if given files are the same type. | (defn- same-type? [f & files] (apply = (io-util/file-type f) (map io-util/file-type files))) |
Splits SAM/BAM file into multiple files each containing alignments up to chunk-size. name-fn must be a function taking an int value i and returning a path string for i-th output. read-fn must produce a sequence of alignments and write-fn must consume the splitted sequence. | (defn- split**
[rdr mode chunk-size name-fn read-fn write-fn]
(let [hdr (header/sorted-by mode (header/update-version (sam/read-header rdr)))]
(logging/info "Splitting...")
(cp/with-shutdown! [p (cp/threadpool (cp/ncpus))]
(->> (read-fn rdr)
(sequence
(comp
(partition-all (or chunk-size default-chunk-size))
(map-indexed (fn [i xs] [(name-fn i) xs]))))
(cp/pmap
p
(fn [[f xs]]
(logging/info (str "Sorting " (count xs) " alignments..."))
(with-open [wtr (sam/writer f)]
(sam/write-header wtr hdr)
(sam/write-refs wtr hdr)
(write-fn wtr xs hdr))
f))
doall)))) |
Splits SAM/BAM files with appropriate reader/writer functions. | (defn- split*
[rdr chunk-size name-fn mode sort-fn]
(let [block? (same-type? (protocols/reader-url rdr) (name-fn 0))
read-fn (if block?
(fn [r] (sam/read-blocks r {} {:mode mode}))
sam/read-alignments)
write-fn (if block?
(fn [w xs _] (sam/write-blocks w (sort-fn xs)))
(fn [w xs hdr] (sam/write-alignments w (sort-fn xs) hdr)))]
(split** rdr mode chunk-size name-fn read-fn write-fn))) |
Take the smallest element from sequences in priority queue. | (defn- head-pq
[^PriorityQueue q]
(when-not (.isEmpty q)
(let [x (.poll q)]
(when-let [nx (next x)]
(.add q nx))
(first x)))) |
Returns a lazy sequence from pre-sorted sequences. Each sequences must be sorted by key-fn. Returns first sequence if only 1 sequence is given. | (defn merge-sorted-seqs-by
[key-fn seqs]
(if (= (count seqs) 1)
(first seqs)
(let [pq (PriorityQueue.
(int (count seqs))
(fn pq-cmp [x y] (compare (key-fn (first x)) (key-fn (first y)))))]
(doseq [s seqs]
(.add pq s))
(take-while some? (repeatedly (fn repeat-head-pq [] (head-pq pq))))))) |
Merges multiple SAM/BAM files into single SAM/BAM file. | (defn- merge**
[wtr hdr files key-fn read-fn write-fn]
(let [rdrs (map sam/reader files)
alns (map (comp seq read-fn) rdrs)]
(sam/write-header wtr hdr)
(sam/write-refs wtr hdr)
(write-fn wtr (merge-sorted-seqs-by key-fn alns) hdr)
(doseq [rdr rdrs]
(.close ^Closeable rdr)))) |
Merges multiple SAM/BAM files with appropriate reader/writer functions. | (defn- merge*
[wtr hdr files mode key-fn]
(let [block? (apply same-type? (protocols/writer-url wtr) files)
read-fn (if block?
(fn [r] (sam/read-blocks r {} {:mode mode}))
sam/read-alignments)
write-fn (if block?
(fn [w xs _] (sam/write-blocks w xs))
sam/write-alignments)]
(merge** wtr hdr files key-fn read-fn write-fn))) |
Sorter | |
Generates i-th cache filename. | (defn- gen-cache-filename [fmt temp-dir prefix i] (format "%s/%s_%05d.%s" temp-dir prefix i fmt)) |
Sorts alignments of rdr by mode and writes them to wtr. :coordinate and :queryname are available for mode. | (defn sort!
[rdr wtr {:keys [mode chunk-size cache-fmt] :or {cache-fmt :bam}}]
(util/with-temp-dir [temp-dir "cljam.algo.sorter"]
(let [name-fn (->> rdr
protocols/reader-url
util/basename
(partial gen-cache-filename (name cache-fmt) temp-dir))
key-fn ({header/order-coordinate (partial coordinate-key (refmap (sam/read-refs rdr)))
header/order-queryname queryname-key} mode)
splitted-files (split* rdr chunk-size name-fn mode #(sort-by-index key-fn %))
hdr (header/sorted-by mode (header/update-version (sam/read-header rdr)))]
(logging/info (str "Merging from " (count splitted-files) " files..."))
(merge* wtr hdr splitted-files mode key-fn)
(logging/info "Deleting cache files...")))) |
Sorts alignments by chromosomal position. | (defn sort-by-pos
[rdr wtr & [option]]
(sort! rdr wtr (into (or option {}) {:mode header/order-coordinate}))) |
Sort alignments by query name. | (defn sort-by-qname
[rdr wtr & [option]]
(sort! rdr wtr (into (or option {}) {:mode header/order-queryname}))) |
Returns true if the sam is sorted, false if not. It is detected by
| (defn sorted? [rdr] (header/sorted? (sam/read-header rdr))) |
Returns sorting order of the sam as Keyword. Returning order is one of the following: :queryname, :coordinate, :unsorted, :unknown. | (defn sort-order [rdr] (header/sort-order (sam/read-header rdr))) |