feat: optimizing wxbox-ar2
This commit is contained in:
parent
ac4b794c14
commit
26a2212f70
12 changed files with 190 additions and 606 deletions
24
.idea/workspace.xml
generated
24
.idea/workspace.xml
generated
|
@ -7,16 +7,19 @@
|
||||||
<cargoProject FILE="$PROJECT_DIR$/Cargo.toml" />
|
<cargoProject FILE="$PROJECT_DIR$/Cargo.toml" />
|
||||||
</component>
|
</component>
|
||||||
<component name="ChangeListManager">
|
<component name="ChangeListManager">
|
||||||
<list default="true" id="2d855648-9644-469a-afa2-59beb52bb1d6" name="Changes" comment="bug: try to fix some multimap weirdness">
|
<list default="true" id="2d855648-9644-469a-afa2-59beb52bb1d6" name="Changes" comment="feat: wxbox-ar2 round two">
|
||||||
<change afterPath="$PROJECT_DIR$/crates/ar2/src/parse/scan.rs" afterDir="false" />
|
|
||||||
<change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
|
<change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
|
||||||
<change beforePath="$PROJECT_DIR$/Cargo.lock" beforeDir="false" afterPath="$PROJECT_DIR$/Cargo.lock" afterDir="false" />
|
<change beforePath="$PROJECT_DIR$/Cargo.lock" beforeDir="false" afterPath="$PROJECT_DIR$/Cargo.lock" afterDir="false" />
|
||||||
<change beforePath="$PROJECT_DIR$/crates/ar2/Cargo.toml" beforeDir="false" afterPath="$PROJECT_DIR$/crates/ar2/Cargo.toml" afterDir="false" />
|
<change beforePath="$PROJECT_DIR$/crates/ar2/Cargo.toml" beforeDir="false" afterPath="$PROJECT_DIR$/crates/ar2/Cargo.toml" afterDir="false" />
|
||||||
<change beforePath="$PROJECT_DIR$/crates/ar2/benches/parse_benchmark.rs" beforeDir="false" afterPath="$PROJECT_DIR$/crates/ar2/benches/parse_benchmark.rs" afterDir="false" />
|
<change beforePath="$PROJECT_DIR$/crates/ar2/benches/parse_benchmark.rs" beforeDir="false" afterPath="$PROJECT_DIR$/crates/ar2/benches/parse_benchmark.rs" afterDir="false" />
|
||||||
|
<change beforePath="$PROJECT_DIR$/crates/ar2/flamegraph.svg" beforeDir="false" afterPath="$PROJECT_DIR$/crates/ar2/flamegraph.svg" afterDir="false" />
|
||||||
|
<change beforePath="$PROJECT_DIR$/crates/ar2/src/lib.rs" beforeDir="false" afterPath="$PROJECT_DIR$/crates/ar2/src/lib.rs" afterDir="false" />
|
||||||
|
<change beforePath="$PROJECT_DIR$/crates/ar2/src/main.rs" beforeDir="false" afterPath="$PROJECT_DIR$/crates/ar2/src/main.rs" afterDir="false" />
|
||||||
<change beforePath="$PROJECT_DIR$/crates/ar2/src/parse/error.rs" beforeDir="false" afterPath="$PROJECT_DIR$/crates/ar2/src/parse/error.rs" afterDir="false" />
|
<change beforePath="$PROJECT_DIR$/crates/ar2/src/parse/error.rs" beforeDir="false" afterPath="$PROJECT_DIR$/crates/ar2/src/parse/error.rs" afterDir="false" />
|
||||||
<change beforePath="$PROJECT_DIR$/crates/ar2/src/parse/mod.rs" beforeDir="false" afterPath="$PROJECT_DIR$/crates/ar2/src/parse/mod.rs" afterDir="false" />
|
<change beforePath="$PROJECT_DIR$/crates/ar2/src/parse/mod.rs" beforeDir="false" afterPath="$PROJECT_DIR$/crates/ar2/src/parse/mod.rs" afterDir="false" />
|
||||||
|
<change beforePath="$PROJECT_DIR$/crates/ar2/src/parse/scan.rs" beforeDir="false" afterPath="$PROJECT_DIR$/crates/ar2/src/parse/scan.rs" afterDir="false" />
|
||||||
<change beforePath="$PROJECT_DIR$/crates/ar2/src/parse/types.rs" beforeDir="false" afterPath="$PROJECT_DIR$/crates/ar2/src/parse/types.rs" afterDir="false" />
|
<change beforePath="$PROJECT_DIR$/crates/ar2/src/parse/types.rs" beforeDir="false" afterPath="$PROJECT_DIR$/crates/ar2/src/parse/types.rs" afterDir="false" />
|
||||||
<change beforePath="$PROJECT_DIR$/crates/ar2/src/sites/wsr88d.rs" beforeDir="false" afterPath="$PROJECT_DIR$/crates/ar2/src/sites/wsr88d.rs" afterDir="false" />
|
<change beforePath="$PROJECT_DIR$/crates/ar2/src/parse/util.rs" beforeDir="false" afterPath="$PROJECT_DIR$/crates/ar2/src/parse/util.rs" afterDir="false" />
|
||||||
</list>
|
</list>
|
||||||
<option name="SHOW_DIALOG" value="false" />
|
<option name="SHOW_DIALOG" value="false" />
|
||||||
<option name="HIGHLIGHT_CONFLICTS" value="true" />
|
<option name="HIGHLIGHT_CONFLICTS" value="true" />
|
||||||
|
@ -24,7 +27,7 @@
|
||||||
<option name="LAST_RESOLUTION" value="IGNORE" />
|
<option name="LAST_RESOLUTION" value="IGNORE" />
|
||||||
</component>
|
</component>
|
||||||
<component name="DarkyenusTimeTracker">
|
<component name="DarkyenusTimeTracker">
|
||||||
<option name="totalTimeSeconds" value="26071" />
|
<option name="totalTimeSeconds" value="26794" />
|
||||||
</component>
|
</component>
|
||||||
<component name="FileTemplateManagerImpl">
|
<component name="FileTemplateManagerImpl">
|
||||||
<option name="RECENT_TEMPLATES">
|
<option name="RECENT_TEMPLATES">
|
||||||
|
@ -234,7 +237,15 @@
|
||||||
<option name="project" value="LOCAL" />
|
<option name="project" value="LOCAL" />
|
||||||
<updated>1748922664295</updated>
|
<updated>1748922664295</updated>
|
||||||
</task>
|
</task>
|
||||||
<option name="localTasksCounter" value="12" />
|
<task id="LOCAL-00012" summary="feat: wxbox-ar2 round two">
|
||||||
|
<option name="closed" value="true" />
|
||||||
|
<created>1749001700392</created>
|
||||||
|
<option name="number" value="00012" />
|
||||||
|
<option name="presentableId" value="LOCAL-00012" />
|
||||||
|
<option name="project" value="LOCAL" />
|
||||||
|
<updated>1749001700392</updated>
|
||||||
|
</task>
|
||||||
|
<option name="localTasksCounter" value="13" />
|
||||||
<servers />
|
<servers />
|
||||||
</component>
|
</component>
|
||||||
<component name="TypeScriptGeneratedFilesManager">
|
<component name="TypeScriptGeneratedFilesManager">
|
||||||
|
@ -252,6 +263,7 @@
|
||||||
<MESSAGE value="chore: prep for migration" />
|
<MESSAGE value="chore: prep for migration" />
|
||||||
<MESSAGE value="feat: new ui" />
|
<MESSAGE value="feat: new ui" />
|
||||||
<MESSAGE value="bug: try to fix some multimap weirdness" />
|
<MESSAGE value="bug: try to fix some multimap weirdness" />
|
||||||
<option name="LAST_COMMIT_MESSAGE" value="bug: try to fix some multimap weirdness" />
|
<MESSAGE value="feat: wxbox-ar2 round two" />
|
||||||
|
<option name="LAST_COMMIT_MESSAGE" value="feat: wxbox-ar2 round two" />
|
||||||
</component>
|
</component>
|
||||||
</project>
|
</project>
|
1
Cargo.lock
generated
1
Cargo.lock
generated
|
@ -3696,7 +3696,6 @@ dependencies = [
|
||||||
"nexrad-data",
|
"nexrad-data",
|
||||||
"nexrad-decode",
|
"nexrad-decode",
|
||||||
"rayon",
|
"rayon",
|
||||||
"rustc-hash",
|
|
||||||
"serde",
|
"serde",
|
||||||
"thiserror 2.0.12",
|
"thiserror 2.0.12",
|
||||||
"toml",
|
"toml",
|
||||||
|
|
|
@ -14,7 +14,6 @@ thiserror = "2"
|
||||||
bincode = "2"
|
bincode = "2"
|
||||||
tracing = "0.1"
|
tracing = "0.1"
|
||||||
tracing-subscriber = "0.3"
|
tracing-subscriber = "0.3"
|
||||||
rustc-hash = "2"
|
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
criterion = { version = "0.5" }
|
criterion = { version = "0.5" }
|
||||||
|
|
|
@ -1,10 +1,15 @@
|
||||||
|
use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
|
||||||
use std::io::Cursor;
|
use std::io::Cursor;
|
||||||
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
|
|
||||||
use wxbox_ar2::parse::Ar2v;
|
use wxbox_ar2::parse::Ar2v;
|
||||||
|
|
||||||
const KCRP20170825_235733_V06: (&[u8], &str) = (include_bytes!("KCRP20170825_235733_V06"), "KCRP20170825_235733_V06");
|
const KCRP20170825_235733_V06: (&[u8], &str) = (
|
||||||
const KGWX20250518_165333_V06: (&[u8], &str) = (include_bytes!("KGWX20250518_165333_V06"), "KGWX20250518_165333_V06");
|
include_bytes!("KCRP20170825_235733_V06"),
|
||||||
|
"KCRP20170825_235733_V06",
|
||||||
|
);
|
||||||
|
const KGWX20250518_165333_V06: (&[u8], &str) = (
|
||||||
|
include_bytes!("KGWX20250518_165333_V06"),
|
||||||
|
"KGWX20250518_165333_V06",
|
||||||
|
);
|
||||||
|
|
||||||
fn criterion_benchmark(c: &mut Criterion) {
|
fn criterion_benchmark(c: &mut Criterion) {
|
||||||
let mut group = c.benchmark_group("parse");
|
let mut group = c.benchmark_group("parse");
|
||||||
|
@ -15,10 +20,10 @@ fn criterion_benchmark(c: &mut Criterion) {
|
||||||
test_file_bytes,
|
test_file_bytes,
|
||||||
|b, bytes| {
|
|b, bytes| {
|
||||||
b.iter(|| {
|
b.iter(|| {
|
||||||
let mut r= Cursor::new(bytes.to_vec());
|
let mut r = Cursor::new(bytes.to_vec());
|
||||||
Ar2v::read(&mut r).unwrap();
|
Ar2v::read(&mut r).unwrap();
|
||||||
});
|
});
|
||||||
}
|
},
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -26,4 +31,4 @@ fn criterion_benchmark(c: &mut Criterion) {
|
||||||
}
|
}
|
||||||
|
|
||||||
criterion_group!(benches, criterion_benchmark);
|
criterion_group!(benches, criterion_benchmark);
|
||||||
criterion_main!(benches);
|
criterion_main!(benches);
|
||||||
|
|
File diff suppressed because one or more lines are too long
Before Width: | Height: | Size: 566 KiB |
|
@ -8,8 +8,8 @@ use rayon::prelude::*;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::fmt::{Debug, Formatter};
|
use std::fmt::{Debug, Formatter};
|
||||||
|
|
||||||
pub mod sites;
|
|
||||||
pub mod parse;
|
pub mod parse;
|
||||||
|
pub mod sites;
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Clone)]
|
#[derive(Serialize, Deserialize, Clone)]
|
||||||
pub struct Scan {
|
pub struct Scan {
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
use std::{env};
|
use std::env;
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
use tracing::Level;
|
use tracing::Level;
|
||||||
use tracing_subscriber::fmt::format::FmtSpan;
|
use tracing_subscriber::fmt::format::FmtSpan;
|
||||||
|
|
|
@ -10,5 +10,5 @@ pub enum Ar2vError {
|
||||||
#[error("unknown compression record")]
|
#[error("unknown compression record")]
|
||||||
UnknownCompressionRecord,
|
UnknownCompressionRecord,
|
||||||
#[error("m31 missing required field")]
|
#[error("m31 missing required field")]
|
||||||
Message31MissingRequiredField
|
Message31MissingRequiredField,
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,24 +1,26 @@
|
||||||
use std::collections::HashMap;
|
use crate::parse::error::Ar2vError;
|
||||||
use std::fmt::Debug;
|
use crate::parse::scan::{Elevation, Gate, MomentaryProduct, Radial, Scan};
|
||||||
use std::io::{Cursor, ErrorKind, Read, Seek, SeekFrom};
|
use crate::parse::types::{
|
||||||
|
DataBlock, GenericDataMoment, Message, Message31, Message31Header, MessageHeader,
|
||||||
|
VolumeHeaderRecord,
|
||||||
|
};
|
||||||
|
use crate::parse::util::unpack_structure;
|
||||||
use bincode::error::DecodeError;
|
use bincode::error::DecodeError;
|
||||||
use bzip2::read::BzDecoder;
|
use bzip2::read::BzDecoder;
|
||||||
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
|
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
|
||||||
use tracing::{debug, Level, span};
|
use std::collections::HashMap;
|
||||||
use crate::parse::error::Ar2vError;
|
use std::fmt::Debug;
|
||||||
use crate::parse::scan::{Elevation, Gate, MomentaryProduct, Radial, Scan};
|
use std::io::{Cursor, ErrorKind, Read, Seek, SeekFrom};
|
||||||
use crate::parse::types::{DataBlock, GenericDataMoment, Message, Message31, Message31Header, MessageHeader, VolumeHeaderRecord};
|
use tracing::{Level, debug, span};
|
||||||
use crate::parse::util::unpack_structure;
|
|
||||||
|
|
||||||
pub mod types;
|
|
||||||
pub mod error;
|
pub mod error;
|
||||||
pub mod util;
|
|
||||||
pub mod scan;
|
pub mod scan;
|
||||||
|
pub mod types;
|
||||||
|
pub mod util;
|
||||||
|
|
||||||
const DEFAULT_MESSAGE_SIZE: usize = 2432;
|
const DEFAULT_MESSAGE_SIZE: usize = 2432;
|
||||||
const MESSAGE_BODY_SIZE: usize = DEFAULT_MESSAGE_SIZE - 12 - 16;
|
const MESSAGE_BODY_SIZE: usize = DEFAULT_MESSAGE_SIZE - 12 - 16;
|
||||||
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct Ar2v {
|
pub struct Ar2v {
|
||||||
pub volume_header: VolumeHeaderRecord,
|
pub volume_header: VolumeHeaderRecord,
|
||||||
|
@ -43,12 +45,12 @@ impl Ar2v {
|
||||||
Err(e) => match e {
|
Err(e) => match e {
|
||||||
DecodeError::UnexpectedEnd { .. } => {
|
DecodeError::UnexpectedEnd { .. } => {
|
||||||
break;
|
break;
|
||||||
},
|
}
|
||||||
DecodeError::Io { inner, .. } if inner.kind() == ErrorKind::UnexpectedEof => {
|
DecodeError::Io { inner, .. } if inner.kind() == ErrorKind::UnexpectedEof => {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
_ => return Err(e.into())
|
_ => return Err(e.into()),
|
||||||
}
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
let size: usize = size.unsigned_abs() as usize;
|
let size: usize = size.unsigned_abs() as usize;
|
||||||
|
@ -58,7 +60,8 @@ impl Ar2v {
|
||||||
records.push(record_content);
|
records.push(record_content);
|
||||||
}
|
}
|
||||||
|
|
||||||
let rs: Vec<Result<Vec<Message>, Ar2vError>> = records.par_iter()
|
let rs: Vec<Result<Vec<Message>, Ar2vError>> = records
|
||||||
|
.par_iter()
|
||||||
.map(|record_content| {
|
.map(|record_content| {
|
||||||
let mut r = decompress_record(record_content)?;
|
let mut r = decompress_record(record_content)?;
|
||||||
|
|
||||||
|
@ -74,17 +77,20 @@ impl Ar2v {
|
||||||
Err(e) => match e {
|
Err(e) => match e {
|
||||||
DecodeError::UnexpectedEnd { .. } => {
|
DecodeError::UnexpectedEnd { .. } => {
|
||||||
break;
|
break;
|
||||||
},
|
}
|
||||||
DecodeError::Io { inner, .. } if inner.kind() == ErrorKind::UnexpectedEof => {
|
DecodeError::Io { inner, .. }
|
||||||
|
if inner.kind() == ErrorKind::UnexpectedEof =>
|
||||||
|
{
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
_ => return Err(e.into())
|
_ => return Err(e.into()),
|
||||||
}
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut msg_size = hdr.message_size as usize;
|
let mut msg_size = hdr.message_size as usize;
|
||||||
if msg_size == 65535 {
|
if msg_size == 65535 {
|
||||||
msg_size = ((hdr.num_message_segments as usize) << 16) | hdr.message_segment_num as usize;
|
msg_size = ((hdr.num_message_segments as usize) << 16)
|
||||||
|
| hdr.message_segment_num as usize;
|
||||||
}
|
}
|
||||||
|
|
||||||
if hdr.message_type == 0 {
|
if hdr.message_type == 0 {
|
||||||
|
@ -100,9 +106,17 @@ impl Ar2v {
|
||||||
}
|
}
|
||||||
if hdr.num_message_segments > 100 {
|
if hdr.num_message_segments > 100 {
|
||||||
eprintln!("{:?}", hdr);
|
eprintln!("{:?}", hdr);
|
||||||
panic!("very large number of segments {}...", hdr.num_message_segments);
|
panic!(
|
||||||
|
"very large number of segments {}...",
|
||||||
|
hdr.num_message_segments
|
||||||
|
);
|
||||||
}
|
}
|
||||||
if ![1_u8,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,31,32,33].contains(&hdr.message_type) {
|
if ![
|
||||||
|
1_u8, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 31, 32,
|
||||||
|
33,
|
||||||
|
]
|
||||||
|
.contains(&hdr.message_type)
|
||||||
|
{
|
||||||
panic!("invalid message type");
|
panic!("invalid message type");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -128,24 +142,26 @@ impl Ar2v {
|
||||||
match d.data_name.as_str().as_str() {
|
match d.data_name.as_str().as_str() {
|
||||||
"VOL" => {
|
"VOL" => {
|
||||||
vol = Some(unpack_structure(&mut r)?);
|
vol = Some(unpack_structure(&mut r)?);
|
||||||
},
|
}
|
||||||
"ELV" => {
|
"ELV" => {
|
||||||
elv = Some(unpack_structure(&mut r)?);
|
elv = Some(unpack_structure(&mut r)?);
|
||||||
},
|
}
|
||||||
"RAD" => {
|
"RAD" => {
|
||||||
rad = Some(unpack_structure(&mut r)?);
|
rad = Some(unpack_structure(&mut r)?);
|
||||||
},
|
}
|
||||||
|
|
||||||
"REF" | "VEL" | "CFP" | "SW " | "ZDR" | "PHI" | "RHO" => {
|
"REF" | "VEL" | "CFP" | "SW " | "ZDR" | "PHI" | "RHO" => {
|
||||||
let moment: GenericDataMoment = unpack_structure(&mut r)?;
|
let moment: GenericDataMoment = unpack_structure(&mut r)?;
|
||||||
let ldm = moment.number_data_moment_gates as usize * moment.data_word_size as usize / 8;
|
let ldm = moment.number_data_moment_gates as usize
|
||||||
|
* moment.data_word_size as usize
|
||||||
|
/ 8;
|
||||||
|
|
||||||
let mut data = vec![0u8; ldm];
|
let mut data = vec![0u8; ldm];
|
||||||
r.read_exact(&mut data)?;
|
r.read_exact(&mut data)?;
|
||||||
datablocks.insert(d.data_name.as_str(), (moment, data));
|
datablocks.insert(d.data_name.as_str(), (moment, data));
|
||||||
},
|
}
|
||||||
|
|
||||||
_ => panic!("unknown data block type")
|
_ => panic!("unknown data block type"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -157,7 +173,7 @@ impl Ar2v {
|
||||||
elevation: elv.ok_or(Ar2vError::Message31MissingRequiredField)?,
|
elevation: elv.ok_or(Ar2vError::Message31MissingRequiredField)?,
|
||||||
radial: rad.ok_or(Ar2vError::Message31MissingRequiredField)?,
|
radial: rad.ok_or(Ar2vError::Message31MissingRequiredField)?,
|
||||||
datablocks,
|
datablocks,
|
||||||
}
|
},
|
||||||
)
|
)
|
||||||
} else {
|
} else {
|
||||||
let mut data = vec![0u8; msg_size];
|
let mut data = vec![0u8; msg_size];
|
||||||
|
@ -168,7 +184,9 @@ impl Ar2v {
|
||||||
r.seek(SeekFrom::Current(DEFAULT_MESSAGE_SIZE as i64))?;
|
r.seek(SeekFrom::Current(DEFAULT_MESSAGE_SIZE as i64))?;
|
||||||
} else {
|
} else {
|
||||||
r.seek(SeekFrom::Start(start))?;
|
r.seek(SeekFrom::Start(start))?;
|
||||||
r.seek(SeekFrom::Current((DEFAULT_MESSAGE_SIZE as i64 + msg_size as i64) as i64))?;
|
r.seek(SeekFrom::Current(
|
||||||
|
(DEFAULT_MESSAGE_SIZE as i64 + msg_size as i64) as i64,
|
||||||
|
))?;
|
||||||
}
|
}
|
||||||
|
|
||||||
Message::OtherMessage(hdr, data)
|
Message::OtherMessage(hdr, data)
|
||||||
|
@ -189,7 +207,7 @@ impl Ar2v {
|
||||||
|
|
||||||
debug!("extracted raw packet data");
|
debug!("extracted raw packet data");
|
||||||
drop(_ldm_extract_span);
|
drop(_ldm_extract_span);
|
||||||
debug!(msgs=messages.len(), "extracted messages");
|
debug!(msgs = messages.len(), "extracted messages");
|
||||||
|
|
||||||
let scan_process_span = span!(Level::DEBUG, "process_scan");
|
let scan_process_span = span!(Level::DEBUG, "process_scan");
|
||||||
let _scan_process_span = scan_process_span.enter();
|
let _scan_process_span = scan_process_span.enter();
|
||||||
|
@ -198,59 +216,93 @@ impl Ar2v {
|
||||||
let mut scan = Scan {
|
let mut scan = Scan {
|
||||||
elevations: Default::default(),
|
elevations: Default::default(),
|
||||||
};
|
};
|
||||||
messages.iter()
|
|
||||||
.for_each(|u| {
|
let (tx, rx) = std::sync::mpsc::channel::<(usize, f64, usize, f64, String, MomentaryProduct)>();
|
||||||
|
|
||||||
|
std::thread::spawn(move || {
|
||||||
|
messages.par_iter().for_each(|u| {
|
||||||
if let Message::Message31(_hdr, m31) = u {
|
if let Message::Message31(_hdr, m31) = u {
|
||||||
let elevation_num = m31.header.elevation_number as usize;
|
let elevation_num = m31.header.elevation_number as usize;
|
||||||
let azimuth_num = m31.header.azimuth_number as usize;
|
let azimuth_num = m31.header.azimuth_number as usize;
|
||||||
|
|
||||||
let elevation = scan.elevations.entry(elevation_num)
|
for (product, data) in &m31.datablocks {
|
||||||
.or_insert_with(|| { Elevation {
|
match product.as_str() {
|
||||||
elevation_number: elevation_num,
|
"REF" | "VEL" | "CFP" | "SW " | "ZDR" | "PHI" | "RHO" => (),
|
||||||
elevation_angle: m31.header.elevation_angle as f64,
|
_ => continue,
|
||||||
radials: Default::default(),
|
}
|
||||||
}});
|
|
||||||
|
|
||||||
let radial = elevation.radials.entry(azimuth_num)
|
// scale the data
|
||||||
.or_insert_with(|| { Radial {
|
let chunks = data.1.chunks(data.0.data_word_size as usize / 8);
|
||||||
radial_number: azimuth_num,
|
let mp = MomentaryProduct {
|
||||||
azimuth_angle: m31.header.azimuth_angle as f64,
|
start_range_meters: data.0.data_moment_range as isize,
|
||||||
products: Default::default(),
|
data_spacing_meters: data.0.data_moment_range_sample_interval as isize,
|
||||||
}});
|
data: chunks
|
||||||
|
.map(|u| {
|
||||||
for (product, data) in m31.datablocks.iter() {
|
let mut result = [0u8; 8];
|
||||||
// scale the data
|
result[..u.len()].copy_from_slice(u);
|
||||||
let chunks = data.1.chunks(data.0.data_word_size as usize / 8);
|
usize::from_be_bytes(result)
|
||||||
|
})
|
||||||
radial.products.insert(product.clone(), MomentaryProduct {
|
.map(|u| {
|
||||||
start_range_meters: data.0.data_moment_range as isize,
|
|
||||||
data_spacing_meters: data.0.data_moment_range_sample_interval as isize,
|
|
||||||
data: chunks.map(|u| {
|
|
||||||
let mut result = [0u8; 8];
|
|
||||||
result[..u.len()].copy_from_slice(u);
|
|
||||||
usize::from_be_bytes(result)
|
|
||||||
}).map(|u| {
|
|
||||||
if u == 0 {
|
if u == 0 {
|
||||||
Gate::BelowThreshold
|
Gate::BelowThreshold
|
||||||
} else if u == 1 {
|
} else if u == 1 {
|
||||||
Gate::RangeFolded
|
Gate::RangeFolded
|
||||||
} else if data.0.scale == 0.0 {
|
} else if data.0.scale == 0.0 {
|
||||||
Gate::Data(u as f64)
|
Gate::Data(u as f64)
|
||||||
} else {
|
} else {
|
||||||
Gate::Data((u as f64) - data.0.offset as f64 / data.0.scale as f64)
|
Gate::Data(
|
||||||
}
|
(u as f64) - data.0.offset as f64 / data.0.scale as f64,
|
||||||
|
)
|
||||||
}).collect(),
|
}
|
||||||
});
|
})
|
||||||
}
|
.collect(),
|
||||||
|
};
|
||||||
|
tx.send((elevation_num, m31.header.elevation_angle as f64, azimuth_num, m31.header.azimuth_angle as f64, product.clone(), mp)).unwrap();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
drop(tx);
|
||||||
|
});
|
||||||
|
|
||||||
|
while let Ok((elevation, elevation_angle, azimuth, azimuth_angle, product, mp)) = rx.recv() {
|
||||||
|
let elevation = scan
|
||||||
|
.elevations
|
||||||
|
.entry(elevation)
|
||||||
|
.or_insert_with(|| Elevation {
|
||||||
|
elevation_number: elevation,
|
||||||
|
elevation_angle,
|
||||||
|
radials: Default::default(),
|
||||||
|
});
|
||||||
|
|
||||||
Ok(Self {
|
let radial = elevation
|
||||||
volume_header: vhr
|
.radials
|
||||||
})
|
.entry(azimuth)
|
||||||
|
.or_insert_with(|| Radial {
|
||||||
|
radial_number: azimuth,
|
||||||
|
azimuth_angle,
|
||||||
|
refl: None,
|
||||||
|
vel: None,
|
||||||
|
cfp: None,
|
||||||
|
sw: None,
|
||||||
|
zdr: None,
|
||||||
|
phi: None,
|
||||||
|
rho: None,
|
||||||
|
});
|
||||||
|
|
||||||
|
match product.as_str() {
|
||||||
|
"REF" => { radial.refl = Some(mp); },
|
||||||
|
"VEL" => { radial.vel = Some(mp); },
|
||||||
|
"SW " => { radial.sw = Some(mp); },
|
||||||
|
"CFP" => { radial.cfp = Some(mp); },
|
||||||
|
"ZDR" => { radial.zdr = Some(mp); },
|
||||||
|
"PHI" => { radial.phi = Some(mp); },
|
||||||
|
"RHO" => { radial.rho = Some(mp); },
|
||||||
|
_ => panic!()
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(Self { volume_header: vhr })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -266,4 +318,4 @@ fn decompress_record(compressed_data: &[u8]) -> Result<Cursor<Vec<u8>>, Ar2vErro
|
||||||
bz2_reader.read_to_end(&mut buf)?;
|
bz2_reader.read_to_end(&mut buf)?;
|
||||||
|
|
||||||
Ok(Cursor::new(buf))
|
Ok(Cursor::new(buf))
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,27 +1,33 @@
|
||||||
use rustc_hash::FxHashMap;
|
use std::collections::BTreeMap;
|
||||||
|
|
||||||
pub struct Scan {
|
pub struct Scan {
|
||||||
pub elevations: FxHashMap<usize, Elevation>
|
pub elevations: BTreeMap<usize, Elevation>,
|
||||||
}
|
}
|
||||||
pub struct Elevation {
|
pub struct Elevation {
|
||||||
pub elevation_number: usize,
|
pub elevation_number: usize,
|
||||||
pub elevation_angle: f64,
|
pub elevation_angle: f64,
|
||||||
|
|
||||||
pub radials: FxHashMap<usize, Radial>
|
pub radials: BTreeMap<usize, Radial>,
|
||||||
}
|
}
|
||||||
pub struct Radial {
|
pub struct Radial {
|
||||||
pub radial_number: usize,
|
pub radial_number: usize,
|
||||||
pub azimuth_angle: f64,
|
pub azimuth_angle: f64,
|
||||||
|
|
||||||
pub products: FxHashMap<String, MomentaryProduct>
|
pub refl: Option<MomentaryProduct>,
|
||||||
|
pub vel: Option<MomentaryProduct>,
|
||||||
|
pub cfp: Option<MomentaryProduct>,
|
||||||
|
pub sw: Option<MomentaryProduct>,
|
||||||
|
pub zdr: Option<MomentaryProduct>,
|
||||||
|
pub phi: Option<MomentaryProduct>,
|
||||||
|
pub rho: Option<MomentaryProduct>,
|
||||||
}
|
}
|
||||||
pub struct MomentaryProduct {
|
pub struct MomentaryProduct {
|
||||||
pub start_range_meters: isize,
|
pub start_range_meters: isize,
|
||||||
pub data_spacing_meters: isize,
|
pub data_spacing_meters: isize,
|
||||||
pub data: Vec<Gate>
|
pub data: Vec<Gate>,
|
||||||
}
|
}
|
||||||
pub enum Gate {
|
pub enum Gate {
|
||||||
BelowThreshold,
|
BelowThreshold,
|
||||||
RangeFolded,
|
RangeFolded,
|
||||||
Data(f64)
|
Data(f64),
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
use std::collections::HashMap;
|
|
||||||
use bincode::Decode;
|
|
||||||
use crate::parse::util::ExactLengthString;
|
use crate::parse::util::ExactLengthString;
|
||||||
|
use bincode::Decode;
|
||||||
|
use std::collections::HashMap;
|
||||||
|
|
||||||
#[derive(Decode, Debug)]
|
#[derive(Decode, Debug)]
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
|
@ -21,7 +21,7 @@ pub struct MessageHeader {
|
||||||
pub julian_date: u16,
|
pub julian_date: u16,
|
||||||
pub millis_of_day: u32,
|
pub millis_of_day: u32,
|
||||||
pub num_message_segments: u16,
|
pub num_message_segments: u16,
|
||||||
pub message_segment_num: u16
|
pub message_segment_num: u16,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Decode, Debug)]
|
#[derive(Decode, Debug)]
|
||||||
|
@ -42,14 +42,14 @@ pub struct Message31Header {
|
||||||
pub elevation_angle: f32,
|
pub elevation_angle: f32,
|
||||||
pub radial_spot_blanking_status: u8,
|
pub radial_spot_blanking_status: u8,
|
||||||
pub azimuth_indexing_mode: u8,
|
pub azimuth_indexing_mode: u8,
|
||||||
pub data_block_count: u16
|
pub data_block_count: u16,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Decode, Debug)]
|
#[derive(Decode, Debug)]
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
pub struct DataBlock {
|
pub struct DataBlock {
|
||||||
pub data_block_type: ExactLengthString<1>,
|
pub data_block_type: ExactLengthString<1>,
|
||||||
pub data_name: ExactLengthString<3>
|
pub data_name: ExactLengthString<3>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Decode, Debug)]
|
#[derive(Decode, Debug)]
|
||||||
|
@ -70,7 +70,7 @@ pub struct VolumeData {
|
||||||
pub system_differential_reflectivity: f32,
|
pub system_differential_reflectivity: f32,
|
||||||
pub initial_system_differential_phase: f32,
|
pub initial_system_differential_phase: f32,
|
||||||
pub vcp_number: u16,
|
pub vcp_number: u16,
|
||||||
pub processing_status: u16
|
pub processing_status: u16,
|
||||||
}
|
}
|
||||||
#[derive(Decode, Debug)]
|
#[derive(Decode, Debug)]
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
|
@ -94,7 +94,7 @@ pub struct RadialData {
|
||||||
pub nyquist_velocity: u16,
|
pub nyquist_velocity: u16,
|
||||||
pub radial_flags: u16,
|
pub radial_flags: u16,
|
||||||
pub calib_const_horz_chan: f32,
|
pub calib_const_horz_chan: f32,
|
||||||
pub calib_const_vert_chan: f32
|
pub calib_const_vert_chan: f32,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Decode, Debug)]
|
#[derive(Decode, Debug)]
|
||||||
|
@ -111,7 +111,7 @@ pub struct GenericDataMoment {
|
||||||
pub control_flags: u8,
|
pub control_flags: u8,
|
||||||
pub data_word_size: u8,
|
pub data_word_size: u8,
|
||||||
pub scale: f32,
|
pub scale: f32,
|
||||||
pub offset: f32
|
pub offset: f32,
|
||||||
}
|
}
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
pub struct Message31 {
|
pub struct Message31 {
|
||||||
|
@ -119,11 +119,11 @@ pub struct Message31 {
|
||||||
pub volume: VolumeData,
|
pub volume: VolumeData,
|
||||||
pub elevation: ElevationData,
|
pub elevation: ElevationData,
|
||||||
pub radial: RadialData,
|
pub radial: RadialData,
|
||||||
pub datablocks: HashMap<String, (GenericDataMoment, Vec<u8>)>
|
pub datablocks: HashMap<String, (GenericDataMoment, Vec<u8>)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
pub enum Message {
|
pub enum Message {
|
||||||
Message31(MessageHeader, Message31),
|
Message31(MessageHeader, Message31),
|
||||||
OtherMessage(MessageHeader, Vec<u8>)
|
OtherMessage(MessageHeader, Vec<u8>),
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,12 +1,14 @@
|
||||||
use std::fmt::{Debug, Display, Formatter};
|
|
||||||
use std::io::Read;
|
|
||||||
use bincode::Decode;
|
use bincode::Decode;
|
||||||
use bincode::error::DecodeError;
|
use bincode::error::DecodeError;
|
||||||
|
use std::fmt::{Debug, Display, Formatter};
|
||||||
|
use std::io::Read;
|
||||||
|
|
||||||
pub(crate) fn unpack_structure<D: Decode<()>, R: Read>(r: &mut R) -> Result<D, DecodeError> {
|
pub(crate) fn unpack_structure<D: Decode<()>, R: Read>(r: &mut R) -> Result<D, DecodeError> {
|
||||||
bincode::decode_from_std_read(r, bincode::config::standard()
|
bincode::decode_from_std_read(
|
||||||
.with_big_endian()
|
r,
|
||||||
.with_fixed_int_encoding()
|
bincode::config::standard()
|
||||||
|
.with_big_endian()
|
||||||
|
.with_fixed_int_encoding(),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -27,4 +29,4 @@ impl<const N: usize> Debug for ExactLengthString<N> {
|
||||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||||
write!(f, "{}", self)
|
write!(f, "{}", self)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue