eccodes/examples/python/prepBufr_create_defs.py

200 lines
6.7 KiB
Python
Raw Normal View History

# (C) Copyright 2005- ECMWF.
#
# This software is licensed under the terms of the Apache Licence Version 2.0
# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
#
# In applying this licence, ECMWF does not waive the privileges and immunities
# granted to it by virtue of its status as an intergovernmental organisation
# nor does it submit to any jurisdiction.
import traceback
import sys
import os
import re
import getopt
from eccodes import *
VERBOSE = 1 # verbose error reporting
DEBUG = 0
def usage():
progname = os.path.basename(sys.argv[0])
print("Usage: ", progname, "[options] bufr_file")
print("Options:")
print("\t-e generate the file defining Table B descriptors (element.table)")
print("\t-s generate the file defining Table D descriptors (sequence.def)")
print("")
def infer_type(units, scale):
ktype = "double"
if re.search("^CCITT", units) or re.search("^CHARACTER", units):
return "string"
if re.search("CODE\s*TABLE", units):
return "table"
if re.search("FLAG\s*TABLE", units):
return "flag"
if re.search("NUMERIC", units) or int(scale) <= 0:
return "long"
return ktype
def generate_tables(INPUT, what):
assert what in ("element", "sequence")
f = open(INPUT, "rb")
cnt = 0
if what == "element":
print("#code|abbreviation|type|name|unit|scale|reference|width")
# loop for the messages in the file
while 1:
# get handle for message
bufr = codes_bufr_new_from_file(f)
if bufr is None:
break
# Sanity check
if codes_get(bufr, "dataCategory") != 11:
# print('BUFR message dataCategory must be 11 (BUFR tables). Ignoring')
codes_release(bufr)
continue
if codes_get(bufr, "numberOfSubsets") == 0:
# print('BUFR message number of subsets == 0. Ignoring')
codes_release(bufr)
continue
if DEBUG:
print(f"Processing message {cnt+1}")
codes_set(bufr, "unpack", 1)
# Each table message contains three delayed replications (031001):
# First is for Table A (skipped), second for Table B and third for Table D
replications = codes_get_array(bufr, "delayedDescriptorReplicationFactor")
fDesc = codes_get_array(bufr, "fDescriptorToBeAddedOrDefined")
xDesc = codes_get_array(bufr, "xDescriptorToBeAddedOrDefined")
yDesc = codes_get_array(bufr, "yDescriptorToBeAddedOrDefined")
assert len(fDesc) == len(xDesc)
assert len(fDesc) == len(yDesc)
assert len(replications) > 0
assert len(fDesc) == replications[1] + replications[2]
if what == "element":
try:
elementName1 = codes_get_array(bufr, "elementNameLine1")
elementName2 = codes_get_array(bufr, "elementNameLine2")
unitsName = codes_get_array(bufr, "unitsName")
unitsScaleSign = codes_get_array(bufr, "unitsScaleSign")
unitsScale = codes_get_array(bufr, "unitsScale")
unitsReferenceSign = codes_get_array(bufr, "unitsReferenceSign")
unitsReferenceValue = codes_get_array(bufr, "unitsReferenceValue")
elementDataWidth = codes_get_array(bufr, "elementDataWidth")
except CodesInternalError as err:
if DEBUG:
print("No element descriptors found")
pass
for i in range(len(fDesc)):
fd = fDesc[i]
xd = xDesc[i]
yd = yDesc[i]
if fd == "0":
# ecCodes key (abbreviation) => first word of elementNameLine1
el1 = elementName1[i].rstrip()
key = str.split(el1)[0]
# description (name) => Combine rest of elementNameLine1 and elementNameLine2
desc = " ".join(str.split(el1)[1:]) + elementName2[i].rstrip()
if len(desc) == 0:
desc = key
units = unitsName[i].rstrip()
sign = unitsScaleSign[i]
scale = int(unitsScale[i])
if sign == "-":
scale = -1 * scale
sign = unitsReferenceSign[i]
reference = int(unitsReferenceValue[i])
if sign == "-":
reference = -1 * reference
width = elementDataWidth[i].rstrip()
ktype = infer_type(units, scale)
print(
f"{fd}{xd}{yd}|{key}|{ktype}|{desc}|{units}|{scale}|{reference}|{width}"
)
if what == "sequence":
try:
descriptorDefiningSequence = codes_get_array(
bufr, "descriptorDefiningSequence"
)
except CodesInternalError as err:
if DEBUG:
print("No sequence descriptors found")
pass
k = 0
rindex = 3 # From index 3 onwards we have the contents of sequences
for i in range(len(fDesc)):
fd = fDesc[i]
xd = xDesc[i]
yd = yDesc[i]
if fd == "3":
repls = replications[rindex]
if repls:
print(f'"{fd}{xd}{yd}" = [ ', end="")
comma = ","
for j in range(repls):
seq = descriptorDefiningSequence[k + j]
if j == repls - 1:
comma = ""
print(f"{seq}{comma} ", end="")
k = k + repls
rindex = rindex + 1
print("]")
cnt += 1
codes_release(bufr)
f.close()
def main():
if len(sys.argv) < 2:
usage()
return 1
try:
what = "element"
options = "es"
opts, args = getopt.getopt(sys.argv[1:], options)
for o, a in opts:
if o == "-e":
what = "element"
elif o == "-s":
what = "sequence"
else:
assert False, "Invalid option"
# Check we have a BUFR file to process
if not args:
usage()
return 1
for arg in args:
generate_tables(arg, what)
except getopt.GetoptError as err:
print("Error: ", err)
usage()
return 1
except CodesInternalError as err:
if VERBOSE:
traceback.print_exc(file=sys.stderr)
else:
sys.stderr.write(err.msg + "\n")
return 1
if __name__ == "__main__":
sys.exit(main())