Skip to content

Commit

Permalink
allow LZW CRINEX, use magic number for loader
Browse files Browse the repository at this point in the history
  • Loading branch information
scivision committed Nov 15, 2023
1 parent 3186b59 commit ab46fc3
Showing 1 changed file with 28 additions and 7 deletions.
35 changes: 28 additions & 7 deletions src/georinex/rio.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,28 +32,41 @@ def opener(fn: T.TextIO | Path, header: bool = False) -> T.Iterator[T.TextIO]:
if finf.st_size > 100e6:
logging.info(f"opening {finf.st_size/1e6} MByte {fn.name}")

# %% get magic number
"""https://en.wikipedia.org/wiki/List_of_file_signatures"""
with fn.open("rb") as fid:
magic = fid.read(4)

suffix = fn.suffix.lower()

if suffix == ".gz":
if suffix == ".gz" or magic.startswith(b"\x1f\x8b"):
with gzip.open(fn, "rt") as f:
_, is_crinex = rinex_version(first_nonblank_line(f))
f.seek(0)

if is_crinex and not header:
# Conversion to string is necessary because of a quirk where gzip.open()
# even with 'rt' doesn't decompress until read.
"""
gzip compressed CRINEX
Conversion to string is necessary because of a quirk where gzip.open()
even with 'rt' doesn't decompress until read.
"""
f = io.StringIO(crx2rnx(f.read()))
yield f
elif suffix == ".bz2":
# this is for plain bzip2 files, NOT tar.bz2, which requires f.seek(512)
elif suffix == ".bz2" or magic.startswith(b"\x42\x5a\x68"):
"""
plain bzip2 files, NOT tar.bz2, which requires f.seek(512)
"""
with bz2.open(fn, "rt") as f:
_, is_crinex = rinex_version(first_nonblank_line(f))
f.seek(0)

if is_crinex and not header:
"""
bzip2 compressed CRINEX
"""
f = io.StringIO(crx2rnx(f.read()))
yield f
elif suffix == ".zip":
elif suffix == ".zip" or magic.startswith(b"\x50\x4b"):
with zipfile.ZipFile(fn, "r") as z:
flist = z.namelist()
for rinexfn in flist:
Expand All @@ -62,9 +75,17 @@ def opener(fn: T.TextIO | Path, header: bool = False) -> T.Iterator[T.TextIO]:
io.TextIOWrapper(bf, encoding="ascii", errors="ignore").read() # type: ignore
)
yield f
elif suffix == ".z":
elif suffix == ".z" or magic.startswith(b"\x1f\x9d"):
with fn.open("rb") as zu:
with io.StringIO(unlzw(zu.read()).decode("ascii")) as f:
_, is_crinex = rinex_version(first_nonblank_line(f))
f.seek(0)

if is_crinex and not header:
"""
LZW compressed CRINEX
"""
f = io.StringIO(crx2rnx(f.read()))
yield f
else: # assume not compressed (or Hatanaka)
with fn.open("r", encoding="ascii", errors="ignore") as f:
Expand Down

0 comments on commit ab46fc3

Please sign in to comment.