Source code for modos.genomics.htsget

"""htsget client implementation

The htsget protocol [1]_ allows to stream slices of genomic data from a remote server.
The client is implemented as a file-like interface that lazily streams chunks from the server.

In practice, the client sends a request for a file with a specific format and genomic region.
The htsget server finds the byte ranges on the data server (e.g. S3) corresponding to the requests
and responds with a "ticket".

The ticket is a json document containing a list of blocks; each having headers and a URL pointing to_file
the corresponding byte ranges on the data server.

The client then streams data from these URLs, effectively concatenating the blocks into a single stream.


.. figure:: http://samtools.github.io/hts-specs/pub/htsget-ticket.png
   :width: 66%
   :alt: htsget mechanism diagram

   Illustration of the mechanism through which the htsget server allows streaming and random-access on genomic files. See [1]_ for more details.


Notes
-----

This implementation differs from the reference GA4GH implementation [2]_ in that it allows lazily consuming chunks from a file-like interface without saving to a file. A downside of this approach is that the client cannot seek.

Additionally, this implementation does not support asynchronous fetching of blocks, which means that blocks are fetched sequentially.

References
----------

.. [1] http://samtools.github.io/hts-specs/htsget.html
.. [2] https://github.com/ga4gh/htsget
"""

import base64
from collections import deque
from functools import cached_property
import io
from pathlib import Path
import re
import tempfile
from typing import Optional, Iterator
from urllib.parse import urlparse, parse_qs

from pydantic import HttpUrl, validate_call
from pydantic.dataclasses import dataclass
import pysam
import requests

from modos.genomics.region import Region
from modos.genomics.formats import GenomicFileSuffix, read_pysam


@validate_call
[docs] def build_htsget_url( host: HttpUrl, path: Path, region: Optional[Region] ) -> str: """Build an htsget URL from a host, path, and region. Examples -------- >>> build_htsget_url( ... "http://localhost:8000", ... Path("file.bam"), ... Region("chr1", 0, 1000) ... ) 'http://localhost:8000/reads/file?format=BAM&referenceName=chr1&start=0&end=1000' """ format = GenomicFileSuffix.from_path(path) endpoint = format.to_htsget_endpoint() # remove .gz suffix if present stem = path.with_suffix("") if path.name.endswith("gz") else path stem = stem.with_suffix("") netloc = host if str(host).endswith("/") else f"{host}/" url = f"{netloc}{endpoint}/{stem}?format={format.name}" if region: url += f"&{region.to_htsget_query()}" return url
@validate_call
[docs] def parse_htsget_url(url: HttpUrl) -> tuple[str, Path, Optional[Region]]: """Given a URL to an htsget resource, extract the host, path, and region.""" parsed = urlparse(str(url)) query = parse_qs(parsed.query) if "format" not in query: raise ValueError("Missing format in htsget URL") format = GenomicFileSuffix[query["format"][0]] endpoint = format.to_htsget_endpoint() pre_endpoint = re.sub(rf"/{endpoint}.*", r"", parsed.path) host = f"{parsed.scheme}://{parsed.netloc}{pre_endpoint}" path = Path(re.sub(rf"^.*{endpoint}", r"", parsed.path)).with_suffix( f".{format.name.lower()}" ) try: region = Region.from_htsget_query(str(url)) except KeyError: region = None return (host, path, region)
[docs] class _HtsgetBlockIter: """Transparent iterator over blocks of an htsget stream. This is used internally by HtsgetStream to lazily fetch and concatenate blocks. Examples -------- >>> next(_HtsgetBlockIter([ ... {"url": "data:;base64,MTIzNDU2Nzg5"}, ... {"url": "data:;base64,MTIzNDU2Nzg5"}, ... ])) b'123456789' """ def __init__(self, blocks: list[dict], chunk_size=65536, timeout=60): # the queue of block is consumed in order of appearance
[docs] self._blocks = deque(blocks)
[docs] self._source = self._consume_block()
[docs] self.chunk_size = chunk_size
[docs] self.timeout = timeout
[docs] def __iter__(self): return self
[docs] def _consume_block(self) -> Iterator[bytes]: """Get streaming iterator over current block.""" curr_block = self._blocks.popleft() parsed = urlparse(curr_block["url"]) match parsed.scheme: # http url -> fetch from data server case "http" | "https": chunks = requests.get( curr_block["url"], headers=curr_block.get("headers"), stream=True, timeout=self.timeout, ).iter_content(chunk_size=self.chunk_size) for chunk in chunks: yield chunk # data uri -> content directly in ticket case "data": split = parsed.path.split(",", 1) data = base64.b64decode(split[1]) yield data case _: raise ValueError(f"Unsupported scheme: {parsed.scheme}")
[docs] def __next__(self) -> bytes: """ Stream next chunk of current block, or first chunk of next block. """ # Iterate over current block try: return next(self._source) # End of current block except StopIteration: # remaining blocks -> move to next block try: self._source = self._consume_block() return self.__next__() # last block -> end of stream except IndexError: raise StopIteration
[docs] class HtsgetStream(io.RawIOBase): """A file-like handle to a read-only, buffered htsget stream. Examples -------- >>> stream = HtsgetStream([ ... {"url": "data:;base64,MTIzNDU2Nzg5Cg=="}, ... {"url": "data:;base64,MTIzNDU2Nzg5Cg=="}, ... ]) >>> stream.read(4) b'1234' """ def __init__(self, blocks: list[dict]):
[docs] self._iterator = _HtsgetBlockIter(blocks)
[docs] self._leftover = b""
[docs] def readable(self) -> bool: return True
[docs] def readinto(self, b) -> int: """ Read up to len(b) bytes into a writable buffer bytes and return the number of bytes read. Notes ----- See https://docs.python.org/3/library/io.html#io.RawIOBase.readinto """ try: l = len(b) # We return at most this much while True: chunk = self._leftover or next(self._iterator) # skip empty elements if not chunk: continue # fill buffer and keep any leftover for next chunk output, self._leftover = chunk[:l], chunk[l:] b[: len(output)] = output return len(output) except StopIteration: return 0 # indicate EOF
@dataclass
[docs] class HtsgetConnection: """Connection to an htsget resource. It allows to open a stream to the resource and lazily fetch data from it. """
[docs] host: HttpUrl
[docs] path: Path
[docs] region: Optional[Region]
@property
[docs] def url(self) -> str: """URL to fetch the ticket.""" return build_htsget_url(self.host, Path(self.path), self.region)
@cached_property
[docs] def ticket(self) -> dict: """Ticket containing the URLs to fetch the data.""" return requests.get(self.url).json()
[docs] def open(self) -> io.RawIOBase: """Open a connection to the stream data.""" try: return HtsgetStream(self.ticket["htsget"]["urls"]) except KeyError: raise KeyError(f"No htsget urls found in ticket: {self.ticket}")
[docs] def to_file(self, path: Path): """Save all data from the stream to a file.""" with self.open() as source, open(path, "wb") as sink: for block in source: sink.write(block)
@classmethod
[docs] def from_url(cls, url: str): """Open connection directly from an htsget URL.""" host, path, region = parse_htsget_url(url) return cls(host, path, region=region)
[docs] def to_pysam( self, reference_filename: Optional[str] = None ) -> Iterator[pysam.AlignedSegment | pysam.VariantRecord]: """Convert the stream to a pysam object.""" # NOTE: pysam needs a path or file descriptor, # we have to stream from drive until this is addressed: # ref: https://github.com/pysam-developers/pysam/blob/0787ca9da997b5911c00fd12584dad9741c82fb4/pysam/libcalignmentfile.pyx#L855 # TODO: when above addressed, replace temporary file with # self.open() to stream directly from in-memory buffer. buffer = tempfile.NamedTemporaryFile( "w+b", delete=False, suffix=self.path.suffix ).name self.to_file(Path(buffer)) buffer = read_pysam( Path(buffer), reference_filename=reference_filename ) for record in buffer: if self.region is None: yield record continue # htsget includes all returns in the bgzf block # we filter out records outside requested region record_region = Region.from_pysam(record) if not record_region.overlaps(self.region): continue yield record