diff --git a/.idea/workspace.xml b/.idea/workspace.xml
index c7c4b90..b81758e 100644
--- a/.idea/workspace.xml
+++ b/.idea/workspace.xml
@@ -7,16 +7,19 @@
-
-
+
+
+
+
+
-
+
@@ -24,7 +27,7 @@
-
+
1748922664295
-
+
+
+ 1749001700392
+
+
+
+ 1749001700392
+
+
@@ -252,6 +263,7 @@
-
+
+
\ No newline at end of file
diff --git a/Cargo.lock b/Cargo.lock
index d857288..c13bfbd 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3696,7 +3696,6 @@ dependencies = [
"nexrad-data",
"nexrad-decode",
"rayon",
- "rustc-hash",
"serde",
"thiserror 2.0.12",
"toml",
diff --git a/crates/ar2/Cargo.toml b/crates/ar2/Cargo.toml
index bec173b..8001913 100644
--- a/crates/ar2/Cargo.toml
+++ b/crates/ar2/Cargo.toml
@@ -14,7 +14,6 @@ thiserror = "2"
bincode = "2"
tracing = "0.1"
tracing-subscriber = "0.3"
-rustc-hash = "2"
[dev-dependencies]
criterion = { version = "0.5" }
diff --git a/crates/ar2/benches/parse_benchmark.rs b/crates/ar2/benches/parse_benchmark.rs
index b46e2e7..6c1ccb5 100644
--- a/crates/ar2/benches/parse_benchmark.rs
+++ b/crates/ar2/benches/parse_benchmark.rs
@@ -1,10 +1,15 @@
+use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
use std::io::Cursor;
-use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use wxbox_ar2::parse::Ar2v;
-const KCRP20170825_235733_V06: (&[u8], &str) = (include_bytes!("KCRP20170825_235733_V06"), "KCRP20170825_235733_V06");
-const KGWX20250518_165333_V06: (&[u8], &str) = (include_bytes!("KGWX20250518_165333_V06"), "KGWX20250518_165333_V06");
-
+const KCRP20170825_235733_V06: (&[u8], &str) = (
+ include_bytes!("KCRP20170825_235733_V06"),
+ "KCRP20170825_235733_V06",
+);
+const KGWX20250518_165333_V06: (&[u8], &str) = (
+ include_bytes!("KGWX20250518_165333_V06"),
+ "KGWX20250518_165333_V06",
+);
fn criterion_benchmark(c: &mut Criterion) {
let mut group = c.benchmark_group("parse");
@@ -15,10 +20,10 @@ fn criterion_benchmark(c: &mut Criterion) {
test_file_bytes,
|b, bytes| {
b.iter(|| {
- let mut r= Cursor::new(bytes.to_vec());
+ let mut r = Cursor::new(bytes.to_vec());
Ar2v::read(&mut r).unwrap();
});
- }
+ },
);
}
@@ -26,4 +31,4 @@ fn criterion_benchmark(c: &mut Criterion) {
}
criterion_group!(benches, criterion_benchmark);
-criterion_main!(benches);
\ No newline at end of file
+criterion_main!(benches);
diff --git a/crates/ar2/flamegraph.svg b/crates/ar2/flamegraph.svg
deleted file mode 100644
index 7ce03f7..0000000
--- a/crates/ar2/flamegraph.svg
+++ /dev/null
@@ -1,491 +0,0 @@
-
\ No newline at end of file
diff --git a/crates/ar2/src/lib.rs b/crates/ar2/src/lib.rs
index 1a09ca3..7cad846 100644
--- a/crates/ar2/src/lib.rs
+++ b/crates/ar2/src/lib.rs
@@ -8,8 +8,8 @@ use rayon::prelude::*;
use serde::{Deserialize, Serialize};
use std::fmt::{Debug, Formatter};
-pub mod sites;
pub mod parse;
+pub mod sites;
#[derive(Serialize, Deserialize, Clone)]
pub struct Scan {
diff --git a/crates/ar2/src/main.rs b/crates/ar2/src/main.rs
index 8022994..4c88765 100644
--- a/crates/ar2/src/main.rs
+++ b/crates/ar2/src/main.rs
@@ -1,4 +1,4 @@
-use std::{env};
+use std::env;
use std::fs::File;
use tracing::Level;
use tracing_subscriber::fmt::format::FmtSpan;
diff --git a/crates/ar2/src/parse/error.rs b/crates/ar2/src/parse/error.rs
index 797c7a8..2d2351a 100644
--- a/crates/ar2/src/parse/error.rs
+++ b/crates/ar2/src/parse/error.rs
@@ -10,5 +10,5 @@ pub enum Ar2vError {
#[error("unknown compression record")]
UnknownCompressionRecord,
#[error("m31 missing required field")]
- Message31MissingRequiredField
-}
\ No newline at end of file
+ Message31MissingRequiredField,
+}
diff --git a/crates/ar2/src/parse/mod.rs b/crates/ar2/src/parse/mod.rs
index aac1e41..e6c447b 100644
--- a/crates/ar2/src/parse/mod.rs
+++ b/crates/ar2/src/parse/mod.rs
@@ -1,24 +1,26 @@
-use std::collections::HashMap;
-use std::fmt::Debug;
-use std::io::{Cursor, ErrorKind, Read, Seek, SeekFrom};
+use crate::parse::error::Ar2vError;
+use crate::parse::scan::{Elevation, Gate, MomentaryProduct, Radial, Scan};
+use crate::parse::types::{
+ DataBlock, GenericDataMoment, Message, Message31, Message31Header, MessageHeader,
+ VolumeHeaderRecord,
+};
+use crate::parse::util::unpack_structure;
use bincode::error::DecodeError;
use bzip2::read::BzDecoder;
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
-use tracing::{debug, Level, span};
-use crate::parse::error::Ar2vError;
-use crate::parse::scan::{Elevation, Gate, MomentaryProduct, Radial, Scan};
-use crate::parse::types::{DataBlock, GenericDataMoment, Message, Message31, Message31Header, MessageHeader, VolumeHeaderRecord};
-use crate::parse::util::unpack_structure;
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::io::{Cursor, ErrorKind, Read, Seek, SeekFrom};
+use tracing::{Level, debug, span};
-pub mod types;
pub mod error;
-pub mod util;
pub mod scan;
+pub mod types;
+pub mod util;
const DEFAULT_MESSAGE_SIZE: usize = 2432;
const MESSAGE_BODY_SIZE: usize = DEFAULT_MESSAGE_SIZE - 12 - 16;
-
#[derive(Debug)]
pub struct Ar2v {
pub volume_header: VolumeHeaderRecord,
@@ -43,12 +45,12 @@ impl Ar2v {
Err(e) => match e {
DecodeError::UnexpectedEnd { .. } => {
break;
- },
+ }
DecodeError::Io { inner, .. } if inner.kind() == ErrorKind::UnexpectedEof => {
break;
}
- _ => return Err(e.into())
- }
+ _ => return Err(e.into()),
+ },
};
let size: usize = size.unsigned_abs() as usize;
@@ -58,7 +60,8 @@ impl Ar2v {
records.push(record_content);
}
- let rs: Vec, Ar2vError>> = records.par_iter()
+ let rs: Vec, Ar2vError>> = records
+ .par_iter()
.map(|record_content| {
let mut r = decompress_record(record_content)?;
@@ -74,17 +77,20 @@ impl Ar2v {
Err(e) => match e {
DecodeError::UnexpectedEnd { .. } => {
break;
- },
- DecodeError::Io { inner, .. } if inner.kind() == ErrorKind::UnexpectedEof => {
+ }
+ DecodeError::Io { inner, .. }
+ if inner.kind() == ErrorKind::UnexpectedEof =>
+ {
break;
}
- _ => return Err(e.into())
- }
+ _ => return Err(e.into()),
+ },
};
let mut msg_size = hdr.message_size as usize;
if msg_size == 65535 {
- msg_size = ((hdr.num_message_segments as usize) << 16) | hdr.message_segment_num as usize;
+ msg_size = ((hdr.num_message_segments as usize) << 16)
+ | hdr.message_segment_num as usize;
}
if hdr.message_type == 0 {
@@ -100,9 +106,17 @@ impl Ar2v {
}
if hdr.num_message_segments > 100 {
eprintln!("{:?}", hdr);
- panic!("very large number of segments {}...", hdr.num_message_segments);
+ panic!(
+ "very large number of segments {}...",
+ hdr.num_message_segments
+ );
}
- if ![1_u8,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,31,32,33].contains(&hdr.message_type) {
+ if ![
+ 1_u8, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 31, 32,
+ 33,
+ ]
+ .contains(&hdr.message_type)
+ {
panic!("invalid message type");
}
@@ -128,24 +142,26 @@ impl Ar2v {
match d.data_name.as_str().as_str() {
"VOL" => {
vol = Some(unpack_structure(&mut r)?);
- },
+ }
"ELV" => {
elv = Some(unpack_structure(&mut r)?);
- },
+ }
"RAD" => {
rad = Some(unpack_structure(&mut r)?);
- },
+ }
"REF" | "VEL" | "CFP" | "SW " | "ZDR" | "PHI" | "RHO" => {
let moment: GenericDataMoment = unpack_structure(&mut r)?;
- let ldm = moment.number_data_moment_gates as usize * moment.data_word_size as usize / 8;
+ let ldm = moment.number_data_moment_gates as usize
+ * moment.data_word_size as usize
+ / 8;
let mut data = vec![0u8; ldm];
r.read_exact(&mut data)?;
datablocks.insert(d.data_name.as_str(), (moment, data));
- },
+ }
- _ => panic!("unknown data block type")
+ _ => panic!("unknown data block type"),
}
}
@@ -157,7 +173,7 @@ impl Ar2v {
elevation: elv.ok_or(Ar2vError::Message31MissingRequiredField)?,
radial: rad.ok_or(Ar2vError::Message31MissingRequiredField)?,
datablocks,
- }
+ },
)
} else {
let mut data = vec![0u8; msg_size];
@@ -168,7 +184,9 @@ impl Ar2v {
r.seek(SeekFrom::Current(DEFAULT_MESSAGE_SIZE as i64))?;
} else {
r.seek(SeekFrom::Start(start))?;
- r.seek(SeekFrom::Current((DEFAULT_MESSAGE_SIZE as i64 + msg_size as i64) as i64))?;
+ r.seek(SeekFrom::Current(
+ (DEFAULT_MESSAGE_SIZE as i64 + msg_size as i64) as i64,
+ ))?;
}
Message::OtherMessage(hdr, data)
@@ -189,7 +207,7 @@ impl Ar2v {
debug!("extracted raw packet data");
drop(_ldm_extract_span);
- debug!(msgs=messages.len(), "extracted messages");
+ debug!(msgs = messages.len(), "extracted messages");
let scan_process_span = span!(Level::DEBUG, "process_scan");
let _scan_process_span = scan_process_span.enter();
@@ -198,59 +216,93 @@ impl Ar2v {
let mut scan = Scan {
elevations: Default::default(),
};
- messages.iter()
- .for_each(|u| {
+
+ let (tx, rx) = std::sync::mpsc::channel::<(usize, f64, usize, f64, String, MomentaryProduct)>();
+
+ std::thread::spawn(move || {
+ messages.par_iter().for_each(|u| {
if let Message::Message31(_hdr, m31) = u {
- let elevation_num = m31.header.elevation_number as usize;
- let azimuth_num = m31.header.azimuth_number as usize;
+ let elevation_num = m31.header.elevation_number as usize;
+ let azimuth_num = m31.header.azimuth_number as usize;
- let elevation = scan.elevations.entry(elevation_num)
- .or_insert_with(|| { Elevation {
- elevation_number: elevation_num,
- elevation_angle: m31.header.elevation_angle as f64,
- radials: Default::default(),
- }});
+ for (product, data) in &m31.datablocks {
+ match product.as_str() {
+ "REF" | "VEL" | "CFP" | "SW " | "ZDR" | "PHI" | "RHO" => (),
+ _ => continue,
+ }
- let radial = elevation.radials.entry(azimuth_num)
- .or_insert_with(|| { Radial {
- radial_number: azimuth_num,
- azimuth_angle: m31.header.azimuth_angle as f64,
- products: Default::default(),
- }});
-
- for (product, data) in m31.datablocks.iter() {
- // scale the data
- let chunks = data.1.chunks(data.0.data_word_size as usize / 8);
-
- radial.products.insert(product.clone(), MomentaryProduct {
- start_range_meters: data.0.data_moment_range as isize,
- data_spacing_meters: data.0.data_moment_range_sample_interval as isize,
- data: chunks.map(|u| {
- let mut result = [0u8; 8];
- result[..u.len()].copy_from_slice(u);
- usize::from_be_bytes(result)
- }).map(|u| {
+ // scale the data
+ let chunks = data.1.chunks(data.0.data_word_size as usize / 8);
+ let mp = MomentaryProduct {
+ start_range_meters: data.0.data_moment_range as isize,
+ data_spacing_meters: data.0.data_moment_range_sample_interval as isize,
+ data: chunks
+ .map(|u| {
+ let mut result = [0u8; 8];
+ result[..u.len()].copy_from_slice(u);
+ usize::from_be_bytes(result)
+ })
+ .map(|u| {
if u == 0 {
Gate::BelowThreshold
} else if u == 1 {
Gate::RangeFolded
} else if data.0.scale == 0.0 {
- Gate::Data(u as f64)
- } else {
- Gate::Data((u as f64) - data.0.offset as f64 / data.0.scale as f64)
- }
-
- }).collect(),
- });
- }
+ Gate::Data(u as f64)
+ } else {
+ Gate::Data(
+ (u as f64) - data.0.offset as f64 / data.0.scale as f64,
+ )
+ }
+ })
+ .collect(),
+ };
+ tx.send((elevation_num, m31.header.elevation_angle as f64, azimuth_num, m31.header.azimuth_angle as f64, product.clone(), mp)).unwrap();
}
+ }
});
+ drop(tx);
+ });
+ while let Ok((elevation, elevation_angle, azimuth, azimuth_angle, product, mp)) = rx.recv() {
+ let elevation = scan
+ .elevations
+ .entry(elevation)
+ .or_insert_with(|| Elevation {
+ elevation_number: elevation,
+ elevation_angle,
+ radials: Default::default(),
+ });
- Ok(Self {
- volume_header: vhr
- })
+ let radial = elevation
+ .radials
+ .entry(azimuth)
+ .or_insert_with(|| Radial {
+ radial_number: azimuth,
+ azimuth_angle,
+ refl: None,
+ vel: None,
+ cfp: None,
+ sw: None,
+ zdr: None,
+ phi: None,
+ rho: None,
+ });
+
+ match product.as_str() {
+ "REF" => { radial.refl = Some(mp); },
+ "VEL" => { radial.vel = Some(mp); },
+ "SW " => { radial.sw = Some(mp); },
+ "CFP" => { radial.cfp = Some(mp); },
+ "ZDR" => { radial.zdr = Some(mp); },
+ "PHI" => { radial.phi = Some(mp); },
+ "RHO" => { radial.rho = Some(mp); },
+ _ => panic!()
+ };
+ }
+
+ Ok(Self { volume_header: vhr })
}
}
@@ -266,4 +318,4 @@ fn decompress_record(compressed_data: &[u8]) -> Result>, Ar2vErro
bz2_reader.read_to_end(&mut buf)?;
Ok(Cursor::new(buf))
-}
\ No newline at end of file
+}
diff --git a/crates/ar2/src/parse/scan.rs b/crates/ar2/src/parse/scan.rs
index 4a9a32b..edd7889 100644
--- a/crates/ar2/src/parse/scan.rs
+++ b/crates/ar2/src/parse/scan.rs
@@ -1,27 +1,33 @@
-use rustc_hash::FxHashMap;
+use std::collections::BTreeMap;
pub struct Scan {
- pub elevations: FxHashMap
+ pub elevations: BTreeMap,
}
pub struct Elevation {
pub elevation_number: usize,
pub elevation_angle: f64,
- pub radials: FxHashMap
+ pub radials: BTreeMap,
}
pub struct Radial {
pub radial_number: usize,
pub azimuth_angle: f64,
- pub products: FxHashMap
+ pub refl: Option,
+ pub vel: Option,
+ pub cfp: Option,
+ pub sw: Option,
+ pub zdr: Option,
+ pub phi: Option,
+ pub rho: Option,
}
pub struct MomentaryProduct {
pub start_range_meters: isize,
pub data_spacing_meters: isize,
- pub data: Vec
+ pub data: Vec,
}
pub enum Gate {
BelowThreshold,
RangeFolded,
- Data(f64)
-}
\ No newline at end of file
+ Data(f64),
+}
diff --git a/crates/ar2/src/parse/types.rs b/crates/ar2/src/parse/types.rs
index 0d7ae67..b59bf1d 100644
--- a/crates/ar2/src/parse/types.rs
+++ b/crates/ar2/src/parse/types.rs
@@ -1,6 +1,6 @@
-use std::collections::HashMap;
-use bincode::Decode;
use crate::parse::util::ExactLengthString;
+use bincode::Decode;
+use std::collections::HashMap;
#[derive(Decode, Debug)]
#[allow(dead_code)]
@@ -21,7 +21,7 @@ pub struct MessageHeader {
pub julian_date: u16,
pub millis_of_day: u32,
pub num_message_segments: u16,
- pub message_segment_num: u16
+ pub message_segment_num: u16,
}
#[derive(Decode, Debug)]
@@ -42,14 +42,14 @@ pub struct Message31Header {
pub elevation_angle: f32,
pub radial_spot_blanking_status: u8,
pub azimuth_indexing_mode: u8,
- pub data_block_count: u16
+ pub data_block_count: u16,
}
#[derive(Decode, Debug)]
#[allow(dead_code)]
pub struct DataBlock {
pub data_block_type: ExactLengthString<1>,
- pub data_name: ExactLengthString<3>
+ pub data_name: ExactLengthString<3>,
}
#[derive(Decode, Debug)]
@@ -70,7 +70,7 @@ pub struct VolumeData {
pub system_differential_reflectivity: f32,
pub initial_system_differential_phase: f32,
pub vcp_number: u16,
- pub processing_status: u16
+ pub processing_status: u16,
}
#[derive(Decode, Debug)]
#[allow(dead_code)]
@@ -94,7 +94,7 @@ pub struct RadialData {
pub nyquist_velocity: u16,
pub radial_flags: u16,
pub calib_const_horz_chan: f32,
- pub calib_const_vert_chan: f32
+ pub calib_const_vert_chan: f32,
}
#[derive(Decode, Debug)]
@@ -111,7 +111,7 @@ pub struct GenericDataMoment {
pub control_flags: u8,
pub data_word_size: u8,
pub scale: f32,
- pub offset: f32
+ pub offset: f32,
}
#[allow(dead_code)]
pub struct Message31 {
@@ -119,11 +119,11 @@ pub struct Message31 {
pub volume: VolumeData,
pub elevation: ElevationData,
pub radial: RadialData,
- pub datablocks: HashMap)>
+ pub datablocks: HashMap)>,
}
#[allow(dead_code)]
pub enum Message {
Message31(MessageHeader, Message31),
- OtherMessage(MessageHeader, Vec)
-}
\ No newline at end of file
+ OtherMessage(MessageHeader, Vec),
+}
diff --git a/crates/ar2/src/parse/util.rs b/crates/ar2/src/parse/util.rs
index f4eca28..4fffff2 100644
--- a/crates/ar2/src/parse/util.rs
+++ b/crates/ar2/src/parse/util.rs
@@ -1,12 +1,14 @@
-use std::fmt::{Debug, Display, Formatter};
-use std::io::Read;
use bincode::Decode;
use bincode::error::DecodeError;
+use std::fmt::{Debug, Display, Formatter};
+use std::io::Read;
pub(crate) fn unpack_structure, R: Read>(r: &mut R) -> Result {
- bincode::decode_from_std_read(r, bincode::config::standard()
- .with_big_endian()
- .with_fixed_int_encoding()
+ bincode::decode_from_std_read(
+ r,
+ bincode::config::standard()
+ .with_big_endian()
+ .with_fixed_int_encoding(),
)
}
@@ -27,4 +29,4 @@ impl Debug for ExactLengthString {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self)
}
-}
\ No newline at end of file
+}