(ns cljam.io.bcf.reader
  (:require [clojure.string :as cstr]
            [clojure.tools.logging :as logging]
            [cljam.io.protocols :as protocols]
            [cljam.io.util.bgzf :as bgzf]
            [cljam.io.util.byte-buffer :as bb]
            [cljam.io.util.lsb.io-stream :as lsb]
            [cljam.io.vcf.reader :as vcf-reader]
            [cljam.io.vcf.util :as vcf-util]
            [cljam.util :as util]
            [cljam.io.util.bin :as util-bin]
            [cljam.io.csi :as csi])
  (:import [java.io Closeable IOException FileNotFoundException]
           [java.net URL]
           [java.nio Buffer ByteBuffer]
           [bgzf4j BGZFInputStream])
  (:refer-clojure :exclude [read]))
(declare read-variants meta-info read-variants-randomly read-file-offsets)
(deftype BCFReader
         [^URL url meta-info header ^BGZFInputStream reader
          ^long start-pos index-delay]
  Closeable
  (close [this]
    (.close ^Closeable (.reader this)))
  protocols/IReader
  (reader-url [this] (.url this))
  (read [this] (protocols/read this {}))
  (read [this option] (read-variants this option))
  (indexed? [_]
    (try
      @index-delay
      true
      (catch FileNotFoundException _
        false)))
  protocols/IRegionReader
  (read-in-region [this region]
    (protocols/read-in-region this region {}))
  (read-in-region [this {:keys [chr ^long start ^long end]} option]
    (logging/warn "May cause degradation of performance.")
    (filter
     (fn [{^long v-pos :pos
           v-chr :chr
           v-ref :ref}]
       (and (if chr (= v-chr chr) true)
            (if start (<= start v-pos) true)
            (if end (<= (+ v-pos (count v-ref)) end) true)))
     (read-variants this option))))

need dynamic extension for namespace issue.

(extend-type BCFReader
  protocols/IVariantReader
  (meta-info [this] (meta-info this))
  (header [this] (.header this))
  (read-variants
    ([this] (protocols/read-variants this {}))
    ([this option] (read-variants this option)))
  (read-variants-randomly [this region-option deep-option]
    (read-variants-randomly this region-option deep-option))
  (read-file-offsets [this]
    (read-file-offsets this)))

Parses meta-info and header of BCF files and returns them as a map. lines must be a sequence of strings.

(defn- parse-meta-and-header
  [lines]
  (let [{metas "##"
         headers "#"} (group-by #(re-find #"^\#+" %) lines)]
    {:meta (transduce
            (map vcf-reader/parse-meta-info-line)
            (completing
             (fn [meta-info' [k v]]
               (if (#{:contig :info :filter :format :alt :sample :pedigree} k)
                 (update meta-info' k (fnil conj []) v)
                 (assoc meta-info' k v))))
            {}
            metas)
     :header (first (map vcf-reader/parse-header-line headers))}))

Returns an open cljam.bcf.reader.BCFReader of f. Should be used inside with-open to ensure the Reader is properly closed. Throws IOException if failed to parse BCF file format.

(defn reader
  ^BCFReader
  [f]
  (let [rdr (bgzf/bgzf-input-stream f)
        magic (lsb/read-bytes rdr 5)]
    (if (= (seq magic) (map byte "BCF\2\2"))
      (let [hlen (int (lsb/read-int rdr))
            header-buf (lsb/read-bytes rdr hlen)]
        (if (zero? (aget ^bytes header-buf (dec hlen))) ;; NULL-terminated
          (let [{:keys [header] meta-info :meta} (->> (String. ^bytes header-buf
                                                               0
                                                               (int (dec hlen)))
                                                      cstr/split-lines
                                                      parse-meta-and-header)]
            (BCFReader. (util/as-url f) meta-info header rdr (.getFilePointer rdr)
                        (delay (csi/read-index (str f ".csi")))))
          (do
            (.close rdr)
            (throw
             (IOException.
              "Invalid file format. BCF header must be NULL-terminated.")))))
      (do
        (.close rdr)
        (throw
         (IOException.
          (str "Invalid file format. BCF v2.2 magic not found in " f)))))))

Reads an atomic value, which is typed as either integer(8,16,32 bit) or float or character.

(defn- read-typed-atomic-value
  [^ByteBuffer bb ^long type-id]
  (case type-id
    1 (let [i (.get bb)]
        (case (bit-and 0xFF i) 0x80 nil 0x81 :eov i))
    2 (let [i (.getShort bb)]
        (case (bit-and 0xFFFF i) 0x8000 nil 0x8001 :eov i))
    3 (let [i (.getInt bb)]
        (case (bit-and 0xFFFFFFFF i) 0x80000000 nil 0x80000001 :eov i))
    5 (let [i (.getInt bb)]
        (case (bit-and 0xFFFFFFFF i) 0x7F800001 nil 0x7F800002
              :eov (Float/intBitsToFloat i)))
    7 (.get bb)))
(defn- bytes->strs
  [ba]
  (map (fn [^String s]
         (let [z (.indexOf s (int 0))
               s' (cond-> s (not (neg? z)) (subs 0 z))]
           (when-not (= s' ".") (not-empty s'))))
       (cstr/split (String. (byte-array ba)) #",")))

Reads typed value from byte buffer. n-sample is a number of values repeated after type specifier byte.

(defn- read-typed-value
  ([bb]
   (first (read-typed-value bb 1)))
  ([^ByteBuffer bb ^long n-sample]
   (let [type-byte (int (.get bb))
         len (unsigned-bit-shift-right (bit-and 0xF0 type-byte) 4)
         total-len (if (= len 15) (long (first (read-typed-value bb))) len)
         type-id (bit-and 0x0F type-byte)]
     (case type-id
       0 (repeat n-sample nil)
       7  (mapv (fn [_] (bytes->strs (bb/read-bytes bb total-len)))
                (range n-sample))
       (into []
             (comp (map (fn [_] (read-typed-atomic-value bb type-id)))
                   (partition-all total-len)
                   (map (fn [xs] (take-while #(not= % :eov) xs))))
             (range (* n-sample total-len)))))))

Reads a key-value pair.

(defn- read-typed-kv
  ([bb]
   (let [[k [v]] (read-typed-kv bb 1)]
     [k v]))
  ([bb n-sample]
   [(first (read-typed-value bb)) (read-typed-value bb n-sample)]))

Reads a single record of variant and store to the ByteBuffer objects.

(defn- read-data-line-buffer
  [rdr]
  (let [l-shared (lsb/read-uint rdr)
        l-indv (lsb/read-uint rdr)
        shared-bb (bb/allocate-lsb-byte-buffer l-shared)
        indv-bb (bb/allocate-lsb-byte-buffer l-indv)]
    (lsb/read-bytes rdr (.array shared-bb) 0 l-shared)
    (lsb/read-bytes rdr (.array indv-bb) 0 l-indv)
    {:l-shared l-shared
     :l-indv l-indv
     :shared shared-bb
     :individual indv-bb}))

Parses only chromosome, position and ref-length. Can be used for position-based querying.

(defn- parse-data-line-shallow
  [contigs {:keys [^ByteBuffer shared] :as m}]
  (let [chrom-id (.getInt shared)
        pos (inc (.getInt shared))
        rlen (.getInt shared)]
    (.position ^Buffer shared 0)
    (assoc m :chr (:id (contigs chrom-id)) :pos pos :rlen rlen)))

Parses full data of a variant. Returns a map containing indices for meta-info.

(defn- parse-data-line-deep
  [{:keys [^ByteBuffer shared ^ByteBuffer individual]}]
  (let [chrom-id (.getInt shared)
        pos (inc (.getInt shared))
        rlen (.getInt shared)
        qual (.getInt shared)
        n-allele-info (.getInt shared)
        n-allele (unsigned-bit-shift-right n-allele-info 16)
        n-info (bit-and n-allele-info 0xFFFF)
        n-fmt-sample (long (bb/read-uint shared))
        n-fmt (bit-and 0xFF (unsigned-bit-shift-right n-fmt-sample 24))
        n-sample (bit-and n-fmt-sample 0xFFFFFF)
        id (let [i (read-typed-value shared)] (if (sequential? i) (first i) i))
        refseq (read-typed-value shared)
        altseq (doall (repeatedly (dec n-allele) #(read-typed-value shared)))
        flter (read-typed-value shared)
        info (doall (repeatedly n-info #(read-typed-kv shared)))
        genotype (doall (repeatedly n-fmt #(read-typed-kv individual n-sample)))]
    {:chr chrom-id
     :pos pos
     :ref-length rlen
     :qual (when-not (= qual 0x7F800001) (Float/intBitsToFloat qual))
     :id id
     :ref (first refseq)
     :alt (seq (map first altseq))
     :filter flter
     :info info
     :genotype genotype
     :n-sample n-sample}))
(defn- fixup-val [{kw :kw t :type n :number} v]
  (cond->> v
    (= kw :GT) vcf-util/ints->genotype
    (= t "Flag") ((constantly :exists))
    (= t "Character") (map first)
    (and (= n 1) (not= kw :GT)) first))

Converts a BCF-style variant map to parsed variant using meta-info.

(defn- bcf-map->parsed-variant
  [contigs filters formats info
   [fmt-kw & indiv-kws] {:keys [genotype n-sample] :as variant}]
  (let [->gt-kv (fn [i [k vs]]
                  (let [{:keys [kw] :as f} (formats k)
                        v (nth vs i nil)]
                    [kw (when-not (or (nil? v) (= [nil] v))
                          (fixup-val f v))]))
        ->info-kv (fn [[k v]]
                    (let [{:keys [kw] :as i} (info k)]
                      [kw (fixup-val i v)]))
        indiv (map (fn [i] (not-empty (into {} (map #(->gt-kv i %)) genotype)))
                   (range n-sample))
        v (-> (dissoc variant :genotype :ref-length :n-sample)
              (update :chr (comp :id contigs))
              (update :filter #(not-empty (map (comp :kw filters) %)))
              (update :info #(not-empty (into {} (map ->info-kv) %))))]
    (cond-> v
      fmt-kw (assoc fmt-kw (not-empty (map (comp :kw formats first) genotype)))
      indiv-kws (merge (zipmap indiv-kws indiv)))))

Reads data from BCF file and returns them as a lazy-sequence of maps.

(defn- read-data-lines
  [^BGZFInputStream rdr read-fn]
  (when (pos? (.available rdr))
    (let [data (read-fn rdr)]
      (cons data (lazy-seq (read-data-lines rdr read-fn))))))

Creates a map for searching meta-info with indices.

(defn- meta->map
  [meta-info]
  (into {} (map (fn [m] [(Integer/parseInt (:idx m)) (assoc m :kw (keyword (:id m)))])) meta-info))

Returns meta-information of the BCF from rdr as a map.

(defn meta-info
  [^BCFReader rdr]
  (-> (.meta-info rdr)
      (update :contig (fn [xs] (map (fn [m] (dissoc m :idx)) xs)))
      (update :filter (fn [xs] (keep (fn [m] (when-not (= (:id m) "PASS") (dissoc m :idx))) xs)))
      (update :info (fn [xs] (map (fn [m] (dissoc m :idx)) xs)))
      (update :format (fn [xs] (map (fn [m] (dissoc m :idx)) xs)))))
(defn- make-parse-fn [^BCFReader rdr info depth]
  (let [contigs (meta->map (:contig (.meta-info rdr)))
        filters (assoc (meta->map (:filter (.meta-info rdr)))
                       0
                       {:id "PASS" :kw :PASS})
        formats (meta->map (:format (.meta-info rdr)))
        kws (mapv keyword (drop 8 (.header rdr)))]
    (case depth
      :deep (comp (partial bcf-map->parsed-variant
                           contigs filters formats info kws)
                  parse-data-line-deep)
      :vcf (comp (vcf-util/variant-vals-stringifier
                  (.meta-info rdr)
                  (.header rdr))
                 (partial bcf-map->parsed-variant
                          contigs filters formats info kws)
                 parse-data-line-deep)
      :bcf parse-data-line-deep
      :shallow (partial parse-data-line-shallow contigs)
      :raw identity)))

Returns data lines of the BCF from rdr as a lazy sequence of maps. rdr must implement cljam.bcf.BCFReader. Can take an option :depth to specify parsing level. Default is :deep.

:deep Fully parsed variant map. FORMAT, FILTER, INFO and samples columns are parsed. :vcf VCF-style map. FORMAT, FILTER, INFO and samples columns are strings. :bcf BCF-style map. CHROM, FILTER, INFO and :genotype contains indices to meta-info. :shallow Only CHROM, POS and ref-length are parsed. :raw Raw map of ByteBuffers.

(defn read-variants
  ([rdr]
   (read-variants rdr {}))
  ([^BCFReader rdr {:keys [depth] :or {depth :deep}}]
   (.seek ^BGZFInputStream (.reader rdr) ^long (.start-pos rdr))
   (let [info (meta->map (:info (.meta-info rdr)))
         parse-fn  (make-parse-fn rdr info depth)]
     (read-data-lines (.reader rdr)
                      (fn [rdr] (parse-fn (read-data-line-buffer rdr)))))))
(defn- make-lazy-variants [f s]
  (when-first [fs s]
    (lazy-cat
     (f fs)
     (make-lazy-variants f (rest s)))))

Reads variants of the BCF file randomly using csi file. Returns them as a lazy sequence.

(defn read-variants-randomly
  [^BCFReader rdr
   {:keys [chr ^long start ^long end] :or {start 1 end 4294967296}}
   {:keys [depth] :or {depth :deep}}]
  (let [info (meta->map (:info (.meta-info rdr)))
        parse-fn  (make-parse-fn rdr info depth)
        input-stream ^BGZFInputStream (.reader rdr)
        chr-names (->> (.meta-info rdr) :contig (mapv :id))
        ref-idx (.indexOf ^clojure.lang.PersistentVector chr-names chr)
        csi-data @(.index-delay rdr)
        spans (when-not (neg? ref-idx)
                (util-bin/get-spans csi-data ref-idx start end))]
    (make-lazy-variants
     (fn [[chunk-beg ^long chunk-end]]
       (.seek input-stream chunk-beg)
       (->> #(when (< (.getFilePointer input-stream) chunk-end)
               (-> input-stream
                   read-data-line-buffer
                   parse-fn))
            repeatedly
            (take-while identity)
            (filter
             (fn [{chr' :chr :keys [^long pos info] ref' :ref}]
               (and (= chr' chr)
                    (<= pos end)
                    (<= start
                        (long (get info :END (dec (+ pos (count ref')))))))))))
     spans)))

Reads file offsets and a genomic position of variants from BCF and returns them as a lazy sequence. Each element is a map containing :chr, :chr-index, :beg, :end, :file-beg, :file-end.

(defn read-file-offsets
  [^BCFReader rdr]
  (let [^BGZFInputStream input-stream (.reader rdr)
        contigs (->> (:contig (.meta-info rdr))
                     (map-indexed (fn [index contig] [(:id contig) index]))
                     (into {}))
        parse-fn (make-parse-fn rdr (meta->map (:info (.meta-info rdr))) :shallow)]
    (letfn [(step [beg-pointer]
              (when (pos? (.available input-stream))
                (when-let [line (read-data-line-buffer input-stream)]
                  (let [end-pointer (.getFilePointer input-stream)
                        {:keys [chr ^long pos ^long rlen]} (parse-fn line)]
                    (cons {:file-beg beg-pointer, :file-end end-pointer
                           :chr-index (contigs chr), :beg pos, :chr chr,
                           :end (dec (+ pos rlen))}
                          (lazy-seq (step end-pointer)))))))]
      (step (.getFilePointer input-stream)))))