Skip to content

Commit

Permalink
Pull request #1: ECC-1415 Add a high-level interface
Browse files Browse the repository at this point in the history
Merge in ECCODES/eccodes-python from feature/highlevel to develop

* commit '142b2a19caa9ebe77d776aab20eccf15c22c1b9c': (29 commits)
  Allow requesting the keys as "name:type"
  Add more unit tests
  Add GRIBMessage.data
  Handle missing values as non-existent
  Rename Message.get_data to get_data_points
  Add tests for the high-level interface
  Black formatting and isort
  Add dict-like interface to Message
  Translate eccodes.KeyNotFoundError to KeyError
  Run flake8 (v3.9.2)
  Run isort (sort imports)
  Black formatting
  Support getting alternative key representations
  Split out GRIB-specific features into GRIBMessage
  Rename Message.handle to Message._handle
  Add Message.dump()
  Support arrays in Message.get
  Make Message.__del__ immune to teardown errors
  Add Message.__copy__
  Expose high-level interface classes in the eccodes package
  ...
  • Loading branch information
oiffrig authored and shahramn committed Jun 23, 2022
2 parents 9f285c2 + 142b2a1 commit 145976e
Show file tree
Hide file tree
Showing 5 changed files with 425 additions and 0 deletions.
1 change: 1 addition & 0 deletions eccodes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@
#

from .eccodes import * # noqa
from .highlevel import * # noqa
2 changes: 2 additions & 0 deletions eccodes/highlevel/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .message import GRIBMessage, Message # noqa
from .reader import FileReader, MemoryReader, StreamReader # noqa
166 changes: 166 additions & 0 deletions eccodes/highlevel/message.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
import io
from contextlib import contextmanager

import eccodes


_TYPES_MAP = {
"float": float,
"int": int,
"str": str,
}


@contextmanager
def raise_keyerror(name):
"""Make operations on a key raise a KeyError if not found"""
try:
yield
except eccodes.KeyValueNotFoundError:
raise KeyError(name)


class Message:
def __init__(self, handle):
self._handle = handle

def __del__(self):
try:
eccodes.codes_release(self._handle)
except Exception:
pass

def copy(self):
return Message(eccodes.codes_clone(self._handle))

def __copy__(self):
return self.copy()

def _get(self, name, ktype=None):
name, sep, stype = name.partition(":")
if sep and ktype is None:
try:
ktype = _TYPES_MAP[stype]
except KeyError:
raise ValueError(f"Unknown key type {stype!r}")
with raise_keyerror(name):
if eccodes.codes_is_missing(self._handle, name):
raise KeyError(name)
if eccodes.codes_get_size(self._handle, name) > 1:
return eccodes.codes_get_array(self._handle, name, ktype=ktype)
return eccodes.codes_get(self._handle, name, ktype=ktype)

def get(self, name, default=None, ktype=None):
try:
return self._get(name, ktype=ktype)
except KeyError:
return default

def set(self, name, value):
with raise_keyerror(name):
return eccodes.codes_set(self._handle, name, value)

def get_array(self, name):
with raise_keyerror(name):
return eccodes.codes_get_array(self._handle, name)

def get_size(self, name):
with raise_keyerror(name):
return eccodes.codes_get_size(self._handle, name)

def get_data_points(self):
raise NotImplementedError

def is_missing(self, name):
with raise_keyerror(name):
return bool(eccodes.codes_is_missing(self._handle, name))

def set_array(self, name, value):
with raise_keyerror(name):
return eccodes.codes_set_array(self._handle, name, value)

def set_missing(self, name):
with raise_keyerror(name):
return eccodes.codes_set_missing(self._handle, name)

def __getitem__(self, name):
return self._get(name)

def __setitem__(self, name, value):
self.set(name, value)

def __contains__(self, name):
return bool(eccodes.codes_is_defined(self._handle, name))

class _KeyIterator:
def __init__(self, message, namespace=None, iter_keys=True, iter_values=False):
self._message = message
self._iterator = eccodes.codes_keys_iterator_new(message._handle, namespace)
self._iter_keys = iter_keys
self._iter_values = iter_values

def __del__(self):
try:
eccodes.codes_keys_iterator_delete(self._iterator)
except Exception:
pass

def __iter__(self):
return self

def __next__(self):
while True:
if not eccodes.codes_keys_iterator_next(self._iterator):
raise StopIteration
if not self._iter_keys and not self._iter_values:
return
key = eccodes.codes_keys_iterator_get_name(self._iterator)
if self._message.is_missing(key):
continue
if self._iter_keys and not self._iter_values:
return key
value = self._message.get(key) if self._iter_values else None
if not self._iter_keys:
return value
return key, value

def __iter__(self):
return self._KeyIterator(self)

def keys(self, namespace=None):
return self._KeyIterator(self, namespace, iter_keys=True, iter_values=False)

def values(self, namespace=None):
return self._KeyIterator(self, namespace, iter_keys=False, iter_values=True)

def items(self, namespace=None):
return self._KeyIterator(self, namespace, iter_keys=True, iter_values=True)

def dump(self):
eccodes.codes_dump(self._handle)

def write_to(self, fileobj):
assert isinstance(fileobj, io.IOBase)
eccodes.codes_write(self._handle, fileobj)

def get_buffer(self):
return eccodes.codes_get_message(self._handle)


class GRIBMessage(Message):
def __init__(self, handle):
super().__init__(handle)
self._data = None

@property
def data(self):
if self._data is None:
self._data = self._get("values")
return self._data

def get_data_points(self):
return eccodes.codes_grib_get_data(self._handle)

@classmethod
def from_samples(cls, name):
return cls(eccodes.codes_grib_new_from_samples(name))
113 changes: 113 additions & 0 deletions eccodes/highlevel/reader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import eccodes
import gribapi
from gribapi import ffi

from .message import GRIBMessage


class ReaderBase:
def __init__(self):
self._peeked = None

def __iter__(self):
return self

def __next__(self):
if self._peeked is not None:
msg = self._peeked
self._peeked = None
return msg
handle = self._next_handle()
if handle is None:
raise StopIteration
return GRIBMessage(handle)

def _next_handle(self):
raise NotImplementedError

def __enter__(self):
return self

def __exit__(self, exc_type, exc_value, traceback):
pass

def peek(self):
if self._peeked is None:
handle = self._next_handle()
if handle is not None:
self._peeked = GRIBMessage(handle)
return self._peeked


class FileReader(ReaderBase):
def __init__(self, path):
super().__init__()
self.file = open(path, "rb")

def _next_handle(self):
return eccodes.codes_new_from_file(self.file, eccodes.CODES_PRODUCT_GRIB)

def __enter__(self):
self.file.__enter__()
return self

def __exit__(self, exc_type, exc_value, traceback):
return self.file.__exit__(exc_type, exc_value, traceback)


class MemoryReader(ReaderBase):
def __init__(self, buf):
super().__init__()
self.buf = buf

def _next_handle(self):
if self.buf is None:
return None
handle = eccodes.codes_new_from_message(self.buf)
self.buf = None
return handle


@ffi.callback("long(*)(void*, void*, long)")
def pyread_callback(payload, buf, length):
stream = ffi.from_handle(payload)
read = stream.read(length)
n = len(read)
ffi.buffer(buf, length)[:n] = read
return n if n > 0 else -1 # -1 means EOF


cstd = ffi.dlopen(None)
ffi.cdef("void free(void* pointer);")
ffi.cdef(
"void* wmo_read_any_from_stream_malloc(void*, long (*stream_proc)(void*, void*, long), size_t*, int*);"
)


def codes_new_from_stream(stream):
sh = ffi.new_handle(stream)
length = ffi.new("size_t*")
err = ffi.new("int*")
err, buf = gribapi.err_last(gribapi.lib.wmo_read_any_from_stream_malloc)(
sh, pyread_callback, length
)
buf = ffi.gc(buf, cstd.free, size=length[0])
if err:
if err != gribapi.lib.GRIB_END_OF_FILE:
gribapi.GRIB_CHECK(err)
return None

# TODO: remove the extra copy?
handle = gribapi.lib.grib_handle_new_from_message_copy(ffi.NULL, buf, length[0])
if handle == ffi.NULL:
return None
else:
return gribapi.put_handle(handle)


class StreamReader(ReaderBase):
def __init__(self, stream):
self.stream = stream

def _next_handle(self):
return codes_new_from_stream(self.stream)
Loading

0 comments on commit 145976e

Please sign in to comment.