From b3b9baf006f288e043208609c947b902c482a1cd Mon Sep 17 00:00:00 2001
From: Shahram Najm
Date: Fri, 27 Jul 2018 17:40:13 +0100
Subject: [PATCH] ECC-604: BUFR decoding in parallel

---
 tests/CMakeLists.txt  |   7 +-
 tests/bufr_ecc-604.c  | 226 ++++++++++++++++++++++++++++++++++++++++++
 tests/bufr_ecc-604.sh |  66 ++++++++++++
 3 files changed, 298 insertions(+), 1 deletion(-)
 create mode 100644 tests/bufr_ecc-604.c
 create mode 100755 tests/bufr_ecc-604.sh

diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index bd062ed40..ac4e5bfa1 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -218,7 +218,7 @@ if( ENABLE_EXTRA_TESTS AND HAVE_ECCODES_THREADS )
                       COMMAND ${CMAKE_CURRENT_SOURCE_DIR}/grib_encode_pthreads.sh
     )
 
-    foreach( test grib_ecc-604 )
+    foreach( test grib_ecc-604 bufr_ecc-604 )
         ecbuild_add_executable( TARGET ${test}
                                 NOINSTALL
                                 SOURCES ${test}.c
@@ -230,6 +230,11 @@ if( ENABLE_EXTRA_TESTS AND HAVE_ECCODES_THREADS )
                       TEST_DEPENDS eccodes_download_gribs
                       COMMAND ${CMAKE_CURRENT_SOURCE_DIR}/grib_ecc-604.sh
     )
+    ecbuild_add_test( TARGET eccodes_t_bufr_ecc-604
+                      TYPE SCRIPT
+                      TEST_DEPENDS eccodes_download_bufrs
+                      COMMAND ${CMAKE_CURRENT_SOURCE_DIR}/bufr_ecc-604.sh
+    )
 endif()
 
 ecbuild_add_test( TARGET eccodes_t_grib_to_netcdf
diff --git a/tests/bufr_ecc-604.c b/tests/bufr_ecc-604.c
new file mode 100644
index 000000000..a4ee6e573
--- /dev/null
+++ b/tests/bufr_ecc-604.c
@@ -0,0 +1,226 @@
+/*
+ * Test for ECC-604: Each thread creates a new BUFR handle, optionally clone it and/or write it out
+ */
+#include <assert.h>
+#include <pthread.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+#include <unistd.h>
+
+#include "eccodes.h"
+#include "grib_api_internal.h"
+
+/* These are passed in via argv */
+static size_t NUM_THREADS = 0;
+static size_t FILES_PER_ITERATION = 0;
+static char* INPUT_FILE = NULL;
+int opt_clone = 0; /* If 1 then clone source handle */
+int opt_write = 0; /* If 1 write handle to file */
+
+/*
+ * Process every BUFR message found in template_file: read numberOfSubsets,
+ * set the "unpack" key and dump the message to /dev/null.
+ * If opt_clone is set the keys are read from a clone of each message.
+ * If output_file is not NULL each message is also written to that file.
+ * Returns 0 on success, 1 if writing to output_file failed.
+ */
+static int encode_file(char *template_file, char *output_file)
+{
+    FILE *in = NULL, *out = NULL;
+    codes_handle *source_handle = NULL;
+    const void *buffer = NULL;
+    size_t size = 0;
+    int err = 0;
+    long numSubsets = 0;
+
+    in = fopen(template_file,"r"); assert(in);
+    if (output_file) {
+        out = fopen(output_file,"w"); assert(out);
+    }
+
+    /* loop over the messages in the source BUFR and clone them */
+    while ((source_handle = codes_handle_new_from_file(NULL, in, PRODUCT_BUFR, &err)) != NULL || err != CODES_SUCCESS) {
+        codes_handle *h = source_handle;
+
+        if (opt_clone) {
+            h = codes_handle_clone(source_handle); assert(h);
+        }
+
+        CODES_CHECK(codes_get_long(h,"numberOfSubsets", &numSubsets),0);
+        CODES_CHECK(codes_set_long(h, "unpack", 1),0);
+
+        CODES_CHECK(codes_get_message(h,&buffer,&size),0);
+        if (output_file) {
+            if(fwrite(buffer,1,size,out) != size) {
+                perror(output_file);
+                /* Release everything acquired so far before bailing out */
+                if(opt_clone) codes_handle_delete(h);
+                codes_handle_delete(source_handle);
+                fclose(out);
+                fclose(in);
+                return 1;
+            }
+        }
+        {
+            /* Dump the message with the "bufr_simple" dumper; the output is discarded */
+            FILE *devnull = fopen("/dev/null", "w");
+            grib_dumper* dumper = NULL;
+            const char* dumper_name = "bufr_simple";
+            unsigned long dump_flags = 512;
+            assert(devnull);
+            /* codes_dump_content(source_handle,devnull, "json", 1024, NULL); */
+            dumper=grib_dump_content_with_dumper(source_handle, dumper, devnull, dumper_name, dump_flags,0);
+            assert(dumper);
+            /* A stream is opened for every message so it must be closed here */
+            fclose(devnull);
+        }
+
+        codes_handle_delete(source_handle);
+        if(opt_clone) codes_handle_delete(h);
+    }
+
+    if (output_file) fclose(out);
+    fclose(in);
+
+    return 0;
+}
+
+void do_stuff(void *data);
+
+/* Structure for passing data to threads */
+struct v {
+    size_t number;
+    char *data;
+};
+
+void *runner(void *ptr); /* the thread */
+
+/*
+ * Usage: prog [-c] [-w] seq|par file numThreads numIter
+ * Runs do_stuff() numThreads times, either one after the other ("seq")
+ * or each in its own thread (any other mode).
+ * Returns 0 on success, 1 on bad arguments or if a thread cannot be created.
+ */
+int main(int argc, char **argv)
+{
+    size_t i;
+    int thread_counter = 0;
+    int parallel=1, index=0, c=0, rc=0;
+    long num_threads = 0, num_iter = 0;
+    const char* prog = argv[0];
+    char* mode;
+
+    while ((c = getopt (argc, argv, "cw")) != -1) {
+        switch (c) {
+            case 'c': opt_clone=1; break;
+            case 'w': opt_write=1; break;
+        }
+    }
+    index = optind;
+    /* Exactly four positional arguments must remain once the options are consumed */
+    if (argc - index != 4) {
+        fprintf(stderr, "Usage:\n\t%s [options] seq file numRuns numIter\nOr\n\t%s [options] par file numThreads numIter\n", prog, prog);
+        return 1;
+    }
+    mode = argv[index];
+    INPUT_FILE = argv[index+1];
+    num_threads = atol(argv[index+2]);
+    num_iter = atol(argv[index+3]);
+    /* Reject values that would give an empty or absurdly large workers array */
+    if (num_threads < 1 || num_iter < 0) {
+        fprintf(stderr, "%s: numThreads must be at least 1 and numIter must not be negative\n", prog);
+        return 1;
+    }
+    NUM_THREADS = (size_t)num_threads;
+    FILES_PER_ITERATION = (size_t)num_iter;
+
+    if (strcmp(mode,"seq")==0) {
+        parallel = 0;
+    }
+    if (parallel) {
+        printf("Running parallel in %ld threads. %ld iterations\n", (long)NUM_THREADS, (long)FILES_PER_ITERATION);
+        printf("Options: clone=%d, write=%d\n", opt_clone, opt_write);
+    } else {
+        printf("Running sequentially in %ld runs. %ld iterations\n", (long)NUM_THREADS, (long)FILES_PER_ITERATION);
+    }
+
+    {
+        pthread_t workers[NUM_THREADS];
+        for (i = 0; i < NUM_THREADS; i++) {
+            struct v *data = (struct v *) malloc(sizeof(struct v));
+            assert(data);
+            data->number = i;
+            data->data = NULL;
+
+            if (parallel) {
+                /* Now we will create the thread passing it data as an argument */
+                rc = pthread_create(&workers[thread_counter], NULL, runner, data);
+                if (rc != 0) {
+                    fprintf(stderr, "%s: pthread_create failed: %s\n", prog, strerror(rc));
+                    return 1;
+                }
+                /*pthread_join(workers[thread_counter], NULL);*/
+                thread_counter++;
+            } else {
+                do_stuff(data);
+                free(data);
+            }
+        }
+
+        if (parallel) {
+            for (i = 0; i < NUM_THREADS; i++) {
+                pthread_join(workers[i], NULL);
+            }
+        }
+    }
+
+    return 0;
+}
+
+/* Thread entry point: does the work then frees the struct v allocated by main() */
+void *runner(void *ptr)
+{
+    do_stuff(ptr);
+    free(ptr);
+    return NULL;
+}
+
+/*
+ * Process INPUT_FILE FILES_PER_ITERATION times. With opt_write set, iteration i
+ * of worker N writes to output/output_file_N-i.bufr.
+ * ptr points to the struct v identifying this worker.
+ */
+void do_stuff(void *ptr)
+{
+    /* Cast argument to struct v pointer */
+    struct v *data = ptr;
+    size_t i;
+    int rc = 0;
+    char output_file[80];
+    time_t ltime;
+    struct tm result;
+    char stime[32];
+
+    for (i=0; i<FILES_PER_ITERATION;i++) {
+        if (opt_write) {
+            snprintf(output_file, sizeof(output_file), "output/output_file_%ld-%ld.bufr", (long)data->number, (long)i);
+            rc = encode_file(INPUT_FILE,output_file);
+        } else {
+            rc = encode_file(INPUT_FILE,NULL);
+        }
+        /* A failed write must fail the test instead of being silently ignored */
+        if (rc != 0) {
+            fprintf(stderr, "Worker %ld: iteration %ld failed\n", (long)data->number, (long)i);
+            exit(1);
+        }
+    }
+
+    ltime = time(NULL);
+    localtime_r(&ltime, &result);
+    strftime(stime, 32, "%H:%M:%S", &result); /* Try to get milliseconds here too*/
+    /* asctime_r(&result, stime); */
+
+    printf("%s: Worker %ld finished.\n", stime, (long)data->number);
+}
diff --git a/tests/bufr_ecc-604.sh b/tests/bufr_ecc-604.sh
new file mode 100755
index 000000000..482b4413d
--- /dev/null
+++ b/tests/bufr_ecc-604.sh
@@ -0,0 +1,66 @@
+#!/bin/sh
+# Copyright 2005-2018 ECMWF.
+#
+# This software is licensed under the terms of the Apache Licence Version 2.0
+# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
+#
+# In applying this licence, ECMWF does not waive the privileges and immunities granted to it by
+# virtue of its status as an intergovernmental organisation nor does it submit to any jurisdiction.
+#
+
+. ./include.sh
+
+label="bufr_ecc-604"
+temp_dir=tempdir.${label}
+
+NUM_THREADS=3
+NUM_ITER=10
+OUTPUT=output
+
+validate()
+{
+    echo "Checking every output file is identical..."
+    # Get checksum of first file
+    ck1=`cksum $OUTPUT/output_file_0-0.bufr | awk '{print $1}'`
+    set +x
+    # Get checksum of all of them and sort unique
+    res=`cksum $OUTPUT/output_file_* | awk '{print $1}' | sort -u`
+    set -x
+    # Should be the same as the first
+    [ "$res" = "$ck1" ]
+}
+process()
+{
+    input=$1 # The input BUFR file
+
+    # Test 01: Clone + output
+    # ------------------------
+    rm -fr $OUTPUT; mkdir -p $OUTPUT
+    time ${test_dir}/bufr_ecc-604 -c -w par $input $NUM_THREADS $NUM_ITER
+    validate
+
+    # Test 02: No clone + output
+    # --------------------------
+    rm -fr $OUTPUT; mkdir -p $OUTPUT
+    time ${test_dir}/bufr_ecc-604 -w par $input $NUM_THREADS $NUM_ITER
+    validate
+
+    # Test 03: Clone + no output
+    # ---------------------------
+    rm -fr $OUTPUT
+    time ${test_dir}/bufr_ecc-604 -c par $input $NUM_THREADS $NUM_ITER
+    # Nothing to validate as there is no output
+}
+###################################################
+rm -fr $temp_dir
+mkdir -p $temp_dir
+cd $temp_dir
+
+for bf in $data_dir/bufr/*.bufr; do
+    echo "Doing $bf"
+    process $bf
+done
+
+# Clean up
+cd $test_dir
+rm -fr $temp_dir