(ns cljam.io.bcf.reader
(:require [clojure.string :as cstr]
[clojure.tools.logging :as logging]
[cljam.io.protocols :as protocols]
[cljam.io.util.bgzf :as bgzf]
[cljam.io.util.byte-buffer :as bb]
[cljam.io.util.lsb.io-stream :as lsb]
[cljam.io.vcf.reader :as vcf-reader]
[cljam.io.vcf.util :as vcf-util]
[cljam.util :as util]
[cljam.io.util.bin :as util-bin]
[cljam.io.csi :as csi])
(:import [java.io Closeable IOException FileNotFoundException]
[java.net URL]
[java.nio Buffer ByteBuffer]
[bgzf4j BGZFInputStream])
(:refer-clojure :exclude [read])) | |
(declare read-variants meta-info read-variants-randomly read-file-offsets) | |
(deftype BCFReader
[^URL url meta-info header ^BGZFInputStream reader
^long start-pos index-delay]
Closeable
(close [this]
(.close ^Closeable (.reader this)))
protocols/IReader
(reader-url [this] (.url this))
(read [this] (protocols/read this {}))
(read [this option] (read-variants this option))
(indexed? [_]
(try
@index-delay
true
(catch FileNotFoundException _
false)))
protocols/IRegionReader
(read-in-region [this region]
(protocols/read-in-region this region {}))
(read-in-region [this {:keys [chr ^long start ^long end]} option]
(logging/warn "May cause degradation of performance.")
(filter
(fn [{^long v-pos :pos
v-chr :chr
v-ref :ref}]
(and (if chr (= v-chr chr) true)
(if start (<= start v-pos) true)
(if end (<= (+ v-pos (count v-ref)) end) true)))
(read-variants this option)))) | |
need dynamic extension for namespace issue. | (extend-type BCFReader
protocols/IVariantReader
(meta-info [this] (meta-info this))
(header [this] (.header this))
(read-variants
([this] (protocols/read-variants this {}))
([this option] (read-variants this option)))
(read-variants-randomly [this region-option deep-option]
(read-variants-randomly this region-option deep-option))
(read-file-offsets [this]
(read-file-offsets this))) |
Parses meta-info and header of BCF files and returns them as a map. lines must be a sequence of strings. | (defn- parse-meta-and-header
[lines]
(let [{metas "##"
headers "#"} (group-by #(re-find #"^\#+" %) lines)]
{:meta (transduce
(map vcf-reader/parse-meta-info-line)
(completing
(fn [meta-info' [k v]]
(if (#{:contig :info :filter :format :alt :sample :pedigree} k)
(update meta-info' k (fnil conj []) v)
(assoc meta-info' k v))))
{}
metas)
:header (first (map vcf-reader/parse-header-line headers))})) |
Returns an open cljam.bcf.reader.BCFReader of f. Should be used inside with-open to ensure the Reader is properly closed. Throws IOException if failed to parse BCF file format. | (defn reader
^BCFReader
[f]
(let [rdr (bgzf/bgzf-input-stream f)
magic (lsb/read-bytes rdr 5)]
(if (= (seq magic) (map byte "BCF\2\2"))
(let [hlen (int (lsb/read-int rdr))
header-buf (lsb/read-bytes rdr hlen)]
(if (zero? (aget ^bytes header-buf (dec hlen))) ;; NULL-terminated
(let [{:keys [header] meta-info :meta} (->> (String. ^bytes header-buf
0
(int (dec hlen)))
cstr/split-lines
parse-meta-and-header)]
(BCFReader. (util/as-url f) meta-info header rdr (.getFilePointer rdr)
(delay (csi/read-index (str f ".csi")))))
(do
(.close rdr)
(throw
(IOException.
"Invalid file format. BCF header must be NULL-terminated.")))))
(do
(.close rdr)
(throw
(IOException.
(str "Invalid file format. BCF v2.2 magic not found in " f))))))) |
Reads an atomic value, which is typed as either integer(8,16,32 bit) or float or character. | (defn- read-typed-atomic-value
[^ByteBuffer bb ^long type-id]
(case type-id
1 (let [i (.get bb)]
(case (bit-and 0xFF i) 0x80 nil 0x81 :eov i))
2 (let [i (.getShort bb)]
(case (bit-and 0xFFFF i) 0x8000 nil 0x8001 :eov i))
3 (let [i (.getInt bb)]
(case (bit-and 0xFFFFFFFF i) 0x80000000 nil 0x80000001 :eov i))
5 (let [i (.getInt bb)]
(case (bit-and 0xFFFFFFFF i) 0x7F800001 nil 0x7F800002
:eov (Float/intBitsToFloat i)))
7 (.get bb))) |
(defn- bytes->strs
[ba]
(map (fn [^String s]
(let [z (.indexOf s (int 0))
s' (cond-> s (not (neg? z)) (subs 0 z))]
(when-not (= s' ".") (not-empty s'))))
(cstr/split (String. (byte-array ba)) #","))) | |
Reads typed value from byte buffer. n-sample is a number of values repeated after type specifier byte. | (defn- read-typed-value
([bb]
(first (read-typed-value bb 1)))
([^ByteBuffer bb ^long n-sample]
(let [type-byte (int (.get bb))
len (unsigned-bit-shift-right (bit-and 0xF0 type-byte) 4)
total-len (if (= len 15) (long (first (read-typed-value bb))) len)
type-id (bit-and 0x0F type-byte)]
(case type-id
0 (repeat n-sample nil)
7 (mapv (fn [_] (bytes->strs (bb/read-bytes bb total-len)))
(range n-sample))
(into []
(comp (map (fn [_] (read-typed-atomic-value bb type-id)))
(partition-all total-len)
(map (fn [xs] (take-while #(not= % :eov) xs))))
(range (* n-sample total-len))))))) |
Reads a key-value pair. | (defn- read-typed-kv
([bb]
(let [[k [v]] (read-typed-kv bb 1)]
[k v]))
([bb n-sample]
[(first (read-typed-value bb)) (read-typed-value bb n-sample)])) |
Reads a single record of variant and store to the ByteBuffer objects. | (defn- read-data-line-buffer
[rdr]
(let [l-shared (lsb/read-uint rdr)
l-indv (lsb/read-uint rdr)
shared-bb (bb/allocate-lsb-byte-buffer l-shared)
indv-bb (bb/allocate-lsb-byte-buffer l-indv)]
(lsb/read-bytes rdr (.array shared-bb) 0 l-shared)
(lsb/read-bytes rdr (.array indv-bb) 0 l-indv)
{:l-shared l-shared
:l-indv l-indv
:shared shared-bb
:individual indv-bb})) |
Parses only chromosome, position and ref-length. Can be used for position-based querying. | (defn- parse-data-line-shallow
[contigs {:keys [^ByteBuffer shared] :as m}]
(let [chrom-id (.getInt shared)
pos (inc (.getInt shared))
rlen (.getInt shared)]
(.position ^Buffer shared 0)
(assoc m :chr (:id (contigs chrom-id)) :pos pos :rlen rlen))) |
Parses full data of a variant. Returns a map containing indices for meta-info. | (defn- parse-data-line-deep
[{:keys [^ByteBuffer shared ^ByteBuffer individual]}]
(let [chrom-id (.getInt shared)
pos (inc (.getInt shared))
rlen (.getInt shared)
qual (.getInt shared)
n-allele-info (.getInt shared)
n-allele (unsigned-bit-shift-right n-allele-info 16)
n-info (bit-and n-allele-info 0xFFFF)
n-fmt-sample (long (bb/read-uint shared))
n-fmt (bit-and 0xFF (unsigned-bit-shift-right n-fmt-sample 24))
n-sample (bit-and n-fmt-sample 0xFFFFFF)
id (let [i (read-typed-value shared)] (if (sequential? i) (first i) i))
refseq (read-typed-value shared)
altseq (doall (repeatedly (dec n-allele) #(read-typed-value shared)))
flter (read-typed-value shared)
info (doall (repeatedly n-info #(read-typed-kv shared)))
genotype (doall (repeatedly n-fmt #(read-typed-kv individual n-sample)))]
{:chr chrom-id
:pos pos
:ref-length rlen
:qual (when-not (= qual 0x7F800001) (Float/intBitsToFloat qual))
:id id
:ref (first refseq)
:alt (seq (map first altseq))
:filter flter
:info info
:genotype genotype
:n-sample n-sample})) |
(defn- fixup-val [{kw :kw t :type n :number} v]
(cond->> v
(= kw :GT) vcf-util/ints->genotype
(= t "Flag") ((constantly :exists))
(= t "Character") (map first)
(and (= n 1) (not= kw :GT)) first)) | |
Converts a BCF-style variant map to parsed variant using meta-info. | (defn- bcf-map->parsed-variant
[contigs filters formats info
[fmt-kw & indiv-kws] {:keys [genotype n-sample] :as variant}]
(let [->gt-kv (fn [i [k vs]]
(let [{:keys [kw] :as f} (formats k)
v (nth vs i nil)]
[kw (when-not (or (nil? v) (= [nil] v))
(fixup-val f v))]))
->info-kv (fn [[k v]]
(let [{:keys [kw] :as i} (info k)]
[kw (fixup-val i v)]))
indiv (map (fn [i] (not-empty (into {} (map #(->gt-kv i %)) genotype)))
(range n-sample))
v (-> (dissoc variant :genotype :ref-length :n-sample)
(update :chr (comp :id contigs))
(update :filter #(not-empty (map (comp :kw filters) %)))
(update :info #(not-empty (into {} (map ->info-kv) %))))]
(cond-> v
fmt-kw (assoc fmt-kw (not-empty (map (comp :kw formats first) genotype)))
indiv-kws (merge (zipmap indiv-kws indiv))))) |
Reads data from BCF file and returns them as a lazy-sequence of maps. | (defn- read-data-lines
[^BGZFInputStream rdr read-fn]
(when (pos? (.available rdr))
(let [data (read-fn rdr)]
(cons data (lazy-seq (read-data-lines rdr read-fn)))))) |
Creates a map for searching meta-info with indices. | (defn- meta->map
[meta-info]
(into {} (map (fn [m] [(Integer/parseInt (:idx m)) (assoc m :kw (keyword (:id m)))])) meta-info)) |
Returns meta-information of the BCF from rdr as a map. | (defn meta-info
[^BCFReader rdr]
(-> (.meta-info rdr)
(update :contig (fn [xs] (map (fn [m] (dissoc m :idx)) xs)))
(update :filter (fn [xs] (keep (fn [m] (when-not (= (:id m) "PASS") (dissoc m :idx))) xs)))
(update :info (fn [xs] (map (fn [m] (dissoc m :idx)) xs)))
(update :format (fn [xs] (map (fn [m] (dissoc m :idx)) xs))))) |
(defn- make-parse-fn [^BCFReader rdr info depth]
(let [contigs (meta->map (:contig (.meta-info rdr)))
filters (assoc (meta->map (:filter (.meta-info rdr)))
0
{:id "PASS" :kw :PASS})
formats (meta->map (:format (.meta-info rdr)))
kws (mapv keyword (drop 8 (.header rdr)))]
(case depth
:deep (comp (partial bcf-map->parsed-variant
contigs filters formats info kws)
parse-data-line-deep)
:vcf (comp (vcf-util/variant-vals-stringifier
(.meta-info rdr)
(.header rdr))
(partial bcf-map->parsed-variant
contigs filters formats info kws)
parse-data-line-deep)
:bcf parse-data-line-deep
:shallow (partial parse-data-line-shallow contigs)
:raw identity))) | |
Returns data lines of the BCF from rdr as a lazy sequence of maps. rdr must implement cljam.bcf.BCFReader. Can take an option :depth to specify parsing level. Default is :deep. :deep Fully parsed variant map. FORMAT, FILTER, INFO and samples columns are parsed. :vcf VCF-style map. FORMAT, FILTER, INFO and samples columns are strings. :bcf BCF-style map. CHROM, FILTER, INFO and :genotype contains indices to meta-info. :shallow Only CHROM, POS and ref-length are parsed. :raw Raw map of ByteBuffers. | (defn read-variants
([rdr]
(read-variants rdr {}))
([^BCFReader rdr {:keys [depth] :or {depth :deep}}]
(.seek ^BGZFInputStream (.reader rdr) ^long (.start-pos rdr))
(let [info (meta->map (:info (.meta-info rdr)))
parse-fn (make-parse-fn rdr info depth)]
(read-data-lines (.reader rdr)
(fn [rdr] (parse-fn (read-data-line-buffer rdr))))))) |
(defn- make-lazy-variants [f s]
(when-first [fs s]
(lazy-cat
(f fs)
(make-lazy-variants f (rest s))))) | |
Reads variants of the BCF file randomly using csi file. Returns them as a lazy sequence. | (defn read-variants-randomly
[^BCFReader rdr
{:keys [chr ^long start ^long end] :or {start 1 end 4294967296}}
{:keys [depth] :or {depth :deep}}]
(let [info (meta->map (:info (.meta-info rdr)))
parse-fn (make-parse-fn rdr info depth)
input-stream ^BGZFInputStream (.reader rdr)
chr-names (->> (.meta-info rdr) :contig (mapv :id))
ref-idx (.indexOf ^clojure.lang.PersistentVector chr-names chr)
csi-data @(.index-delay rdr)
spans (when-not (neg? ref-idx)
(util-bin/get-spans csi-data ref-idx start end))]
(make-lazy-variants
(fn [[chunk-beg ^long chunk-end]]
(.seek input-stream chunk-beg)
(->> #(when (< (.getFilePointer input-stream) chunk-end)
(-> input-stream
read-data-line-buffer
parse-fn))
repeatedly
(take-while identity)
(filter
(fn [{chr' :chr :keys [^long pos info] ref' :ref}]
(and (= chr' chr)
(<= pos end)
(<= start
(long (get info :END (dec (+ pos (count ref')))))))))))
spans))) |
Reads file offsets and a genomic position of variants from BCF and returns them as a lazy sequence. Each element is a map containing :chr, :chr-index, :beg, :end, :file-beg, :file-end. | (defn read-file-offsets
[^BCFReader rdr]
(let [^BGZFInputStream input-stream (.reader rdr)
contigs (->> (:contig (.meta-info rdr))
(map-indexed (fn [index contig] [(:id contig) index]))
(into {}))
parse-fn (make-parse-fn rdr (meta->map (:info (.meta-info rdr))) :shallow)]
(letfn [(step [beg-pointer]
(when (pos? (.available input-stream))
(when-let [line (read-data-line-buffer input-stream)]
(let [end-pointer (.getFilePointer input-stream)
{:keys [chr ^long pos ^long rlen]} (parse-fn line)]
(cons {:file-beg beg-pointer, :file-end end-pointer
:chr-index (contigs chr), :beg pos, :chr chr,
:end (dec (+ pos rlen))}
(lazy-seq (step end-pointer)))))))]
(step (.getFilePointer input-stream))))) |