Reading from a stream

Ali Ghaffaari edited this page Apr 24, 2023 · 12 revisions

High-level method parse

The following example reads a file or stream containing multiple protobuf messages with the parse method. Here the input file contains Alignment messages, defined in vg.proto; Alignment only serves as an example. The input can hold messages of any other protobuf type, as long as they all share the same type, and that type's class is passed as the second argument to parse:

import stream
import vg_pb2  # or any other compiled protobuf module

# parse by file name
for message in stream.parse('test.gam', vg_pb2.Alignment):
    # work with `message`
    ...

# parse by file object (`f` is an already opened binary file-like object)
for message in stream.parse(f, vg_pb2.Alignment):
    # work with `message`
    ...

NOTE for vg users

The new version of the vg stream library, now part of libvgio, writes a header tag at the start of the stream, depending on the output format. For example, the headers b'GAM' and b'VG' precede the actual protobuf messages in GAM and VG files, respectively. To make the examples above work with such files, pass the expected value through the header keyword argument (introduced in pyStream v1.6.2); e.g. stream.parse('test.gam', vg_pb2.Alignment, header=b'GAM') for GAM files.

In the vg family of file formats, header tags may also appear between protobuf messages, not just at the start of the stream. For example, a VG file (a stream of vg_pb2.Graph messages) may look like:

┌───────┬───────┬─────────────────────────────────────┬───────┬─────┐
│ b'VG' │ count │ vg_pb2.Graph.SerializeToString()... │ b'VG' │ ... │
└───────┴───────┴─────────────────────────────────────┴───────┴─────┘

Header tags of this kind are called persistent headers here. Since v1.6.4, the parse function expects such tags between vg chunks when the option persistent_header=True is passed. For instance: stream.parse('test.vg', vg_pb2.Graph, header=b'VG', persistent_header=True).
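To make the layout in the diagram concrete, here is a minimal, standard-library-only sketch of a writer and reader for a toy stream of that shape. It is not pyStream's implementation and does not claim to match its wire format: the varint encoding of count and of each message length, the unprefixed tag bytes, and the helper names encode_varint, read_varint, write_group and read_groups are all assumptions made for illustration.

```python
import io


def encode_varint(value):
    """Encode a non-negative int as a base-128 varint (assumed encoding)."""
    out = bytearray()
    while True:
        byte = value & 0x7F
        value >>= 7
        if value:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def read_varint(fp):
    """Read one varint from fp; return None on a clean end of stream."""
    shift = 0
    result = 0
    while True:
        raw = fp.read(1)
        if not raw:
            if shift == 0:
                return None
            raise EOFError("stream ended inside a varint")
        result |= (raw[0] & 0x7F) << shift
        if not (raw[0] & 0x80):
            return result
        shift += 7


def write_group(fp, tag, payloads):
    """Write one group: tag bytes, message count, length-prefixed payloads."""
    fp.write(tag)
    fp.write(encode_varint(len(payloads)))
    for payload in payloads:
        fp.write(encode_varint(len(payload)))
        fp.write(payload)


def read_groups(fp, tag):
    """Yield raw payloads, expecting `tag` in front of every group."""
    while True:
        found = fp.read(len(tag))
        if not found:
            return  # clean end of stream
        if found != tag:
            raise ValueError(f"expected header {tag!r}, found {found!r}")
        count = read_varint(fp)
        if count is None:
            raise EOFError("stream ended right after a header tag")
        for _ in range(count):
            size = read_varint(fp)
            if size is None:
                raise EOFError("stream ended before a message length")
            payload = fp.read(size)
            if len(payload) != size:
                raise EOFError("stream ended inside a message")
            yield payload


# Two groups, each preceded by the same (persistent) tag.
buffer = io.BytesIO()
write_group(buffer, b"VG", [b"graph-1", b"graph-2"])
write_group(buffer, b"VG", [b"graph-3"])
buffer.seek(0)
payloads = list(read_groups(buffer, b"VG"))
```

In this toy format, a reader that checked the tag only once at the start would misread the second b"VG" as a message count, which is the kind of failure persistent_header=True is meant to avoid.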


Low-level method open

The open method opens a stream and returns an iterable Stream object. It mimics the built-in open function. Iterating over a Stream object yields the raw data of each message without parsing it. This is useful when the file or stream contains messages of different types; otherwise, parse is recommended.

import stream
import vg_pb2  # or any other compiled protobuf module

# open by file name
with stream.open('test.gam', 'rb') as istream:
    for data in istream:
        message = vg_pb2.Alignment()
        message.ParseFromString(data)
        # work with message

# open by file object
# NOTE that file-like object `f` is passed as a keyword argument `fileobj`
with stream.open(fileobj=f, mode='rb') as istream:
    for data in istream:
        message = vg_pb2.Alignment()
        message.ParseFromString(data)
        # work with message

Like parse, the open function also accepts the header argument in case the input file contains a header.


NOTE

The stream can be closed explicitly by calling its close method, especially when the Stream is opened without a context manager (with statement).


Header

Since v1.6.2, a stream can have a header at its start. A header is a byte string and can be verified by passing the expected value through the header argument of any API function (parse, open, or the Stream constructor). If the fetched header differs from the expected value, an exception is raised. When the header is not known at the time of reading, it can be fetched with the low-level API (i.e. open).

If the same header occurs between groups of messages rather than only at the start (a persistent header), set the persistent_header option to True when calling the API functions (since v1.6.4).

Optional GZip compression

Streams encoded by the Stream library are GZip-compressed by default. Compression can be disabled by passing gzip=False when opening a stream (this works for both parse and open).
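The idea of an optional decompression layer can be sketched with the standard library alone. The helper open_payload and its use_gzip parameter below are hypothetical names chosen to mirror the gzip option; they are not part of pyStream, and the sample bytes are made up.

```python
import gzip
import io


def open_payload(fileobj, use_gzip=True):
    """Return a readable binary stream, decompressing only when asked."""
    if use_gzip:
        # Wrap the underlying file object in a decompressing reader.
        return gzip.GzipFile(fileobj=fileobj, mode="rb")
    return fileobj


raw = b"GAM" + b"\x01\x05hello"  # arbitrary sample bytes
compressed = io.BytesIO(gzip.compress(raw))
plain = io.BytesIO(raw)

from_gzip = open_payload(compressed).read()
from_plain = open_payload(plain, use_gzip=False).read()
```

Both reads return the same bytes. In this sketch, reading uncompressed data with decompression left on fails with an OSError raised by the gzip module, so a mismatch between the file and the flag shows up immediately rather than as corrupted messages.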

Group delimiter

Groups of objects can be separated by a delimiter of your choice (None by default) when reading from a stream. This helps identify where a group ends, which is otherwise transparent to the library user. The feature can be enabled by setting group_delimiter to True when constructing a Stream instance or opening a stream. The delimiter class can also be specified with delimiter_cls.
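The consumer side of this feature can be sketched without the library. The example assumes the reader yields one instance of the delimiter class after the last message of each group; iter_with_delimiter stands in for such a reader and regroup is a hypothetical helper, neither of which is part of pyStream.

```python
def iter_with_delimiter(groups, delimiter_cls=type(None)):
    """Yield every item, then one delimiter_cls() instance after each group."""
    for group in groups:
        yield from group
        yield delimiter_cls()  # type(None)() evaluates to None


def regroup(items, delimiter_cls=type(None)):
    """Rebuild the groups from a flat, delimited sequence of items."""
    current = []
    for item in items:
        if isinstance(item, delimiter_cls):
            yield current
            current = []
        else:
            current.append(item)


flat = list(iter_with_delimiter([[b"a", b"b"], [b"c"]]))
groups = list(regroup(flat))
```

Using a dedicated delimiter class instead of None makes the isinstance check unambiguous when None could also be a legitimate item.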
