From 029ac17d6ec9b6a9d5225d18046cd8c4eef3e129 Mon Sep 17 00:00:00 2001 From: Shahram Najm Date: Fri, 20 Sep 2019 17:59:16 +0100 Subject: [PATCH] Performance: BUFR decoding: provide fast extraction of header data --- src/eccodes.h | 9 +- src/grib_api.h | 54 +++++++++ src/grib_io.c | 300 ++++++++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 358 insertions(+), 5 deletions(-) diff --git a/src/eccodes.h b/src/eccodes.h index dac623d81..beb43c15b 100644 --- a/src/eccodes.h +++ b/src/eccodes.h @@ -1262,10 +1262,15 @@ codes_handle *codes_grib_util_set_spec(codes_handle *h, size_t data_values_count, int *err); +/* Build an array of headers from input BUFR file. + * result = array of 'codes_bufr_header' structs with 'num_messages' elements. + * This array should be freed by the caller. + * num_messages = number of messages found in the input file. + * returns 0 if OK, integer value on error. + */ +int codes_bufr_extract_headers_malloc(codes_context* c, const char* filename, codes_bufr_header** result, int* num_messages); /* --------------------------------------- */ - - #ifdef __cplusplus } #endif diff --git a/src/grib_api.h b/src/grib_api.h index ca1d699ec..069ad4cb1 100644 --- a/src/grib_api.h +++ b/src/grib_api.h @@ -1559,6 +1559,60 @@ grib_handle *grib_util_set_spec2(grib_handle *h, int parse_keyval_string(const char *grib_tool, char *arg, int values_required, int default_type, grib_values values[], int *count); grib_handle *grib_new_from_file(grib_context *c, FILE *f, int headers_only, int *error); + +typedef struct codes_bufr_header { + off_t message_offset; + size_t message_size; + long edition; + unsigned long totalLength; + unsigned long section1Length; + long masterTableNumber; + long bufrHeaderSubCentre; + long bufrHeaderCentre; + long updateSequenceNumber; + long section1Flags; + long dataCategory; + long dataSubCategory; + long masterTablesVersionNumber; + long localTablesVersionNumber; + long typicalYearOfCentury; + long typicalMonth; + long typicalDay; + long typicalHour; + long typicalMinute; + + /* Extra BUFR4 keys */ + long internationalDataSubCategory; + long typicalYear; + long typicalSecond; + + long localSectionPresent; + + /* ECMWF local section keys */ + long rdbType; + long oldSubtype; + long localYear; + long localMonth; + long localDay; + long localHour; + long localMinute; + long localSecond; + + long rdbtimeDay; + long rdbtimeHour; + long rdbtimeMinute; + long rdbtimeSecond; + + long rectimeDay; + long rectimeHour; + long rectimeMinute; + long rectimeSecond; + + long qualityControl; + long newSubtype; + long daLoop; +} codes_bufr_header; + /* --------------------------------------- */ typedef void (*codes_assertion_failed_proc)(const char* message); diff --git a/src/grib_io.c b/src/grib_io.c index bb98b42d7..5f05fb2f5 100644 --- a/src/grib_io.c +++ b/src/grib_io.c @@ -1669,12 +1669,12 @@ int grib_count_in_file(grib_context* c, FILE* f,int* n) int grib_count_in_filename(grib_context* c, const char* filename, int* n) { - int err=0; + int err = 0; FILE* fp = NULL; if (!c) c=grib_context_get_default(); - fp = fopen(filename, "r"); + fp = fopen(filename, "rb"); if (!fp) { - grib_context_log(c, GRIB_LOG_ERROR,"grib_count_in_filename: Unable to read file \"%s\"", filename); + grib_context_log(c, GRIB_LOG_ERROR, "grib_count_in_filename: Unable to read file \"%s\"", filename); perror(filename); return GRIB_IO_PROBLEM; } @@ -1682,3 +1682,297 @@ int grib_count_in_filename(grib_context* c, const char* filename, int* n) fclose(fp); return err; } + +#define BUFR_SECTION0_LEN 8 /* BUFR section 0 is always 8 bytes long */ +static int bufr_extract_edition(const void* message, long* edition) +{ + const long nbits_edition = 8; + long pos_edition = 7*8; + + *edition = (long)grib_decode_unsigned_long(message, &pos_edition, nbits_edition); + return GRIB_SUCCESS; +} +/* The ECMWF BUFR local use section */ +static int bufr_decode_rdb_keys(const void* message, long offset_section2, codes_bufr_header* hdr) +{ + long nbits_rdbType = 1*8; + long pos_rdbType = (offset_section2+4)*8; + long nbits_oldSubtype = 1*8; + long pos_oldSubtype = (offset_section2+5)*8; + + long nbits_qualityControl = 1*8; + long pos_qualityControl = (offset_section2+48)*8; + long nbits_newSubtype = 2*8; + long pos_newSubtype = (offset_section2+49)*8; + long nbits_daLoop = 1*8; + long pos_daLoop = (offset_section2+51)*8; + + long start = 0; + const long offset_keyData = offset_section2 + 6; + const long offset_rdbtime = offset_section2 + 38; + const long offset_rectime = offset_section2 + 41; + + unsigned char* p = (unsigned char*)message + offset_keyData; + DebugAssert(hdr->localSectionPresent); + + hdr->rdbType = (long)grib_decode_unsigned_long(message, &pos_rdbType, nbits_rdbType); + hdr->oldSubtype = (long)grib_decode_unsigned_long(message, &pos_oldSubtype, nbits_oldSubtype); + + start = 0; + hdr->localYear = (long)grib_decode_unsigned_long(p, &start, 12); + hdr->localMonth = (long)grib_decode_unsigned_long(p, &start, 4); + hdr->localDay = (long)grib_decode_unsigned_long(p, &start, 6); + hdr->localHour = (long)grib_decode_unsigned_long(p, &start, 5); + hdr->localMinute = (long)grib_decode_unsigned_long(p, &start, 6); + hdr->localSecond = (long)grib_decode_unsigned_long(p, &start, 6); + + /* rdbtime */ + p = (unsigned char*)message + offset_rdbtime; + start = 0; + hdr->rdbtimeDay = (long)grib_decode_unsigned_long(p, &start, 6); + hdr->rdbtimeHour = (long)grib_decode_unsigned_long(p, &start, 5); + hdr->rdbtimeMinute = (long)grib_decode_unsigned_long(p, &start, 6); + hdr->rdbtimeSecond = (long)grib_decode_unsigned_long(p, &start, 6); + + /* rectime */ + p = (unsigned char*)message + offset_rectime; + start = 0; + hdr->rectimeDay = (long)grib_decode_unsigned_long(p, &start, 6); + hdr->rectimeHour = (long)grib_decode_unsigned_long(p, &start, 5); + hdr->rectimeMinute = (long)grib_decode_unsigned_long(p, &start, 6); + hdr->rectimeSecond = (long)grib_decode_unsigned_long(p, &start, 6); + + hdr->qualityControl = (long)grib_decode_unsigned_long(message, &pos_qualityControl, nbits_qualityControl); + hdr->newSubtype = (long)grib_decode_unsigned_long(message, &pos_newSubtype, nbits_newSubtype); + hdr->daLoop = (long)grib_decode_unsigned_long(message, &pos_daLoop, nbits_daLoop); + + return GRIB_SUCCESS; +} + +static int bufr_decode_edition3(const void* message, codes_bufr_header* hdr) +{ + int err = GRIB_SUCCESS; + + const long nbits_totalLength = 3*8; + long pos_totalLength = 4*8; + + const long nbits_section1Length = 3*8; + long pos_section1Length = 8*8; + + long nbits_masterTableNumber = 1*8; + long pos_masterTableNumber = 11*8; + + long nbits_bufrHeaderSubCentre = 1*8; + long pos_bufrHeaderSubCentre = 12*8; + + long nbits_bufrHeaderCentre = 1*8; + long pos_bufrHeaderCentre = 13*8; + + long nbits_updateSequenceNumber = 1*8; + long pos_updateSequenceNumber = 14*8; + + long nbits_section1Flags = 1*8; + long pos_section1Flags = 15*8; + + long nbits_dataCategory = 1*8; + long pos_dataCategory = 16*8; + + long nbits_dataSubCategory = 1*8; + long pos_dataSubCategory = 17*8; + + long nbits_masterTablesVersionNumber = 1*8; + long pos_masterTablesVersionNumber = 18*8; + + long nbits_localTablesVersionNumber = 1*8; + long pos_localTablesVersionNumber = 19*8; + + long nbits_typicalYearOfCentury = 1*8; + long pos_typicalYearOfCentury = 20*8; + + long nbits_typicalMonth = 1*8; + long pos_typicalMonth = 21*8; + + long nbits_typicalDay = 1*8; + long pos_typicalDay = 22*8; + + long nbits_typicalHour = 1*8; + long pos_typicalHour = 23*8; + + long nbits_typicalMinute = 1*8; + long pos_typicalMinute = 24*8; + + int ecmwfLocalSectionPresent = 0; + + hdr->totalLength = grib_decode_unsigned_long(message, &pos_totalLength, nbits_totalLength); + hdr->section1Length = grib_decode_unsigned_long(message, &pos_section1Length, nbits_section1Length); + hdr->masterTableNumber = (long)grib_decode_unsigned_long(message, &pos_masterTableNumber, nbits_masterTableNumber); + hdr->bufrHeaderSubCentre = (long)grib_decode_unsigned_long(message, &pos_bufrHeaderSubCentre, nbits_bufrHeaderSubCentre); + hdr->bufrHeaderCentre = (long)grib_decode_unsigned_long(message, &pos_bufrHeaderCentre, nbits_bufrHeaderCentre); + hdr->updateSequenceNumber = (long)grib_decode_unsigned_long(message, &pos_updateSequenceNumber, nbits_updateSequenceNumber); + hdr->section1Flags = (long)grib_decode_unsigned_long(message, &pos_section1Flags, nbits_section1Flags); + hdr->dataCategory = (long)grib_decode_unsigned_long(message, &pos_dataCategory, nbits_dataCategory); + hdr->dataSubCategory = (long)grib_decode_unsigned_long(message, &pos_dataSubCategory, nbits_dataSubCategory); + hdr->masterTablesVersionNumber = (long)grib_decode_unsigned_long( + message, &pos_masterTablesVersionNumber, nbits_masterTablesVersionNumber); + hdr->localTablesVersionNumber = (long)grib_decode_unsigned_long(message, &pos_localTablesVersionNumber, nbits_localTablesVersionNumber); + hdr->typicalYearOfCentury = (long)grib_decode_unsigned_long(message, &pos_typicalYearOfCentury, nbits_typicalYearOfCentury); + hdr->typicalMonth = (long)grib_decode_unsigned_long(message, &pos_typicalMonth, nbits_typicalMonth); + hdr->typicalDay = (long)grib_decode_unsigned_long(message, &pos_typicalDay, nbits_typicalDay); + hdr->typicalHour = (long)grib_decode_unsigned_long(message, &pos_typicalHour, nbits_typicalHour); + hdr->typicalMinute = (long)grib_decode_unsigned_long(message, &pos_typicalMinute, nbits_typicalMinute); + + hdr->localSectionPresent = (hdr->section1Flags != 0); + ecmwfLocalSectionPresent = (hdr->bufrHeaderCentre == 98 && hdr->localSectionPresent); + if (ecmwfLocalSectionPresent) { + const long offset_section2 = hdr->section1Length + BUFR_SECTION0_LEN; /*bytes*/ + err = bufr_decode_rdb_keys(message, offset_section2, hdr); + } + + return err; +} + +static int bufr_decode_edition4(const void* message, codes_bufr_header* hdr) +{ + int err = GRIB_SUCCESS; + + const long nbits_totalLength = 3*8; + long pos_totalLength = 4*8; + + const long nbits_section1Length = 3*8; + long pos_section1Length = 8*8; + + long nbits_masterTableNumber = 1*8; + long pos_masterTableNumber = 11*8; + + long nbits_bufrHeaderCentre = 2*8; + long pos_bufrHeaderCentre = 12*8; + + long nbits_bufrHeaderSubCentre = 2*8; + long pos_bufrHeaderSubCentre = 14*8; + + long nbits_updateSequenceNumber = 1*8; + long pos_updateSequenceNumber = 16*8; + + long nbits_section1Flags = 1*8; + long pos_section1Flags = 17*8; + + long nbits_dataCategory = 1*8; + long pos_dataCategory = 18*8; + + long nbits_internationalDataSubCategory = 1*8; + long pos_internationalDataSubCategory = 19*8; + + long nbits_dataSubCategory = 1*8; + long pos_dataSubCategory = 20*8; + + long nbits_masterTablesVersionNumber = 1*8; + long pos_masterTablesVersionNumber = 21*8; + + long nbits_localTablesVersionNumber = 1*8; + long pos_localTablesVersionNumber = 22*8; + + long nbits_typicalYear = 2*8; + long pos_typicalYear = 23*8; + + long nbits_typicalMonth = 1*8; + long pos_typicalMonth = 25*8; + + long nbits_typicalDay = 1*8; + long pos_typicalDay = 26*8; + + long nbits_typicalHour = 1*8; + long pos_typicalHour = 27*8; + + long nbits_typicalMinute = 1*8; + long pos_typicalMinute = 28*8; + + long nbits_typicalSecond = 1*8; + long pos_typicalSecond = 29*8; + + int ecmwfLocalSectionPresent = 0; + + hdr->totalLength = grib_decode_unsigned_long(message, &pos_totalLength, nbits_totalLength); + hdr->section1Length = grib_decode_unsigned_long(message, &pos_section1Length, nbits_section1Length); + hdr->masterTableNumber = (long)grib_decode_unsigned_long(message, &pos_masterTableNumber, nbits_masterTableNumber); + hdr->bufrHeaderCentre = (long)grib_decode_unsigned_long(message, &pos_bufrHeaderCentre, nbits_bufrHeaderCentre); + hdr->bufrHeaderSubCentre = (long)grib_decode_unsigned_long(message, &pos_bufrHeaderSubCentre, nbits_bufrHeaderSubCentre); + hdr->updateSequenceNumber = (long)grib_decode_unsigned_long(message, &pos_updateSequenceNumber, nbits_updateSequenceNumber); + hdr->section1Flags = (long)grib_decode_unsigned_long(message, &pos_section1Flags, nbits_section1Flags); + hdr->dataCategory = (long)grib_decode_unsigned_long(message, &pos_dataCategory, nbits_dataCategory); + hdr->internationalDataSubCategory = (long)grib_decode_unsigned_long(message, &pos_internationalDataSubCategory, nbits_internationalDataSubCategory); + hdr->dataSubCategory = (long)grib_decode_unsigned_long(message, &pos_dataSubCategory, nbits_dataSubCategory); + hdr->masterTablesVersionNumber = (long)grib_decode_unsigned_long(message, &pos_masterTablesVersionNumber, nbits_masterTablesVersionNumber); + hdr->localTablesVersionNumber = (long)grib_decode_unsigned_long(message, &pos_localTablesVersionNumber, nbits_localTablesVersionNumber); + hdr->typicalYear = (long)grib_decode_unsigned_long(message, &pos_typicalYear, nbits_typicalYear); + hdr->typicalMonth = (long)grib_decode_unsigned_long(message, &pos_typicalMonth, nbits_typicalMonth); + hdr->typicalDay = (long)grib_decode_unsigned_long(message, &pos_typicalDay, nbits_typicalDay); + hdr->typicalHour = (long)grib_decode_unsigned_long(message, &pos_typicalHour, nbits_typicalHour); + hdr->typicalMinute = (long)grib_decode_unsigned_long(message, &pos_typicalMinute, nbits_typicalMinute); + hdr->typicalSecond = (long)grib_decode_unsigned_long(message, &pos_typicalSecond, nbits_typicalSecond); + + hdr->localSectionPresent = (hdr->section1Flags != 0); + ecmwfLocalSectionPresent = (hdr->bufrHeaderCentre == 98 && hdr->localSectionPresent); + if (ecmwfLocalSectionPresent) { + const long offset_section2 = hdr->section1Length + BUFR_SECTION0_LEN; /*bytes*/ + err = bufr_decode_rdb_keys(message, offset_section2, hdr); + } + + return err; +} + +static int bufr_decode_header(const void* message, off_t offset, size_t size, codes_bufr_header* hdr) +{ + int err = GRIB_SUCCESS; + + hdr->message_offset = offset; + hdr->message_size = size; + + err = bufr_extract_edition(message, &hdr->edition); + + if (hdr->edition == 3) { + err = bufr_decode_edition3(message, hdr); + } else if (hdr->edition == 4) { + err = bufr_decode_edition4(message, hdr); + } else { + grib_context_log(NULL, GRIB_LOG_ERROR, "Unsupported BUFR edition: %ld", hdr->edition); + err = GRIB_DECODING_ERROR; + } + + return err; +} + +int codes_bufr_extract_headers_malloc(grib_context* c, const char* filename, codes_bufr_header** result, int* num_messages) +{ + int err = 0, i = 0; + FILE* fp = NULL; + void* mesg = NULL; + size_t size = 0; + off_t offset = 0; + codes_bufr_header* headers = *result; + + if (!c) c=grib_context_get_default(); + fp = fopen(filename, "rb"); + if (!fp) { + grib_context_log(c, GRIB_LOG_ERROR, "codes_bufr_extract_headers_malloc: Unable to read file \"%s\"", filename); + perror(filename); + return GRIB_IO_PROBLEM; + } + err = grib_count_in_file(c, fp, num_messages); + if (err) return err; + + headers = (codes_bufr_header*)calloc((size_t)num_messages, sizeof(codes_bufr_header)); + if (!headers) { + return GRIB_OUT_OF_MEMORY; + } + i = 0; + while (err != GRIB_END_OF_FILE) { + mesg = wmo_read_bufr_from_file_malloc(fp, 0, &size, &offset, &err); + if (mesg != NULL && err == 0) { + int err2 = bufr_decode_header(mesg, offset, size, &headers[i++]); + if (err2) return err2; + grib_context_free(c, mesg); + } + } + + return GRIB_SUCCESS; +}