SUP-1861: Add high-level GRIB interface

This commit is contained in:
Daniel Lee 2016-12-14 10:53:20 +01:00
parent 04c511fafa
commit ef2eddf009
10 changed files with 545 additions and 1 deletions

View File

@ -22,6 +22,7 @@ index.grib
grid_ieee.grib grid_ieee.grib
jpeg.grib2 jpeg.grib2
latlon.grib latlon.grib
high_level_api.grib2
lfpw.grib1 lfpw.grib1
missing_field.grib1 missing_field.grib1
missing.grib2 missing.grib2

View File

@ -60,6 +60,14 @@ list( APPEND tests
gts_get_keys gts_get_keys
metar_get_keys metar_get_keys
) )
# The high level python test requires new features in the unittest
# which are only there for python 2.7 onwards
if( PYTHON_VERSION_STRING VERSION_GREATER "2.7" )
ecbuild_info("Python examples: Adding test for PythonicGrib")
list( APPEND tests high_level_api )
endif()
foreach( test ${tests} ) foreach( test ${tests} )
ecbuild_add_test( TARGET eccodes_p_${test}_test ecbuild_add_test( TARGET eccodes_p_${test}_test
TYPE SCRIPT TYPE SCRIPT

View File

@ -0,0 +1,153 @@
#!/bin/env python
"""
Unit tests for ``PythonicGrib``.
Author: Daniel Lee, DWD, 2014
"""
import os
from tempfile import NamedTemporaryFile
import unittest
from eccodes import GribFile
from eccodes import GribIndex
from eccodes import GribMessage
from eccodes.high_level.gribmessage import IndexNotSelectedError
TESTGRIB = "../../data/high_level_api.grib2"
TEST_OUTPUT = "test-output.grib"
TEST_INDEX = "test.index"
TEST_KEYS = ("dataDate", "stepRange")
TEST_VALUES = 20110225, 0
SELECTION_DICTIONARY = {}
for i1 in range(len(TEST_KEYS)):
SELECTION_DICTIONARY[TEST_KEYS[i1]] = TEST_VALUES[i1]
TEST_INDEX_OUTPUT = TESTGRIB
TEST_STEPRANGE = ('0', '12', '18', '24', '6')
class TestGribFile(unittest.TestCase):
"""Test GribFile functionality."""
def test_memory_management(self):
"""Messages in GribFile can be opened and closed properly."""
with GribFile(TESTGRIB) as grib:
self.assertEqual(len(grib), 5)
for i in range(len(grib)):
msg = GribMessage(grib)
self.assertEqual(msg["shortName"], "msl")
self.assertEqual(len(grib.open_messages), 5)
self.assertEqual(len(grib.open_messages), 0)
def test_iteration_works(self):
"""The GribFile allows proper iteration over all messages."""
step_ranges = []
with GribFile(TESTGRIB) as grib:
for _ in range(len(grib)):
msg = GribMessage(grib)
step_ranges.append(msg["stepRange"])
self.assertSequenceEqual(step_ranges, ["0", "6", "12", "18", "24"])
def test_iterator_protocol(self):
"""The GribFile allows pythonic iteration over all messages."""
step_ranges = []
with GribFile(TESTGRIB) as grib:
for msg in grib:
step_ranges.append(msg["stepRange"])
self.assertSequenceEqual(step_ranges, ["0", "6", "12", "18", "24"])
def test_read_past_last_message(self):
"""Trying to open message on exhausted GRIB file raises IOError."""
with GribFile(TESTGRIB) as grib:
for _ in range(len(grib)):
GribMessage(grib)
self.assertRaises(IOError, lambda: GribMessage(grib))
def test_read_invalid_file(self):
"""Trying to open message on nonexistent GRIB file raises IOError."""
with NamedTemporaryFile(mode='r') as f:
with GribFile(f.name) as grib:
self.assertRaises(IOError, lambda: GribMessage(grib))
class TestGribMessage(unittest.TestCase):
"""Test GribMessage functionality."""
def test_metadata(self):
"""Metadata is read correctly from GribMessage."""
with GribFile(TESTGRIB) as grib:
msg = GribMessage(grib)
key_count = 251
self.assertEqual(len(msg), key_count)
self.assertEqual(msg.size(), 160219)
self.assertEqual(len(msg.keys()), key_count)
def test_missing_message_behavior(self):
"""Missing messages are detected properly."""
with GribFile(TESTGRIB) as grib:
msg = GribMessage(grib)
self.assertTrue(msg.missing("scaleFactorOfSecondFixedSurface"))
msg["scaleFactorOfSecondFixedSurface"] = 5
msg.set_missing("scaleFactorOfSecondFixedSurface")
with self.assertRaises(KeyError):
msg["scaleFactorOfSecondFixedSurface"]
def test_value_setting(self):
"""Keys can be set properly."""
with GribFile(TESTGRIB) as grib:
msg = GribMessage(grib)
msg["scaleFactorOfSecondFixedSurface"] = 5
msg["values"] = [1, 2, 3]
def test_serialize(self):
"""Message can be serialized to file."""
with GribFile(TESTGRIB) as grib:
msg = GribMessage(grib)
with open(TEST_OUTPUT, "w") as test:
msg.write(test)
os.unlink(TEST_OUTPUT)
def test_clone(self):
"""Messages can be used to produce clone Messages."""
with GribFile(TESTGRIB) as grib:
msg = GribMessage(grib)
msg2 = GribMessage(clone=msg)
self.assertSequenceEqual(msg.keys(), msg2.keys())
class TestGribIndex(unittest.TestCase):
"""Test GribIndex functionality."""
def test_memory_management(self):
"""GribIndex closes GribMessages properly."""
with GribIndex(TESTGRIB, TEST_KEYS) as idx:
idx.select(SELECTION_DICTIONARY)
self.assertEqual(len(idx.open_messages), 1)
self.assertEqual(len(idx.open_messages), 0)
def test_create_and_serialize_index(self):
"""GribIndex can be saved to file, file can be added to index."""
with GribIndex(TESTGRIB, TEST_KEYS) as idx:
idx.write(TEST_INDEX)
with GribIndex(file_index=TEST_INDEX) as idx:
idx.add(TESTGRIB)
os.unlink(TEST_INDEX)
# TODO: Following test disabled due to error message:
# GRIB_API ERROR: please select a value for index key "dataDate"
# Must investigate
#
def _test_index_comprehension(self):
"""GribIndex understands underlying GRIB index properly."""
with GribIndex(TESTGRIB, TEST_KEYS) as idx:
self.assertEqual(idx.size(TEST_KEYS[1]), 5)
self.assertSequenceEqual(idx.values(TEST_KEYS[1]), TEST_STEPRANGE)
with self.assertRaises(IndexNotSelectedError):
idx.select({TEST_KEYS[1]: TEST_VALUES[0]})
idx.select(SELECTION_DICTIONARY)
self.assertEqual(len(idx.open_messages), 1)
if __name__ == "__main__":
unittest.main()

View File

@ -0,0 +1,10 @@
#!/bin/sh
. ./include.sh
# To get verbose output
#$PYTHON -m unittest -v high_level_api
$PYTHON $examples_src/high_level_api.py
rm -f test.index

View File

@ -1,2 +1,8 @@
from __future__ import absolute_import
from .eccodes import * from .eccodes import *
from .eccodes import __version__ from .eccodes import __version__
from .high_level.gribfile import GribFile
from .high_level.gribmessage import GribMessage
from .high_level.gribindex import GribIndex

View File

View File

@ -0,0 +1,65 @@
"""
``GribFile`` class that implements a GRIB file that closes itself and its
messages when it is no longer needed.
Author: Daniel Lee, DWD, 2014
"""
import eccodes
from .gribmessage import GribMessage
class GribFile(file):
"""
A GRIB file handle meant for use in a context manager.
Individual messages can be accessed using the ``next`` method. Of course,
it is also possible to iterate over each message in the file::
>>> with GribFile(filename) as grib:
... # Print number of messages in file
... len(grib)
... # Open all messages in file
... for msg in grib:
... print(msg["shortName"])
... len(grib.open_messages)
>>> # When the file is closed, any open messages are closed
>>> len(grib.open_messages)
"""
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
"""Close all open messages, release GRIB file handle and close file."""
while self.open_messages:
self.open_messages.pop().close()
self.file_handle.close()
def close(self):
"""Possibility to manually close file."""
self.__exit__(None, None, None)
def __len__(self):
"""Return total messages in GRIB file."""
return eccodes.codes_count_in_file(self.file_handle)
def __iter__(self):
return self
def next(self):
try:
return GribMessage(self)
except IOError:
raise StopIteration()
def __init__(self, filename, mode="r"):
"""Open file and receive GRIB file handle."""
#: File handle for working with actual file on disc
#: The class holds the file it works with because the GRIB API's
#: typechecking does not allow using inherited classes.
self.file_handle = open(filename, mode)
#: Number of message in GRIB file currently being read
self.message = 0
#: Open messages
self.open_messages = []

View File

@ -0,0 +1,103 @@
"""
``GribIndex`` class that implements a GRIB index that allows access to
the GRIB API's index functionality.
Author: Daniel Lee, DWD, 2014
"""
import eccodes
from .gribmessage import GribMessage
class GribIndex(object):
"""
A GRIB index meant for use in a context manager.
Usage::
>>> # Create index from file with keys
>>> with GribIndex(filename, keys) as idx:
... # Write index to file
... idx.write(index_file)
>>> # Read index from file
>>> with GribIndex(file_index=index_file) as idx:
... # Add new file to index
... idx.add(other_filename)
... # Report number of unique values for given key
... idx.size(key)
... # Report unique values indexed by key
... idx.values(key)
... # Request GribMessage matching key, value
... msg = idx.select({key: value})
"""
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
"""Release GRIB message handle and inform file of release."""
while self.open_messages:
self.open_messages[0].close()
eccodes.codes_index_release(self.iid)
def close(self):
"""Possibility to manually close index."""
self.__exit__(None, None, None)
def __init__(self, filename=None, keys=None, file_index=None,
grib_index=None):
"""
Create new GRIB index over ``keys`` from ``filename``.
``filename`` should be a string of the desired file's filename.
``keys`` should be a sequence of keys to index. ``file_index`` should
be a string of the file that the index should be loaded from.
``grib_index`` should be another ``GribIndex``.
If ``filename`` and ``keys`` are provided, the ``GribIndex`` is
initialized over the given keys from the given file. If they are not
provided, the ``GribIndex`` is read from ``indexfile``. If
``grib_index`` is provided, it is cloned from the given ``GribIndex``.
"""
#: Grib index ID
self.iid = None
if filename and keys:
self.iid = eccodes.codes_index_new_from_file(filename, keys)
elif file_index:
self.iid = eccodes.codes_index_read(file_index)
elif grib_index:
self.iid = eccodes.codes_new_from_index(grib_index.iid)
else:
raise RuntimeError("No source was supplied "
"(possibilities: grib_file, clone, sample).")
#: Indexed keys. Only available if GRIB is initialized from file.
self.keys = keys
#: Open GRIB messages
self.open_messages = []
def size(self, key):
"""Return number of distinct values for index key."""
return eccodes.codes_index_get_size(self.iid, key)
def values(self, key, type=str):
"""Return distinct values of index key."""
return eccodes.codes_index_get(self.iid, key, type)
def add(self, filename):
"""Add ``filename`` to the ``GribIndex``."""
eccodes.codes_index_add_file(self.iid, filename)
def write(self, outfile):
"""Write index to filename at ``outfile``."""
eccodes.codes_index_write(self.iid, outfile)
def select(self, key_value_pairs):
"""
Return message associated with given key value pairs.
``key_value_pairs`` should be passed as a dictionary.
"""
for key in key_value_pairs:
eccodes.codes_index_select(self.iid, key, key_value_pairs[key])
return GribMessage(gribindex=self)

View File

@ -0,0 +1,198 @@
"""
``GribMessage`` class that implements a GRIB message that allows access to
the message's key-value pairs in a dictionary-like manner and closes the
message when it is no longer needed, coordinating this with its host file.
Author: Daniel Lee, DWD, 2014
"""
import collections
import eccodes
class IndexNotSelectedError(Exception):
"""GRIB index was requested before selecting key/value pairs."""
class GribMessage(object):
"""
A GRIB message.
Each ``GribMessage`` is stored as a key/value pair in a dictionary-like
structure. It can be used in a context manager or by itself. When the
``GribFile`` it belongs to is closed, it closes any open ``GribMessage``s
that belong to it. If a ``GribMessage`` is closed before its ``GribFile``
is closed, it informs the ``GribFile`` of its closure.
Scalar and vector values are set appropriately through the same method.
Usage::
>>> with GribFile(filename) as grib:
... # Access shortNames of all messages
... for msg in grib:
... print(msg["shortName"])
... # Report number of keys in message
... len(msg)
... # Report message size in bytes
... msg.size
... # Report keys in message
... msg.keys()
... # Check if value is missing
... msg.missing("scaleFactorOfSecondFixedSurface")
... # Set scalar value
... msg["scaleFactorOfSecondFixedSurface"] = 5
... # Check key's value
... msg["scaleFactorOfSecondFixedSurface"]
... # Set value to missing
... msg.set_missing("scaleFactorOfSecondFixedSurface")
... # Missing values raise exception when read with dict notation
... msg["scaleFactorOfSecondFixedSurface"]
... # Array values are set transparently
... msg["values"] = [1, 2, 3]
... # Messages can be written to file
... with open(testfile, "w") as test:
... msg.write(test)
... # Messages can be cloned from other messages
... msg2 = GribMessage(clone=msg)
... # If desired, messages can be closed manually or used in with
... msg.close()
"""
def __enter__(self):
return self
def __exit__(self, exception_type, exception_value, traceback):
"""Release GRIB message handle and inform file of release."""
# This assert should never trigger
# assert self.gid is not None
eccodes.codes_release(self.gid)
if self.grib_index:
self.grib_index.open_messages.remove(self)
def close(self):
"""Possibility to manually close message."""
self.__exit__(None, None, None)
def __contains__(self, key):
"""Check whether a key is present in message."""
return key in self.keys()
def __len__(self):
"""Return key count."""
return len(self.keys())
def __getitem__(self, key):
"""Return value associated with key as its native type."""
return self.get(key)
def __setitem__(self, key, value):
"""
Set value associated with key.
If the object is iterable,
"""
# Alternative implemented (TODO: evaluate)
# if eccodes.codes_get_size(self.gid, key) > 1:
# eccodes.codes_set_array(self.gid, key, value)
# else:
# eccodes.codes_set(self.gid, key, value)
# Passed value is iterable and not string
if (isinstance(value, collections.Iterable) and not
isinstance(value, basestring)):
eccodes.codes_set_array(self.gid, key, value)
else:
eccodes.codes_set(self.gid, key, value)
def __iter__(self):
return iter(self.keys())
# Not yet implemented
# def itervalues(self):
# return self.values()
def items(self):
"""Return list of tuples of all key/value pairs."""
return [(key, self[key]) for key in self.keys()]
def keys(self, namespace=None):
"""Get available keys in message."""
iterator = eccodes.codes_keys_iterator_new(self.gid, namespace=namespace)
keys = []
while eccodes.codes_keys_iterator_next(iterator):
key = eccodes.codes_keys_iterator_get_name(iterator)
keys.append(key)
eccodes.codes_keys_iterator_delete(iterator)
return keys
def __init__(self, grib_file=None, clone=None, sample=None, gribindex=None):
"""
Open a message and inform the GRIB file that it's been incremented.
If ``grib_file`` is not supplied, the message is cloned from
``GribMessage`` ``clone``. If neither is supplied, the ``GribMessage``
is cloned from ``sample``. If ``index`` is supplied as a GribIndex, the
message is taken from the index.
"""
#: Unique GRIB ID, for GRIB API interface
self.gid = None
#: File containing message
self.grib_file = None
#: GribIndex referencing message
self.grib_index = None
if grib_file is not None:
self.gid = eccodes.codes_grib_new_from_file(grib_file.file_handle)
if self.gid is None:
raise IOError("Grib file %s is exhausted" % grib_file.name)
self.grib_file = grib_file
self.grib_file.message += 1
self.grib_file.open_messages.append(self)
elif clone is not None:
self.gid = eccodes.codes_clone(clone.gid)
elif sample is not None:
self.gid = eccodes.codes_grib_new_from_samples(sample)
elif gribindex is not None:
self.gid = eccodes.codes_new_from_index(gribindex.iid)
if not self.gid:
raise IndexNotSelectedError("All keys must have selected "
"values before receiving message "
"from index.")
self.grib_index = gribindex
gribindex.open_messages.append(self)
else:
raise RuntimeError("Either grib_file, clone, sample or gribindex "
"must be provided.")
def size(self):
"""Return size of message in bytes."""
return eccodes.codes_get_message_size(self.gid)
def dump(self):
"""Dump message's binary content."""
return eccodes.codes_get_message(self.gid)
def get(self, key, ktype=None):
"""Get value of a given key as its native or specified type."""
if self.missing(key):
raise KeyError("Key is missing from message.")
if eccodes.codes_get_size(self.gid, key) > 1:
ret = eccodes.codes_get_array(self.gid, key, ktype)
else:
ret = eccodes.codes_get(self.gid, key, ktype)
return ret
def missing(self, key):
"""Report if key is missing."""
return bool(eccodes.codes_is_missing(self.gid, key))
def set_missing(self, key):
"""Set a key to missing."""
eccodes.codes_set_missing(self.gid, key)
def write(self, outfile=None):
"""Write message to file."""
if not outfile:
# This is a hack because the API does not accept inheritance
outfile = self.grib_file.file_handle
eccodes.codes_write(self.gid, outfile)

View File

@ -64,4 +64,4 @@ setup(name='eccodes',
url='https://software.ecmwf.int/wiki/display/ECC/ecCodes+Home', url='https://software.ecmwf.int/wiki/display/ECC/ecCodes+Home',
download_url='https://software.ecmwf.int/wiki/display/ECC/Releases', download_url='https://software.ecmwf.int/wiki/display/ECC/Releases',
ext_modules=[Extension('gribapi._gribapi_swig', **attdict)], ext_modules=[Extension('gribapi._gribapi_swig', **attdict)],
packages=['eccodes', 'gribapi']) packages=['eccodes', 'eccodes.high_level', 'gribapi'])