diff --git a/.idea/workspace.xml b/.idea/workspace.xml index 1593b46..c7c4b90 100644 --- a/.idea/workspace.xml +++ b/.idea/workspace.xml @@ -8,8 +8,15 @@ + - + + + + + + + - - + diff --git a/Cargo.lock b/Cargo.lock index c13bfbd..d857288 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3696,6 +3696,7 @@ dependencies = [ "nexrad-data", "nexrad-decode", "rayon", + "rustc-hash", "serde", "thiserror 2.0.12", "toml", diff --git a/crates/ar2/Cargo.toml b/crates/ar2/Cargo.toml index 8001913..bec173b 100644 --- a/crates/ar2/Cargo.toml +++ b/crates/ar2/Cargo.toml @@ -14,6 +14,7 @@ thiserror = "2" bincode = "2" tracing = "0.1" tracing-subscriber = "0.3" +rustc-hash = "2" [dev-dependencies] criterion = { version = "0.5" } diff --git a/crates/ar2/benches/parse_benchmark.rs b/crates/ar2/benches/parse_benchmark.rs index eabd90b..b46e2e7 100644 --- a/crates/ar2/benches/parse_benchmark.rs +++ b/crates/ar2/benches/parse_benchmark.rs @@ -1,5 +1,6 @@ +use std::io::Cursor; use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; -use wxbox_ar2::parse; +use wxbox_ar2::parse::Ar2v; const KCRP20170825_235733_V06: (&[u8], &str) = (include_bytes!("KCRP20170825_235733_V06"), "KCRP20170825_235733_V06"); const KGWX20250518_165333_V06: (&[u8], &str) = (include_bytes!("KGWX20250518_165333_V06"), "KGWX20250518_165333_V06"); @@ -14,7 +15,8 @@ fn criterion_benchmark(c: &mut Criterion) { test_file_bytes, |b, bytes| { b.iter(|| { - parse(bytes.to_vec()).unwrap(); + let mut r= Cursor::new(bytes.to_vec()); + Ar2v::read(&mut r).unwrap(); }); } ); diff --git a/crates/ar2/src/main.rs b/crates/ar2/src/main.rs index 7cd978f..8022994 100644 --- a/crates/ar2/src/main.rs +++ b/crates/ar2/src/main.rs @@ -1,9 +1,7 @@ -use std::{env, fs}; +use std::{env}; use std::fs::File; -use std::time::Instant; use tracing::Level; use tracing_subscriber::fmt::format::FmtSpan; -use wxbox_ar2::parse; use wxbox_ar2::parse::Ar2v; fn main() { diff --git a/crates/ar2/src/parse/error.rs b/crates/ar2/src/parse/error.rs index 4df9949..797c7a8 100644 --- a/crates/ar2/src/parse/error.rs +++ b/crates/ar2/src/parse/error.rs @@ -8,5 +8,7 @@ pub enum Ar2vError { #[error("i/o error: {0}")] IoError(#[from] std::io::Error), #[error("unknown compression record")] - UnknownCompressionRecord + UnknownCompressionRecord, + #[error("m31 missing required field")] + Message31MissingRequiredField } \ No newline at end of file diff --git a/crates/ar2/src/parse/mod.rs b/crates/ar2/src/parse/mod.rs index 51bc0ac..aac1e41 100644 --- a/crates/ar2/src/parse/mod.rs +++ b/crates/ar2/src/parse/mod.rs @@ -1,16 +1,19 @@ +use std::collections::HashMap; use std::fmt::Debug; use std::io::{Cursor, ErrorKind, Read, Seek, SeekFrom}; -use std::time::Instant; use bincode::error::DecodeError; use bzip2::read::BzDecoder; -use tracing::{debug, instrument, Level, span, trace}; +use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; +use tracing::{debug, Level, span}; use crate::parse::error::Ar2vError; -use crate::parse::types::{DataBlock, ElevationData, GenericDataMoment, Message31Header, MessageHeader, RadialData, VolumeData, VolumeHeaderRecord}; -use crate::parse::util::{ReadExt, unpack_structure}; +use crate::parse::scan::{Elevation, Gate, MomentaryProduct, Radial, Scan}; +use crate::parse::types::{DataBlock, GenericDataMoment, Message, Message31, Message31Header, MessageHeader, VolumeHeaderRecord}; +use crate::parse::util::unpack_structure; -mod types; -mod error; -mod util; +pub mod types; +pub mod error; +pub mod util; +pub mod scan; const DEFAULT_MESSAGE_SIZE: usize = 2432; const MESSAGE_BODY_SIZE: usize = DEFAULT_MESSAGE_SIZE - 12 - 16; @@ -32,12 +35,9 @@ impl Ar2v { let ldm_extract_span = span!(Level::DEBUG, "ldm"); let _ldm_extract_span = ldm_extract_span.enter(); - let mut message_count = 0; + let mut records = vec![]; loop { - let span = span!(Level::DEBUG, "ldm_record"); - let _span = span.enter(); - let size: i32 = match unpack_structure(r) { Ok(s) => s, Err(e) => match e { @@ -51,120 +51,219 @@ impl Ar2v { } }; - let size: usize = size.abs() as usize; + let size: usize = size.unsigned_abs() as usize; - trace!(size, "new ldm record"); - trace!("decompressing {size} bytes"); + // read record then kick off a task to process it + let record_content = read_record(r, size)?; + records.push(record_content); + } - let mut r = { - let dc_span = span!(Level::TRACE, "bz2"); - let _dc_span = dc_span.enter(); + let rs: Vec, Ar2vError>> = records.par_iter() + .map(|record_content| { + let mut r = decompress_record(record_content)?; - let mut compressed_data = vec![0u8; size]; - r.read_exact(&mut compressed_data)?; - let mut bz2_reader = BzDecoder::new(compressed_data.as_slice()); + let mut msgs = vec![]; - let mut buf = vec![]; - bz2_reader.read_to_end(&mut buf)?; + loop { + let start = r.position(); - Cursor::new(buf) - }; - loop { - message_count += 1; - let start = r.position(); - - // legacy compliance - r.seek(SeekFrom::Current(12))?; - let hdr: MessageHeader = match unpack_structure(&mut r) { - Ok(s) => s, - Err(e) => match e { - DecodeError::UnexpectedEnd { .. } => { - break; - }, - DecodeError::Io { inner, .. } if inner.kind() == ErrorKind::UnexpectedEof => { - break; + // legacy compliance + r.seek(SeekFrom::Current(12))?; + let hdr: MessageHeader = match unpack_structure(&mut r) { + Ok(s) => s, + Err(e) => match e { + DecodeError::UnexpectedEnd { .. } => { + break; + }, + DecodeError::Io { inner, .. } if inner.kind() == ErrorKind::UnexpectedEof => { + break; + } + _ => return Err(e.into()) } - _ => return Err(e.into()) - } - }; + }; - let mut msg_size = hdr.message_size as usize; - if msg_size == 65535 { - msg_size = (hdr.num_message_segments as usize) << 16 | hdr.message_segment_num as usize; - } - - if hdr.message_type == 0 { - r.seek(SeekFrom::Current(MESSAGE_BODY_SIZE as i64))?; - continue; - } - - if msg_size < 9 { - panic!("message is too short"); - } - if hdr.num_message_segments == 0 { - panic!("0 segments?"); - } - if hdr.num_message_segments > 10 { - panic!("very large number of segments..."); - } - if ![1_u8,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,31,32,33].contains(&hdr.message_type) { - panic!("invalid message type"); - } - - if hdr.message_type == 31 { - let m31_start = r.position(); - let m31h: Message31Header = unpack_structure(&mut r)?; - - let mut db_pointers = vec![0u32; m31h.data_block_count as usize]; - for pointer in &mut db_pointers { - *pointer = unpack_structure(&mut r)?; + let mut msg_size = hdr.message_size as usize; + if msg_size == 65535 { + msg_size = ((hdr.num_message_segments as usize) << 16) | hdr.message_segment_num as usize; } - for pointer in db_pointers { - r.seek(SeekFrom::Start(m31_start + pointer as u64))?; - let d: DataBlock = unpack_structure(&mut r)?; - r.seek(SeekFrom::Current(-4))?; + if hdr.message_type == 0 { + r.seek(SeekFrom::Current(MESSAGE_BODY_SIZE as i64))?; + continue; + } - match d.data_name.as_str().as_str() { - "VOL" => { - let v: VolumeData = unpack_structure(&mut r)?; - }, - "ELV" => { - let e: ElevationData = unpack_structure(&mut r)?; - }, - "RAD" => { - let r: RadialData = unpack_structure(&mut r)?; - }, + if msg_size < 9 { + panic!("message is too short"); + } + if hdr.num_message_segments == 0 { + panic!("0 segments?"); + } + if hdr.num_message_segments > 100 { + eprintln!("{:?}", hdr); + panic!("very large number of segments {}...", hdr.num_message_segments); + } + if ![1_u8,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,31,32,33].contains(&hdr.message_type) { + panic!("invalid message type"); + } - "REF" | "VEL" | "CFP" | "SW " | "ZDR" | "PHI" | "RHO" => { - let moment: GenericDataMoment = unpack_structure(&mut r)?; - let ldm = moment.number_data_moment_gates as usize * moment.data_word_size as usize / 8; + let message = if hdr.message_type == 31 { + let m31_start = r.position(); + let m31h: Message31Header = unpack_structure(&mut r)?; - let mut data = vec![0u8; ldm]; - r.read_exact(&mut data)?; - }, - - _ => panic!("unknown data block type") + let mut db_pointers = vec![0u32; m31h.data_block_count as usize]; + for pointer in &mut db_pointers { + *pointer = unpack_structure(&mut r)?; } - } - } else { - if msg_size < DEFAULT_MESSAGE_SIZE { - r.seek(SeekFrom::Start(start))?; - r.seek(SeekFrom::Current(DEFAULT_MESSAGE_SIZE as i64))?; + + let mut vol = None; + let mut elv = None; + let mut rad = None; + let mut datablocks = HashMap::new(); + + for pointer in db_pointers { + r.seek(SeekFrom::Start(m31_start + pointer as u64))?; + let d: DataBlock = unpack_structure(&mut r)?; + r.seek(SeekFrom::Current(-4))?; + + match d.data_name.as_str().as_str() { + "VOL" => { + vol = Some(unpack_structure(&mut r)?); + }, + "ELV" => { + elv = Some(unpack_structure(&mut r)?); + }, + "RAD" => { + rad = Some(unpack_structure(&mut r)?); + }, + + "REF" | "VEL" | "CFP" | "SW " | "ZDR" | "PHI" | "RHO" => { + let moment: GenericDataMoment = unpack_structure(&mut r)?; + let ldm = moment.number_data_moment_gates as usize * moment.data_word_size as usize / 8; + + let mut data = vec![0u8; ldm]; + r.read_exact(&mut data)?; + datablocks.insert(d.data_name.as_str(), (moment, data)); + }, + + _ => panic!("unknown data block type") + } + } + + Message::Message31( + hdr, + Message31 { + header: m31h, + volume: vol.ok_or(Ar2vError::Message31MissingRequiredField)?, + elevation: elv.ok_or(Ar2vError::Message31MissingRequiredField)?, + radial: rad.ok_or(Ar2vError::Message31MissingRequiredField)?, + datablocks, + } + ) } else { - r.seek(SeekFrom::Start(start))?; - r.seek(SeekFrom::Current((DEFAULT_MESSAGE_SIZE as i64 + msg_size as i64) as i64))?; - } + let mut data = vec![0u8; msg_size]; + r.read_exact(&mut data)?; + + if msg_size < DEFAULT_MESSAGE_SIZE { + r.seek(SeekFrom::Start(start))?; + r.seek(SeekFrom::Current(DEFAULT_MESSAGE_SIZE as i64))?; + } else { + r.seek(SeekFrom::Start(start))?; + r.seek(SeekFrom::Current((DEFAULT_MESSAGE_SIZE as i64 + msg_size as i64) as i64))?; + } + + Message::OtherMessage(hdr, data) + }; + + msgs.push(message); } - } + + Ok(msgs) + }) + .collect(); + + let mut messages = vec![]; + + for r in rs { + messages.append(&mut r?); } debug!("extracted raw packet data"); drop(_ldm_extract_span); - debug!("processed {} messages", message_count); + debug!(msgs=messages.len(), "extracted messages"); + + let scan_process_span = span!(Level::DEBUG, "process_scan"); + let _scan_process_span = scan_process_span.enter(); + + // process into a scan + let mut scan = Scan { + elevations: Default::default(), + }; + messages.iter() + .for_each(|u| { + if let Message::Message31(_hdr, m31) = u { + let elevation_num = m31.header.elevation_number as usize; + let azimuth_num = m31.header.azimuth_number as usize; + + let elevation = scan.elevations.entry(elevation_num) + .or_insert_with(|| { Elevation { + elevation_number: elevation_num, + elevation_angle: m31.header.elevation_angle as f64, + radials: Default::default(), + }}); + + let radial = elevation.radials.entry(azimuth_num) + .or_insert_with(|| { Radial { + radial_number: azimuth_num, + azimuth_angle: m31.header.azimuth_angle as f64, + products: Default::default(), + }}); + + for (product, data) in m31.datablocks.iter() { + // scale the data + let chunks = data.1.chunks(data.0.data_word_size as usize / 8); + + radial.products.insert(product.clone(), MomentaryProduct { + start_range_meters: data.0.data_moment_range as isize, + data_spacing_meters: data.0.data_moment_range_sample_interval as isize, + data: chunks.map(|u| { + let mut result = [0u8; 8]; + result[..u.len()].copy_from_slice(u); + usize::from_be_bytes(result) + }).map(|u| { + if u == 0 { + Gate::BelowThreshold + } else if u == 1 { + Gate::RangeFolded + } else if data.0.scale == 0.0 { + Gate::Data(u as f64) + } else { + Gate::Data((u as f64) - data.0.offset as f64 / data.0.scale as f64) + } + + }).collect(), + }); + } + } + }); + + Ok(Self { volume_header: vhr }) } +} + +fn read_record(r: &mut R, size: usize) -> Result, Ar2vError> { + let mut compressed_data = vec![0u8; size]; + r.read_exact(&mut compressed_data)?; + Ok(compressed_data) +} +fn decompress_record(compressed_data: &[u8]) -> Result>, Ar2vError> { + let mut bz2_reader = BzDecoder::new(compressed_data); + + let mut buf = vec![]; + bz2_reader.read_to_end(&mut buf)?; + + Ok(Cursor::new(buf)) } \ No newline at end of file diff --git a/crates/ar2/src/parse/scan.rs b/crates/ar2/src/parse/scan.rs new file mode 100644 index 0000000..4a9a32b --- /dev/null +++ b/crates/ar2/src/parse/scan.rs @@ -0,0 +1,27 @@ +use rustc_hash::FxHashMap; + +pub struct Scan { + pub elevations: FxHashMap +} +pub struct Elevation { + pub elevation_number: usize, + pub elevation_angle: f64, + + pub radials: FxHashMap +} +pub struct Radial { + pub radial_number: usize, + pub azimuth_angle: f64, + + pub products: FxHashMap +} +pub struct MomentaryProduct { + pub start_range_meters: isize, + pub data_spacing_meters: isize, + pub data: Vec +} +pub enum Gate { + BelowThreshold, + RangeFolded, + Data(f64) +} \ No newline at end of file diff --git a/crates/ar2/src/parse/types.rs b/crates/ar2/src/parse/types.rs index b0d182d..0d7ae67 100644 --- a/crates/ar2/src/parse/types.rs +++ b/crates/ar2/src/parse/types.rs @@ -1,7 +1,9 @@ +use std::collections::HashMap; use bincode::Decode; use crate::parse::util::ExactLengthString; #[derive(Decode, Debug)] +#[allow(dead_code)] pub struct VolumeHeaderRecord { pub file_name: ExactLengthString<12>, pub modified_julian_date: u32, @@ -10,6 +12,7 @@ pub struct VolumeHeaderRecord { } #[derive(Decode, Debug)] +#[allow(dead_code)] pub struct MessageHeader { pub message_size: u16, pub rda_redundant_channel: u8, @@ -22,6 +25,7 @@ pub struct MessageHeader { } #[derive(Decode, Debug)] +#[allow(dead_code)] pub struct Message31Header { pub radar_identifier: ExactLengthString<4>, pub collection_time: u32, @@ -42,12 +46,14 @@ pub struct Message31Header { } #[derive(Decode, Debug)] +#[allow(dead_code)] pub struct DataBlock { pub data_block_type: ExactLengthString<1>, pub data_name: ExactLengthString<3> } #[derive(Decode, Debug)] +#[allow(dead_code)] pub struct VolumeData { pub data_block_type: ExactLengthString<1>, pub data_name: ExactLengthString<3>, @@ -67,6 +73,7 @@ pub struct VolumeData { pub processing_status: u16 } #[derive(Decode, Debug)] +#[allow(dead_code)] pub struct ElevationData { pub data_block_type: ExactLengthString<1>, pub data_name: ExactLengthString<3>, @@ -76,6 +83,7 @@ pub struct ElevationData { } #[derive(Decode, Debug)] +#[allow(dead_code)] pub struct RadialData { pub data_block_type: ExactLengthString<1>, pub data_name: ExactLengthString<3>, @@ -90,6 +98,7 @@ pub struct RadialData { } #[derive(Decode, Debug)] +#[allow(dead_code)] pub struct GenericDataMoment { pub data_block_type: ExactLengthString<1>, pub data_name: ExactLengthString<3>, @@ -103,4 +112,18 @@ pub struct GenericDataMoment { pub data_word_size: u8, pub scale: f32, pub offset: f32 +} +#[allow(dead_code)] +pub struct Message31 { + pub header: Message31Header, + pub volume: VolumeData, + pub elevation: ElevationData, + pub radial: RadialData, + pub datablocks: HashMap)> +} + +#[allow(dead_code)] +pub enum Message { + Message31(MessageHeader, Message31), + OtherMessage(MessageHeader, Vec) } \ No newline at end of file diff --git a/crates/ar2/src/parse/util.rs b/crates/ar2/src/parse/util.rs index 6854f38..f4eca28 100644 --- a/crates/ar2/src/parse/util.rs +++ b/crates/ar2/src/parse/util.rs @@ -10,23 +10,6 @@ pub(crate) fn unpack_structure, R: Read>(r: &mut R) -> Result(&mut self) -> std::io::Result<[u8; N]>; - fn read_n_buf(&mut self, n: usize) -> std::io::Result>; -} -impl ReadExt for T where T: Read { - fn read_n(&mut self) -> std::io::Result<[u8; N]> { - let mut buf = [0u8; N]; - self.read_exact(&mut buf)?; - Ok(buf) - } - fn read_n_buf(&mut self, n: usize) -> std::io::Result> { - let mut buf = Vec::with_capacity(n); - self.read_exact(&mut buf)?; - Ok(buf) - } -} - #[repr(transparent)] #[derive(Decode)] pub struct ExactLengthString([u8; N]); diff --git a/crates/ar2/src/sites/wsr88d.rs b/crates/ar2/src/sites/wsr88d.rs index c2ee878..24580d6 100644 --- a/crates/ar2/src/sites/wsr88d.rs +++ b/crates/ar2/src/sites/wsr88d.rs @@ -70,47 +70,3 @@ where pub static SITES: LazyLock = LazyLock::new(|| toml::from_str(include_str!("wsr88d.toml")).unwrap()); - -#[cfg(test)] -mod tests { - use crate::sites::wsr88d::SITES; - - const A: f64 = 6_378.1370 * 1000.0; // m - const B: f64 = 6_356.7523 * 1000.0; // m - - #[test] - fn azimuth_configurations() { - let radar = SITES.sites.get("KCRP").unwrap(); - let radar_theta = radar.long; - let radar_phi = radar.lat; - let radar_r = ((A.powi(2) * radar_phi.cos()).powi(2) - + (B.powi(2) * radar_phi.sin()).powi(2) / (A * radar_phi.cos()).powi(2) - + (B * radar_phi.sin()).powi(2)) - .sqrt(); - - let radar_x = radar_r * radar_theta.cos() * radar_phi.sin(); - let radar_y = radar_r * radar_theta.sin() * radar_phi.sin(); - let radar_z = radar_r * radar_theta.cos(); - - let measurement_theta = radar_theta + 0.0001; - let measurement_phi = radar_phi; - let measurement_r = radar_r; - - let measurement_x = measurement_r * measurement_theta.cos() * measurement_phi.sin(); - let measurement_y = measurement_r * measurement_theta.sin() * measurement_phi.sin(); - let measurement_z = measurement_r * measurement_theta.cos(); - - let radar_local_x = measurement_x - radar_x; - let radar_local_y = measurement_y - radar_y; - let radar_local_z = measurement_z - radar_z; - - let radar_local_r = - (radar_local_x.powi(2) + radar_local_y.powi(2) + radar_local_z.powi(2)).sqrt(); - let radar_local_theta = (radar_local_y / radar_local_x).atan(); - let radar_local_phi = (radar_local_z / radar_local_r).acos(); - - let azimuth = radar_local_theta; - let elevation = radar_local_phi; - let distance = radar_local_r; - } -}