commit 368f8b23fc1a8d1f5ec193415b7ee62f4af1cd4c Author: core Date: Fri Nov 3 16:29:19 2023 -0400 initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..bf302da --- /dev/null +++ b/.gitignore @@ -0,0 +1,8 @@ +/target +**/*.rs.bk +Cargo.lock +nexrad-browser/bin/ +nexrad-browser/pkg/ +nexrad-browser/wasm-pack.log +nexrad-browser/www/wasm +nexrad-browser/www/ \ No newline at end of file diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..13566b8 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..3ce3588 --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,6 @@ + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..f87d00e --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,7 @@ + + + + + + + \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..7ee8d5d --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,12 @@ +[workspace] +members = [ + "nexrad2", + "rtwx", + "nexrad-browser", + "nxar2" +] +resolver = "2" + +[profile.release.package.nexrad-browser] +# Tell `rustc` to optimize for small code size. +opt-level = "s" diff --git a/KGRB_WITH_STUFF b/KGRB_WITH_STUFF new file mode 100644 index 0000000..bb0459c Binary files /dev/null and b/KGRB_WITH_STUFF differ diff --git a/KOTX20231102_143210_V06 b/KOTX20231102_143210_V06 new file mode 100644 index 0000000..9d65366 Binary files /dev/null and b/KOTX20231102_143210_V06 differ diff --git a/krax_06_20231102_125257_034_i.nxrd b/krax_06_20231102_125257_034_i.nxrd new file mode 100644 index 0000000..b48b359 Binary files /dev/null and b/krax_06_20231102_125257_034_i.nxrd differ diff --git a/nexrad-browser/Cargo.toml b/nexrad-browser/Cargo.toml new file mode 100644 index 0000000..bb2ba66 --- /dev/null +++ b/nexrad-browser/Cargo.toml @@ -0,0 +1,32 @@ +[package] +name = "nexrad-browser" +version = "0.1.0" +authors = ["core "] +edition = "2021" +description = "A web-based browser for NEXRAD Archive II datafiles" +repository = "https://git.e3t.cc/core/rtwx" +license = "GPL-3" + +[lib] +crate-type = ["cdylib", "rlib"] + +[features] +default = ["console_error_panic_hook"] + +[dependencies] +wasm-bindgen = "0.2.84" + +# The `console_error_panic_hook` crate provides better debugging of panics by +# logging them with `console.error`. This is great for development, but requires +# all the `std::fmt` and `std::panicking` infrastructure, so isn't great for +# code size when deploying. +console_error_panic_hook = { version = "0.1.7", optional = true } +nexrad2 = { version = "0.1.0", path = "../nexrad2" } +log = "0.4" +web-sys = "0.3" +js-sys = "0.3" +wasm-logger = "0.2" + +[dev-dependencies] +wasm-bindgen-test = "0.3.34" + diff --git a/nexrad-browser/src/lib.rs b/nexrad-browser/src/lib.rs new file mode 100644 index 0000000..e774de8 --- /dev/null +++ b/nexrad-browser/src/lib.rs @@ -0,0 +1,34 @@ +pub mod utils; + +use std::io::Cursor; +use js_sys::Uint8Array; +use log::info; +use wasm_bindgen::prelude::*; + +#[wasm_bindgen] +extern "C" { + fn alert(s: &str); +} + +#[wasm_bindgen] +pub fn __nxrd_browser_init() { + wasm_logger::init(wasm_logger::Config::new(log::Level::Trace)); + utils::set_panic_hook(); + info!("nexrad-browser initialized successfully"); +} + +#[wasm_bindgen] +pub fn load_ar2(buf: &JsValue) { + + info!("loading archive 2 file"); + info!("load 01: convert to uint8array"); + let array = Uint8Array::new(buf); + info!("load 02: convert to rust vec"); + let rvec = array.to_vec(); + info!("load 03: create cursor"); + let mut cursor = Cursor::new(rvec); + info!("load 04: load"); + let loaded = nexrad2::parse_nx2_chunk(&mut cursor).unwrap(); + info!("load 05: dump"); + info!("Loaded: {:#?}", loaded); +} \ No newline at end of file diff --git a/nexrad-browser/src/utils.rs b/nexrad-browser/src/utils.rs new file mode 100644 index 0000000..b1d7929 --- /dev/null +++ b/nexrad-browser/src/utils.rs @@ -0,0 +1,10 @@ +pub fn set_panic_hook() { + // When the `console_error_panic_hook` feature is enabled, we can call the + // `set_panic_hook` function at least once during initialization, and then + // we will get better error messages if our code ever panics. + // + // For more details see + // https://github.com/rustwasm/console_error_panic_hook#readme + #[cfg(feature = "console_error_panic_hook")] + console_error_panic_hook::set_once(); +} diff --git a/nexrad2/Cargo.toml b/nexrad2/Cargo.toml new file mode 100644 index 0000000..43c7db2 --- /dev/null +++ b/nexrad2/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "nexrad2" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +#bzip2-rs = "0.1" +bzip2 = "0.4" +log = "0.4" diff --git a/nexrad2/src/lib.rs b/nexrad2/src/lib.rs new file mode 100644 index 0000000..34f27d4 --- /dev/null +++ b/nexrad2/src/lib.rs @@ -0,0 +1,183 @@ +//! # nexrad2 +//! A parser for the NEXRAD II raw datafile format. + +use std::error::Error; +use std::fmt::{Display, Formatter}; +use std::io; +use std::io::{Cursor, Read, Seek, SeekFrom}; +use std::str::Utf8Error; +use bzip2::read::BzDecoder; +//use bzip2_rs::DecoderReader; +use log::{debug, trace}; + +pub mod message; + +#[derive(Debug)] +pub struct Nexrad2Chunk { + pub volume_header_record: VolumeHeaderRecord, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +/// The Volume Header Record as defined by 2620010H (Build 19.0) section 7.3.3 Volume Header Record +pub struct VolumeHeaderRecord { + /// A character constant of which the last 2 characters identify the version. + /// Versions defined by 262010H Build 19.0: + /// - `AR2V0002.`: Super Resolution disabled at the RDA (pre RDA Build 12.0) + /// - `AR2V0003.`: Super Resolution (pre RDA Build 12.0) + /// - `AR2V0004.`: Recombined Super Resolution + /// - `AR2V0005.`: Super Resolution disabled at the RDA (RDA Build 12.0 and later) + /// - `AR2V0006.`: Super Resolution (RDA Build 12.0 and later) + /// - `AR2V0007.`: Recombined Super Resolution (RDA Build 12.0 and later) + pub tape_filename: String, + /// Increases by 1 for each volume of radar data in the queue to a maximum of 999, whereupon it rolls back to 001 + pub extension_number: String, + /// Date - NEXRAD-modified Julian - days since 1/1/1970 where 1/1/1970 is 1. + pub date: u32, + /// Time - milliseconds past midnight + pub time: u32, + /// ICAO of the radar + pub icao: String +} + + +#[derive(Debug)] +pub enum NexradParseError { + InvalidHeaderTapeFilename(Utf8Error), + HeaderReadError(io::Error), + InvalidHeaderLength(usize), + InvalidHeaderExtensionNumber(Utf8Error), + InvalidICAO(Utf8Error), + FileTooShort, + DecompressError(io::Error), + InvalidMetaChunkLength(usize), + TcmHeaderReadFailed(io::Error), + TcmChunkReadFailed(io::Error), + FailedToReadFile(io::Error), + LdmReadFailed(io::Error) +} +impl Display for NexradParseError { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + Self::InvalidHeaderTapeFilename(e) => write!(f, "invalid header tape filename: {}", e), + Self::HeaderReadError(e) => write!(f, "error reading header from cursor: {}", e), + Self::InvalidHeaderLength(read) => write!(f, "invalid header length, expected 24 bytes but only read {}", read), + Self::InvalidHeaderExtensionNumber(e) => write!(f, "invalid header extension number: {}", e), + Self::InvalidICAO(e) => write!(f, "invalid station icao: {}", e), + Self::FileTooShort => write!(f, "file too short (failed to fill whole buffer)"), + Self::DecompressError(e) => write!(f, "decompression error: {}", e), + Self::InvalidMetaChunkLength(real) => write!(f, "invalid meta chunk length, spec defined as {NEXRAD2_META_CHUNK_FIXED_LENGTH} but got {real}"), + Self::TcmHeaderReadFailed(e) => write!(f, "tcm message header read failed: {}", e), + Self::TcmChunkReadFailed(e) => write!(f, "tcm chunk read failed: {}", e), + Self::FailedToReadFile(e) => write!(f, "failed to read file: {}", e), + Self::LdmReadFailed(e) => write!(f, "ldm read failed: {}", e) + } + } +} +impl Error for NexradParseError {} + +pub const NEXRAD2_META_CHUNK_FIXED_LENGTH: usize = 325888; + +pub const MESSAGE_HEADER_SIZE: usize = 2 + 1 + 1 + 2 + 2 + 4 + 2 + 2; + +#[derive(Debug)] +pub struct MessageHeader { + pub message_size: u16, + pub rda_redundant_channel: u8, + pub message_type: u8, + pub message_sequence_number: u16, + pub julian_date: u16, + pub millis_after_midnight: u32, + pub num_of_message_segments: u16, + pub message_segment_num: u16 + +} + +pub enum Message { + +} + +pub const LEGACY_TCM_HEADER_LENGTH: i64 = 12; + +pub fn parse_nx2_chunk(cursor: &mut (impl Read + Seek)) -> Result { + let mut volume_header = [0u8; 24]; + let read = cursor.read(&mut volume_header).map_err(|e| NexradParseError::HeaderReadError(e))?; + if read != 24 { return Err(NexradParseError::InvalidHeaderLength(read)); } + + let tape_filename_bytes = &volume_header[0..9]; + let tape_filename = std::str::from_utf8(tape_filename_bytes).map_err(|e| NexradParseError::InvalidHeaderTapeFilename(e))?; + + let extension_number_bytes = &volume_header[9..12]; + let extension_number = std::str::from_utf8(extension_number_bytes).map_err(|e| NexradParseError::InvalidHeaderTapeFilename(e))?; + + let days_since_epoch = u32::from_be_bytes(volume_header[12..16].try_into().unwrap()); + let millis_after_midnight = u32::from_be_bytes(volume_header[16..20].try_into().unwrap()); + + let icao_bytes = &volume_header[20..24]; + let icao = std::str::from_utf8(icao_bytes).map_err(|e| NexradParseError::InvalidHeaderTapeFilename(e))?; + + let header = VolumeHeaderRecord { + tape_filename: tape_filename.to_string(), + extension_number: extension_number.to_string(), + date: days_since_epoch, + time: millis_after_midnight, + icao: icao.to_string(), + }; + + trace!("Loaded - {:#?}", header); + + loop { + // LDM records + let mut ldm_size_bytes = [0u8; 4]; + cursor.read_exact(&mut ldm_size_bytes).map_err(|e| NexradParseError::LdmReadFailed(e))?; + let ldm_size = i32::from_be_bytes(ldm_size_bytes).abs() as usize; + + trace!("Reading LDM record - {} bytes compressed", ldm_size); + + if ldm_size == 0 { + trace!("Missing LDM record, unseeking size"); + cursor.seek(SeekFrom::Current(-4)).map_err(|e| NexradParseError::LdmReadFailed(e))?; + } + + let mut compressed_buf = vec![0u8; ldm_size]; + cursor.read_exact(&mut compressed_buf).map_err(|e| NexradParseError::LdmReadFailed(e))?; + + let mut bz_decoder = BzDecoder::new(Cursor::new(compressed_buf)); + + let mut decompressed_buf = vec![]; + + io::copy(&mut bz_decoder, &mut decompressed_buf).map_err(|e| NexradParseError::DecompressError(e))?; + + trace!("LDM record decompressed to {} bytes", decompressed_buf.len()); + + let mut decompressed = Cursor::new(decompressed_buf); + + loop { + decompressed.seek(SeekFrom::Current(LEGACY_TCM_HEADER_LENGTH)).map_err(|e| NexradParseError::TcmChunkReadFailed(e))?; + + let mut message_header = [0u8; MESSAGE_HEADER_SIZE]; + decompressed.read_exact(&mut message_header).map_err(|e| NexradParseError::TcmChunkReadFailed(e))?; + + let message_header = MessageHeader { + message_size: u16::from_be_bytes(message_header[0..2].try_into().unwrap()), + rda_redundant_channel: message_header[2], + message_type: message_header[3], + message_sequence_number: u16::from_be_bytes(message_header[4..6].try_into().unwrap()), + julian_date: u16::from_be_bytes(message_header[6..8].try_into().unwrap()), + millis_after_midnight: u32::from_be_bytes(message_header[8..12].try_into().unwrap()), + num_of_message_segments: u16::from_be_bytes(message_header[12..14].try_into().unwrap()), + message_segment_num: u16::from_be_bytes(message_header[14..].try_into().unwrap()), + }; + + debug!("Message: {:#?}", message_header); + + break; + } + + break; + } + + Ok(Nexrad2Chunk { + volume_header_record: header + }) +} + diff --git a/nexrad2/src/message.rs b/nexrad2/src/message.rs new file mode 100644 index 0000000..1032981 --- /dev/null +++ b/nexrad2/src/message.rs @@ -0,0 +1,3 @@ +pub enum Message { + +} \ No newline at end of file diff --git a/nxar2/Cargo.toml b/nxar2/Cargo.toml new file mode 100644 index 0000000..e1a9db4 --- /dev/null +++ b/nxar2/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "nxar2" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +simple_logger = "4" +nexrad2 = { version = "0.1", path = "../nexrad2" } \ No newline at end of file diff --git a/nxar2/src/main.rs b/nxar2/src/main.rs new file mode 100644 index 0000000..d35d48d --- /dev/null +++ b/nxar2/src/main.rs @@ -0,0 +1,12 @@ +use std::fs; +use std::fs::File; +use nexrad2::parse_nx2_chunk; + +fn main() { + simple_logger::init().unwrap(); + let test = std::env::args().nth(1).unwrap(); + + let mut data = File::open(test).unwrap(); + + parse_nx2_chunk(&mut data).unwrap(); +} \ No newline at end of file diff --git a/rtwx/Cargo.toml b/rtwx/Cargo.toml new file mode 100644 index 0000000..456c3bb --- /dev/null +++ b/rtwx/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "nexrad" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +log = "0.4" +simple_logger = "4.2" +serde = { version = "1", features = ["derive"] } +tokio = { version = "1", features = ["full"] } +aws-sdk-sqs = "0.34" +aws-config = "0.56" +prometheus = "0.13" +lazy_static = "1.4" +serde_json = "1" \ No newline at end of file diff --git a/rtwx/src/archive.rs b/rtwx/src/archive.rs new file mode 100644 index 0000000..90a8979 --- /dev/null +++ b/rtwx/src/archive.rs @@ -0,0 +1 @@ +pub const ARCHIVE_SQS_URL: &str = "https://sqs.us-east-1.amazonaws.com/232232181562/NEXRAD_m1_q2"; \ No newline at end of file diff --git a/rtwx/src/firehose.rs b/rtwx/src/firehose.rs new file mode 100644 index 0000000..05dae66 --- /dev/null +++ b/rtwx/src/firehose.rs @@ -0,0 +1,29 @@ +use serde::{Deserialize, Serialize}; + +pub const FIREHOSE_SQS_URL: &str = "https://sqs.us-east-1.amazonaws.com/232232181562/NEXRAD_m1_q1"; + +#[derive(Serialize, Deserialize)] +pub struct S3NEXRADData { + #[serde(rename = "Key")] + pub key: String, + #[serde(rename = "SiteID")] + pub site_id: String, + #[serde(rename = "DateTime")] + pub date_time: String, + #[serde(rename = "VolumeID")] + pub volume_id: i32, + #[serde(rename = "ChunkID")] + pub chunk_id: i32, + #[serde(rename = "ChunkType")] + pub chunk_type: String, + #[serde(rename = "L2Version")] + pub l2version: String, + #[serde(rename = "S3Bucket")] + pub s3bucket: String +} + +#[derive(Serialize, Deserialize)] +pub struct S3Notification { + #[serde(rename = "Message")] + pub message: String, +} \ No newline at end of file diff --git a/rtwx/src/main.rs b/rtwx/src/main.rs new file mode 100644 index 0000000..dd41996 --- /dev/null +++ b/rtwx/src/main.rs @@ -0,0 +1,77 @@ +use std::error::Error; +use std::time::SystemTime; +use lazy_static::lazy_static; +use log::{debug, error, info, warn}; +use prometheus::{IntGauge, IntCounter, register_int_counter, register_int_gauge}; +use serde::{Deserialize, Serialize}; + +lazy_static! { + static ref NEXRAD_LAST_CHUNK_TIME: IntGauge = register_int_gauge!("rtwx_nexrad_last_chunk_time", "Unix timestamp of when the last chunk was received from NEXRAD").unwrap(); + static ref NEXRAD_CHUNKS_RECEIVED: IntCounter = register_int_counter!("rtwx_nexrad_chunks_total", "Total number of chunks received from NEXRAD").unwrap(); +} + +pub mod firehose; +pub mod archive; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum NexradMode { + Firehose, + Archive +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + simple_logger::init_with_env()?; + + info!("core's realtime weather API {} starting up", env!("CARGO_PKG_VERSION")); + + info!("initializing primary radar (NOAA NEXRAD) via AWS API"); + + let nexrad_mode = std::env::var("RTWX_NEXRAD_MODE").expect("env var RTWX_NEXRAD_MODE is required and must be set to 'archive' (recommended) or 'firehose' (expensive)"); + let nexrad_mode = match nexrad_mode.as_str() { + "firehose" => NexradMode::Firehose, + "archive" => NexradMode::Archive, + _ => { + error!("env var RTWX_NEXRAD_MODE must be set to 'archive' or 'firehose'"); + std::process::exit(1); + } + }; + if nexrad_mode == NexradMode::Firehose { + warn!("warning: firehose intake mode is *very* expensive ($2-5/day!) to operate"); + warn!("warning: please be careful!"); + } + + let config = aws_config::load_from_env().await; + let sqs_client = aws_sdk_sqs::Client::new(&config); + + info!("NEXRAD in alignment. May take up to 10 minutes for data to be available worldwide"); + + let mut total_frames = 0; + let start_time = SystemTime::now(); + + let queue_url = match nexrad_mode { + NexradMode::Firehose => firehose::FIREHOSE_SQS_URL, + NexradMode::Archive => archive::ARCHIVE_SQS_URL + }; + + loop { + let rcv_message_output = sqs_client.receive_message().max_number_of_messages(10).queue_url(queue_url).send().await?; + for message in rcv_message_output.messages.unwrap_or_default() { + // decode frame + if nexrad_mode == NexradMode::Firehose { + let notification: firehose::S3Notification = serde_json::from_str(&message.body.unwrap())?; + let frame: firehose::S3NEXRADData = serde_json::from_str(¬ification.message)?; + total_frames += 1; + let total_seconds: f64 = (SystemTime::now().duration_since(start_time).unwrap().as_millis() as f64) / 1000f64; + info!("New NEXRAD frame: {} {} vol {} chunk {} type {} version {} @ {}/{} rate {} frames/second", frame.site_id, frame.date_time, frame.volume_id, frame.chunk_id, frame.chunk_type, frame.l2version, frame.s3bucket, frame.key, (total_frames as f64) / total_seconds); + } else { + info!("Archive mode not available yet"); + } + } + } + + + info!("lightning data not yet available (Blitzortung)"); + + Ok(()) +}