Fast codes_count: More tests are passing (WIP)

This commit is contained in:
Shahram Najm 2023-06-16 17:10:13 +01:00 committed by shahramn
parent d78166f28b
commit 4d1f9c49b6
3 changed files with 105 additions and 57 deletions

View File

@ -1377,10 +1377,10 @@ int wmo_read_bufr_from_file(FILE* f, void* buffer, size_t* len);
int wmo_read_gts_from_file(FILE* f, void* buffer, size_t* len); int wmo_read_gts_from_file(FILE* f, void* buffer, size_t* len);
int wmo_read_any_from_stream(void* stream_data, long (*stream_proc)(void*, void* buffer, long len), void* buffer, size_t* len); int wmo_read_any_from_stream(void* stream_data, long (*stream_proc)(void*, void* buffer, long len), void* buffer, size_t* len);
int wmo_read_any_from_file_noalloc(FILE* f, void* buffer, size_t* len); int wmo_read_any_from_file_fast(FILE* f, void* buffer, size_t* len);
int wmo_read_grib_from_file_noalloc(FILE* f, void* buffer, size_t* len); int wmo_read_grib_from_file_fast(FILE* f, void* buffer, size_t* len);
int wmo_read_bufr_from_file_noalloc(FILE* f, void* buffer, size_t* len); int wmo_read_bufr_from_file_fast(FILE* f, void* buffer, size_t* len);
int wmo_read_gts_from_file_noalloc(FILE* f, void* buffer, size_t* len); int wmo_read_gts_from_file_fast(FILE* f, void* buffer, size_t* len);
/* These functions allocate memory for the result so the user is responsible for freeing it */ /* These functions allocate memory for the result so the user is responsible for freeing it */
void* wmo_read_any_from_stream_malloc(void* stream_data, long (*stream_proc)(void*, void* buffer, long len), size_t* size, int* err); void* wmo_read_any_from_stream_malloc(void* stream_data, long (*stream_proc)(void*, void* buffer, long len), size_t* size, int* err);

View File

@ -84,6 +84,7 @@ typedef struct reader
} reader; } reader;
// If the no_alloc argument is 1, the contents of the message are not stored; we just seek to the final 7777
static int read_the_rest(reader* r, size_t message_length, unsigned char* tmp, int already_read, int check7777, int no_alloc) static int read_the_rest(reader* r, size_t message_length, unsigned char* tmp, int already_read, int check7777, int no_alloc)
{ {
int err = GRIB_SUCCESS; int err = GRIB_SUCCESS;
@ -95,61 +96,48 @@ static int read_the_rest(reader* r, size_t message_length, unsigned char* tmp, i
if (message_length == 0) if (message_length == 0)
return GRIB_BUFFER_TOO_SMALL; return GRIB_BUFFER_TOO_SMALL;
buffer_size = message_length; buffer_size = message_length; // store the whole message in the buffer
if (no_alloc) if (no_alloc)
buffer_size = 5; buffer_size = 5; // big enough to store the 7777
rest = message_length - already_read; rest = message_length - already_read;
r->message_size = message_length; r->message_size = message_length;
buffer = (unsigned char*)r->alloc(r->alloc_data, &buffer_size, &err); buffer = (unsigned char*)r->alloc(r->alloc_data, &buffer_size, &err);
if (err) if (err)
return err; return err;
if (!no_alloc) { if (no_alloc) {
r->seek(r->read_data, rest - 4); // jump to the end before the 7777
} else {
if (buffer == NULL || (buffer_size < message_length)) { if (buffer == NULL || (buffer_size < message_length)) {
return GRIB_BUFFER_TOO_SMALL; return GRIB_BUFFER_TOO_SMALL;
} }
memcpy(buffer, tmp, already_read); memcpy(buffer, tmp, already_read);
} else {
r->seek(r->read_data, rest-4);
} }
bool read_failed = false;
if (no_alloc) { if (no_alloc) {
if ((r->read(r->read_data, buffer, 4, &err) != 4) || err) { read_failed = ((r->read(r->read_data, buffer, 4, &err) != 4) || err);
/*fprintf(stderr, "read_the_rest: r->read failed: %s\n", grib_get_error_message(err));*/
if (c->debug)
fprintf(stderr, "ECCODES DEBUG read_the_rest: Read failed (Coded length=%zu, Already read=%d)\n",
message_length, already_read);
return err;
}
if (check7777 && !r->headers_only &&
(buffer[0] != '7' ||
buffer[1] != '7' ||
buffer[2] != '7' ||
buffer[3] != '7')) {
if (c->debug)
fprintf(stderr, "ECCODES DEBUG read_the_rest: No final 7777 at expected location (Coded length=%zu)\n", message_length);
return GRIB_WRONG_LENGTH;
}
} }
else { else {
if ((r->read(r->read_data, buffer + already_read, rest, &err) != rest) || err) { read_failed = ((r->read(r->read_data, buffer + already_read, rest, &err) != rest) || err);
/*fprintf(stderr, "read_the_rest: r->read failed: %s\n", grib_get_error_message(err));*/ }
if (c->debug) if (read_failed) {
fprintf(stderr, "ECCODES DEBUG read_the_rest: Read failed (Coded length=%zu, Already read=%d)\n", if (c->debug)
message_length, already_read); fprintf(stderr, "ECCODES DEBUG %s: Read failed (Coded length=%zu, Already read=%d)",
return err; __func__, message_length, already_read);
} return err;
}
if (check7777 && !r->headers_only && const size_t mlen = no_alloc ? 4 : message_length;
(buffer[message_length - 4] != '7' || if (check7777 && !r->headers_only &&
buffer[message_length - 3] != '7' || (buffer[mlen - 4] != '7' ||
buffer[message_length - 2] != '7' || buffer[mlen - 3] != '7' ||
buffer[message_length - 1] != '7')) buffer[mlen - 2] != '7' ||
{ buffer[mlen - 1] != '7'))
if (c->debug) {
fprintf(stderr, "ECCODES DEBUG read_the_rest: No final 7777 at expected location (Coded length=%zu)\n", message_length); if (c->debug)
return GRIB_WRONG_LENGTH; fprintf(stderr, "ECCODES DEBUG %s: No final 7777 at expected location (Coded length=%zu)\n", __func__, message_length);
} return GRIB_WRONG_LENGTH;
} }
return GRIB_SUCCESS; return GRIB_SUCCESS;
@ -1215,16 +1203,18 @@ int wmo_read_bufr_from_file(FILE* f, void* buffer, size_t* len)
return ecc_wmo_read_any_from_file(f, buffer, len, /*no_alloc=*/0, 0, 1, 0, 0); return ecc_wmo_read_any_from_file(f, buffer, len, /*no_alloc=*/0, 0, 1, 0, 0);
} }
int wmo_read_any_from_file_noalloc(FILE* f, void* buffer, size_t* len) { int wmo_read_any_from_file_fast(FILE* f, void* buffer, size_t* len) {
return ecc_wmo_read_any_from_file(f, buffer, len, /*no_alloc=*/1, 1, 1, 1, 1); return ecc_wmo_read_any_from_file(f, buffer, len, /*no_alloc=*/1, 1, 1, 1, 1);
} }
int wmo_read_grib_from_file_noalloc(FILE* f, void* buffer, size_t* len) { int wmo_read_grib_from_file_fast(FILE* f, void* buffer, size_t* len) {
return ecc_wmo_read_any_from_file(f, buffer, len, /*no_alloc=*/1, 1, 0, 0, 0); return ecc_wmo_read_any_from_file(f, buffer, len, /*no_alloc=*/1, 1, 0, 0, 0);
} }
int wmo_read_bufr_from_file_noalloc(FILE* f, void* buffer, size_t* len) { int wmo_read_bufr_from_file_fast(FILE* f, void* buffer, size_t* len) {
return ecc_wmo_read_any_from_file(f, buffer, len, /*no_alloc=*/1, 0, 1, 0, 0); return ecc_wmo_read_any_from_file(f, buffer, len, /*no_alloc=*/1, 0, 1, 0, 0);
} }
int wmo_read_gts_from_file_noalloc(FILE* f, void* buffer, size_t* len) { return GRIB_NOT_IMPLEMENTED; } int wmo_read_gts_from_file_fast(FILE* f, void* buffer, size_t* len) {
return GRIB_NOT_IMPLEMENTED;
}
int wmo_read_gts_from_file(FILE* f, void* buffer, size_t* len) int wmo_read_gts_from_file(FILE* f, void* buffer, size_t* len)
{ {

View File

@ -18,28 +18,79 @@ static void usage(const char* prog)
printf("Usage: %s [-v] [-f] infile1 infile2 ... \n", prog); printf("Usage: %s [-v] [-f] infile1 infile2 ... \n", prog);
exit(1); exit(1);
} }
static int count_messages_slow(FILE* in, int message_type, unsigned long* count)
{
void* mesg = NULL;
size_t size = 0;
off_t offset = 0;
int err = GRIB_SUCCESS;
typedef void* (*wmo_read_proc)(FILE*, int, size_t*, off_t*, int*);
wmo_read_proc wmo_read = NULL;
grib_context* c = grib_context_get_default();
static int count_messages(FILE* in, int message_type, unsigned long* count) if (!in)
return 1;
if (message_type == CODES_GRIB)
wmo_read = wmo_read_grib_from_file_malloc;
else if (message_type == CODES_BUFR)
wmo_read = wmo_read_bufr_from_file_malloc;
else if (message_type == CODES_GTS)
wmo_read = wmo_read_gts_from_file_malloc;
else
wmo_read = wmo_read_any_from_file_malloc;
if (fail_on_error) {
while ((mesg = wmo_read(in, 0, &size, &offset, &err)) != NULL && err == GRIB_SUCCESS) {
grib_context_free(c, mesg);
(*count)++;
}
}
else {
int done = 0;
while (!done) {
mesg = wmo_read(in, 0, &size, &offset, &err);
/*printf("Count so far=%ld, mesg=%x, err=%d (%s)\n", *count, mesg, err, grib_get_error_message(err));*/
if (!mesg) {
if (err == GRIB_END_OF_FILE || err == GRIB_PREMATURE_END_OF_FILE) {
done = 1; /* reached the end */
}
}
if (mesg && !err) {
(*count)++;
}
grib_context_free(c, mesg);
}
}
if (err == GRIB_END_OF_FILE)
err = GRIB_SUCCESS;
if (mesg) grib_context_free(c, mesg);
return err;
}
static int count_messages_fast(FILE* in, int message_type, unsigned long* count)
{ {
size_t size = 0; size_t size = 0;
int err = GRIB_SUCCESS; int err = GRIB_SUCCESS;
typedef int (*wmo_read_proc)(FILE* , void* , size_t*); typedef int (*wmo_read_proc)(FILE* , void* , size_t*);
wmo_read_proc wmo_read = NULL; wmo_read_proc wmo_read = NULL;
//grib_context* c = grib_context_get_default();
unsigned char buff1[1000]; unsigned char buff1[1000];
size = sizeof(buff1); size = sizeof(buff1);
if (!in) if (!in)
return 1; return 1;
/* printf("message_type=%d\n", message_type); */
if (message_type == CODES_GRIB) if (message_type == CODES_GRIB)
wmo_read = wmo_read_grib_from_file_noalloc; wmo_read = wmo_read_grib_from_file_fast;
else if (message_type == CODES_BUFR) else if (message_type == CODES_BUFR)
wmo_read = wmo_read_bufr_from_file_noalloc; wmo_read = wmo_read_bufr_from_file_fast;
else if (message_type == CODES_GTS) else if (message_type == CODES_GTS)
wmo_read = NULL; wmo_read = wmo_read_gts_from_file_fast;
else else
wmo_read = wmo_read_any_from_file_noalloc; wmo_read = wmo_read_any_from_file_fast;
if (fail_on_error) { if (fail_on_error) {
while ((err = wmo_read(in, buff1, &size)) == GRIB_SUCCESS) { while ((err = wmo_read(in, buff1, &size)) == GRIB_SUCCESS) {
@ -49,7 +100,7 @@ static int count_messages(FILE* in, int message_type, unsigned long* count)
else { else {
int done = 0; int done = 0;
while (!done) { while (!done) {
err = wmo_read(in, 0, &size); err = wmo_read(in, buff1, &size);
/*printf("Count so far=%ld, mesg=%x, err=%d (%s)\n", *count, mesg, err, grib_get_error_message(err));*/ /*printf("Count so far=%ld, mesg=%x, err=%d (%s)\n", *count, mesg, err, grib_get_error_message(err));*/
if (err) { if (err) {
if (err == GRIB_END_OF_FILE || err == GRIB_PREMATURE_END_OF_FILE) { if (err == GRIB_END_OF_FILE || err == GRIB_PREMATURE_END_OF_FILE) {
@ -76,6 +127,8 @@ int main(int argc, char* argv[])
int err = 0, files_processed = 0; int err = 0, files_processed = 0;
unsigned long count_total = 0, count_curr = 0; unsigned long count_total = 0, count_curr = 0;
int message_type = 0; /* GRIB, BUFR etc */ int message_type = 0; /* GRIB, BUFR etc */
typedef int (*count_proc)(FILE*, int, unsigned long*);
count_proc do_count = count_messages_fast;
toolname = argv[0]; toolname = argv[0];
if (argc < 2) if (argc < 2)
@ -104,17 +157,22 @@ int main(int argc, char* argv[])
continue; continue;
} }
if (strcmp(filename, "-") == 0) if (strcmp(filename, "-") == 0) {
infh = stdin; infh = stdin;
else do_count = count_messages_slow; // cannot do fseek on stdin
} else {
infh = fopen(filename, "rb"); infh = fopen(filename, "rb");
}
if (!infh) { if (!infh) {
perror(filename); perror(filename);
exit(1); exit(1);
} }
if (message_type == CODES_GTS) {
do_count = count_messages_slow; // not yet implemented
}
files_processed = 1; /* At least one file processed */ files_processed = 1; /* At least one file processed */
count_curr = 0; count_curr = 0;
err = count_messages(infh, message_type, &count_curr); err = do_count(infh, message_type, &count_curr);
if (err && fail_on_error) { if (err && fail_on_error) {
fprintf(stderr, "Invalid message(s) found in %s", filename); fprintf(stderr, "Invalid message(s) found in %s", filename);
if (count_curr > 0) if (count_curr > 0)