initial commit

This commit is contained in:
core 2023-11-03 16:29:19 -04:00
commit 368f8b23fc
Signed by: core
GPG Key ID: FDBF740DADDCEECF
20 changed files with 460 additions and 0 deletions

8
.gitignore vendored Normal file
View File

@ -0,0 +1,8 @@
/target
**/*.rs.bk
Cargo.lock
nexrad-browser/bin/
nexrad-browser/pkg/
nexrad-browser/wasm-pack.log
nexrad-browser/www/wasm
nexrad-browser/www/

8
.idea/.gitignore vendored Normal file
View File

@ -0,0 +1,8 @@
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml

6
.idea/misc.xml Normal file
View File

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="MarkdownSettingsMigration">
<option name="stateVersion" value="1" />
</component>
</project>

7
.idea/vcs.xml Normal file
View File

@ -0,0 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="" vcs="Git" />
<mapping directory="$PROJECT_DIR$/nexrad-browser/www" vcs="Git" />
</component>
</project>

12
Cargo.toml Normal file
View File

@ -0,0 +1,12 @@
[workspace]
members = [
"nexrad2",
"rtwx",
"nexrad-browser",
"nxar2"
]
resolver = "2"
[profile.release.package.nexrad-browser]
# Tell `rustc` to optimize for small code size.
opt-level = "s"

BIN
KGRB_WITH_STUFF Normal file

Binary file not shown.

BIN
KOTX20231102_143210_V06 Normal file

Binary file not shown.

Binary file not shown.

32
nexrad-browser/Cargo.toml Normal file
View File

@ -0,0 +1,32 @@
[package]
name = "nexrad-browser"
version = "0.1.0"
authors = ["core <core@coredoes.dev>"]
edition = "2021"
description = "A web-based browser for NEXRAD Archive II datafiles"
repository = "https://git.e3t.cc/core/rtwx"
license = "GPL-3"
[lib]
crate-type = ["cdylib", "rlib"]
[features]
default = ["console_error_panic_hook"]
[dependencies]
wasm-bindgen = "0.2.84"
# The `console_error_panic_hook` crate provides better debugging of panics by
# logging them with `console.error`. This is great for development, but requires
# all the `std::fmt` and `std::panicking` infrastructure, so isn't great for
# code size when deploying.
console_error_panic_hook = { version = "0.1.7", optional = true }
nexrad2 = { version = "0.1.0", path = "../nexrad2" }
log = "0.4"
web-sys = "0.3"
js-sys = "0.3"
wasm-logger = "0.2"
[dev-dependencies]
wasm-bindgen-test = "0.3.34"

34
nexrad-browser/src/lib.rs Normal file
View File

@ -0,0 +1,34 @@
pub mod utils;
use std::io::Cursor;
use js_sys::Uint8Array;
use log::info;
use wasm_bindgen::prelude::*;
#[wasm_bindgen]
extern "C" {
fn alert(s: &str);
}
#[wasm_bindgen]
pub fn __nxrd_browser_init() {
wasm_logger::init(wasm_logger::Config::new(log::Level::Trace));
utils::set_panic_hook();
info!("nexrad-browser initialized successfully");
}
#[wasm_bindgen]
pub fn load_ar2(buf: &JsValue) {
info!("loading archive 2 file");
info!("load 01: convert to uint8array");
let array = Uint8Array::new(buf);
info!("load 02: convert to rust vec");
let rvec = array.to_vec();
info!("load 03: create cursor");
let mut cursor = Cursor::new(rvec);
info!("load 04: load");
let loaded = nexrad2::parse_nx2_chunk(&mut cursor).unwrap();
info!("load 05: dump");
info!("Loaded: {:#?}", loaded);
}

View File

@ -0,0 +1,10 @@
pub fn set_panic_hook() {
// When the `console_error_panic_hook` feature is enabled, we can call the
// `set_panic_hook` function at least once during initialization, and then
// we will get better error messages if our code ever panics.
//
// For more details see
// https://github.com/rustwasm/console_error_panic_hook#readme
#[cfg(feature = "console_error_panic_hook")]
console_error_panic_hook::set_once();
}

11
nexrad2/Cargo.toml Normal file
View File

@ -0,0 +1,11 @@
[package]
name = "nexrad2"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
#bzip2-rs = "0.1"
bzip2 = "0.4"
log = "0.4"

183
nexrad2/src/lib.rs Normal file
View File

@ -0,0 +1,183 @@
//! # nexrad2
//! A parser for the NEXRAD II raw datafile format.
use std::error::Error;
use std::fmt::{Display, Formatter};
use std::io;
use std::io::{Cursor, Read, Seek, SeekFrom};
use std::str::Utf8Error;
use bzip2::read::BzDecoder;
//use bzip2_rs::DecoderReader;
use log::{debug, trace};
pub mod message;
#[derive(Debug)]
pub struct Nexrad2Chunk {
pub volume_header_record: VolumeHeaderRecord,
}
#[derive(Debug, Clone, PartialEq, Eq)]
/// The Volume Header Record as defined by 2620010H (Build 19.0) section 7.3.3 Volume Header Record
pub struct VolumeHeaderRecord {
/// A character constant of which the last 2 characters identify the version.
/// Versions defined by 262010H Build 19.0:
/// - `AR2V0002.`: Super Resolution disabled at the RDA (pre RDA Build 12.0)
/// - `AR2V0003.`: Super Resolution (pre RDA Build 12.0)
/// - `AR2V0004.`: Recombined Super Resolution
/// - `AR2V0005.`: Super Resolution disabled at the RDA (RDA Build 12.0 and later)
/// - `AR2V0006.`: Super Resolution (RDA Build 12.0 and later)
/// - `AR2V0007.`: Recombined Super Resolution (RDA Build 12.0 and later)
pub tape_filename: String,
/// Increases by 1 for each volume of radar data in the queue to a maximum of 999, whereupon it rolls back to 001
pub extension_number: String,
/// Date - NEXRAD-modified Julian - days since 1/1/1970 where 1/1/1970 is 1.
pub date: u32,
/// Time - milliseconds past midnight
pub time: u32,
/// ICAO of the radar
pub icao: String
}
#[derive(Debug)]
pub enum NexradParseError {
InvalidHeaderTapeFilename(Utf8Error),
HeaderReadError(io::Error),
InvalidHeaderLength(usize),
InvalidHeaderExtensionNumber(Utf8Error),
InvalidICAO(Utf8Error),
FileTooShort,
DecompressError(io::Error),
InvalidMetaChunkLength(usize),
TcmHeaderReadFailed(io::Error),
TcmChunkReadFailed(io::Error),
FailedToReadFile(io::Error),
LdmReadFailed(io::Error)
}
impl Display for NexradParseError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::InvalidHeaderTapeFilename(e) => write!(f, "invalid header tape filename: {}", e),
Self::HeaderReadError(e) => write!(f, "error reading header from cursor: {}", e),
Self::InvalidHeaderLength(read) => write!(f, "invalid header length, expected 24 bytes but only read {}", read),
Self::InvalidHeaderExtensionNumber(e) => write!(f, "invalid header extension number: {}", e),
Self::InvalidICAO(e) => write!(f, "invalid station icao: {}", e),
Self::FileTooShort => write!(f, "file too short (failed to fill whole buffer)"),
Self::DecompressError(e) => write!(f, "decompression error: {}", e),
Self::InvalidMetaChunkLength(real) => write!(f, "invalid meta chunk length, spec defined as {NEXRAD2_META_CHUNK_FIXED_LENGTH} but got {real}"),
Self::TcmHeaderReadFailed(e) => write!(f, "tcm message header read failed: {}", e),
Self::TcmChunkReadFailed(e) => write!(f, "tcm chunk read failed: {}", e),
Self::FailedToReadFile(e) => write!(f, "failed to read file: {}", e),
Self::LdmReadFailed(e) => write!(f, "ldm read failed: {}", e)
}
}
}
impl Error for NexradParseError {}
pub const NEXRAD2_META_CHUNK_FIXED_LENGTH: usize = 325888;
pub const MESSAGE_HEADER_SIZE: usize = 2 + 1 + 1 + 2 + 2 + 4 + 2 + 2;
#[derive(Debug)]
pub struct MessageHeader {
pub message_size: u16,
pub rda_redundant_channel: u8,
pub message_type: u8,
pub message_sequence_number: u16,
pub julian_date: u16,
pub millis_after_midnight: u32,
pub num_of_message_segments: u16,
pub message_segment_num: u16
}
pub enum Message {
}
pub const LEGACY_TCM_HEADER_LENGTH: i64 = 12;
pub fn parse_nx2_chunk(cursor: &mut (impl Read + Seek)) -> Result<Nexrad2Chunk, NexradParseError> {
let mut volume_header = [0u8; 24];
let read = cursor.read(&mut volume_header).map_err(|e| NexradParseError::HeaderReadError(e))?;
if read != 24 { return Err(NexradParseError::InvalidHeaderLength(read)); }
let tape_filename_bytes = &volume_header[0..9];
let tape_filename = std::str::from_utf8(tape_filename_bytes).map_err(|e| NexradParseError::InvalidHeaderTapeFilename(e))?;
let extension_number_bytes = &volume_header[9..12];
let extension_number = std::str::from_utf8(extension_number_bytes).map_err(|e| NexradParseError::InvalidHeaderTapeFilename(e))?;
let days_since_epoch = u32::from_be_bytes(volume_header[12..16].try_into().unwrap());
let millis_after_midnight = u32::from_be_bytes(volume_header[16..20].try_into().unwrap());
let icao_bytes = &volume_header[20..24];
let icao = std::str::from_utf8(icao_bytes).map_err(|e| NexradParseError::InvalidHeaderTapeFilename(e))?;
let header = VolumeHeaderRecord {
tape_filename: tape_filename.to_string(),
extension_number: extension_number.to_string(),
date: days_since_epoch,
time: millis_after_midnight,
icao: icao.to_string(),
};
trace!("Loaded - {:#?}", header);
loop {
// LDM records
let mut ldm_size_bytes = [0u8; 4];
cursor.read_exact(&mut ldm_size_bytes).map_err(|e| NexradParseError::LdmReadFailed(e))?;
let ldm_size = i32::from_be_bytes(ldm_size_bytes).abs() as usize;
trace!("Reading LDM record - {} bytes compressed", ldm_size);
if ldm_size == 0 {
trace!("Missing LDM record, unseeking size");
cursor.seek(SeekFrom::Current(-4)).map_err(|e| NexradParseError::LdmReadFailed(e))?;
}
let mut compressed_buf = vec![0u8; ldm_size];
cursor.read_exact(&mut compressed_buf).map_err(|e| NexradParseError::LdmReadFailed(e))?;
let mut bz_decoder = BzDecoder::new(Cursor::new(compressed_buf));
let mut decompressed_buf = vec![];
io::copy(&mut bz_decoder, &mut decompressed_buf).map_err(|e| NexradParseError::DecompressError(e))?;
trace!("LDM record decompressed to {} bytes", decompressed_buf.len());
let mut decompressed = Cursor::new(decompressed_buf);
loop {
decompressed.seek(SeekFrom::Current(LEGACY_TCM_HEADER_LENGTH)).map_err(|e| NexradParseError::TcmChunkReadFailed(e))?;
let mut message_header = [0u8; MESSAGE_HEADER_SIZE];
decompressed.read_exact(&mut message_header).map_err(|e| NexradParseError::TcmChunkReadFailed(e))?;
let message_header = MessageHeader {
message_size: u16::from_be_bytes(message_header[0..2].try_into().unwrap()),
rda_redundant_channel: message_header[2],
message_type: message_header[3],
message_sequence_number: u16::from_be_bytes(message_header[4..6].try_into().unwrap()),
julian_date: u16::from_be_bytes(message_header[6..8].try_into().unwrap()),
millis_after_midnight: u32::from_be_bytes(message_header[8..12].try_into().unwrap()),
num_of_message_segments: u16::from_be_bytes(message_header[12..14].try_into().unwrap()),
message_segment_num: u16::from_be_bytes(message_header[14..].try_into().unwrap()),
};
debug!("Message: {:#?}", message_header);
break;
}
break;
}
Ok(Nexrad2Chunk {
volume_header_record: header
})
}

3
nexrad2/src/message.rs Normal file
View File

@ -0,0 +1,3 @@
pub enum Message {
}

10
nxar2/Cargo.toml Normal file
View File

@ -0,0 +1,10 @@
[package]
name = "nxar2"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
simple_logger = "4"
nexrad2 = { version = "0.1", path = "../nexrad2" }

12
nxar2/src/main.rs Normal file
View File

@ -0,0 +1,12 @@
use std::fs;
use std::fs::File;
use nexrad2::parse_nx2_chunk;
fn main() {
simple_logger::init().unwrap();
let test = std::env::args().nth(1).unwrap();
let mut data = File::open(test).unwrap();
parse_nx2_chunk(&mut data).unwrap();
}

17
rtwx/Cargo.toml Normal file
View File

@ -0,0 +1,17 @@
[package]
name = "nexrad"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
log = "0.4"
simple_logger = "4.2"
serde = { version = "1", features = ["derive"] }
tokio = { version = "1", features = ["full"] }
aws-sdk-sqs = "0.34"
aws-config = "0.56"
prometheus = "0.13"
lazy_static = "1.4"
serde_json = "1"

1
rtwx/src/archive.rs Normal file
View File

@ -0,0 +1 @@
pub const ARCHIVE_SQS_URL: &str = "https://sqs.us-east-1.amazonaws.com/232232181562/NEXRAD_m1_q2";

29
rtwx/src/firehose.rs Normal file
View File

@ -0,0 +1,29 @@
use serde::{Deserialize, Serialize};
pub const FIREHOSE_SQS_URL: &str = "https://sqs.us-east-1.amazonaws.com/232232181562/NEXRAD_m1_q1";
#[derive(Serialize, Deserialize)]
pub struct S3NEXRADData {
#[serde(rename = "Key")]
pub key: String,
#[serde(rename = "SiteID")]
pub site_id: String,
#[serde(rename = "DateTime")]
pub date_time: String,
#[serde(rename = "VolumeID")]
pub volume_id: i32,
#[serde(rename = "ChunkID")]
pub chunk_id: i32,
#[serde(rename = "ChunkType")]
pub chunk_type: String,
#[serde(rename = "L2Version")]
pub l2version: String,
#[serde(rename = "S3Bucket")]
pub s3bucket: String
}
#[derive(Serialize, Deserialize)]
pub struct S3Notification {
#[serde(rename = "Message")]
pub message: String,
}

77
rtwx/src/main.rs Normal file
View File

@ -0,0 +1,77 @@
use std::error::Error;
use std::time::SystemTime;
use lazy_static::lazy_static;
use log::{debug, error, info, warn};
use prometheus::{IntGauge, IntCounter, register_int_counter, register_int_gauge};
use serde::{Deserialize, Serialize};
lazy_static! {
static ref NEXRAD_LAST_CHUNK_TIME: IntGauge = register_int_gauge!("rtwx_nexrad_last_chunk_time", "Unix timestamp of when the last chunk was received from NEXRAD").unwrap();
static ref NEXRAD_CHUNKS_RECEIVED: IntCounter = register_int_counter!("rtwx_nexrad_chunks_total", "Total number of chunks received from NEXRAD").unwrap();
}
pub mod firehose;
pub mod archive;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NexradMode {
Firehose,
Archive
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
simple_logger::init_with_env()?;
info!("core's realtime weather API {} starting up", env!("CARGO_PKG_VERSION"));
info!("initializing primary radar (NOAA NEXRAD) via AWS API");
let nexrad_mode = std::env::var("RTWX_NEXRAD_MODE").expect("env var RTWX_NEXRAD_MODE is required and must be set to 'archive' (recommended) or 'firehose' (expensive)");
let nexrad_mode = match nexrad_mode.as_str() {
"firehose" => NexradMode::Firehose,
"archive" => NexradMode::Archive,
_ => {
error!("env var RTWX_NEXRAD_MODE must be set to 'archive' or 'firehose'");
std::process::exit(1);
}
};
if nexrad_mode == NexradMode::Firehose {
warn!("warning: firehose intake mode is *very* expensive ($2-5/day!) to operate");
warn!("warning: please be careful!");
}
let config = aws_config::load_from_env().await;
let sqs_client = aws_sdk_sqs::Client::new(&config);
info!("NEXRAD in alignment. May take up to 10 minutes for data to be available worldwide");
let mut total_frames = 0;
let start_time = SystemTime::now();
let queue_url = match nexrad_mode {
NexradMode::Firehose => firehose::FIREHOSE_SQS_URL,
NexradMode::Archive => archive::ARCHIVE_SQS_URL
};
loop {
let rcv_message_output = sqs_client.receive_message().max_number_of_messages(10).queue_url(queue_url).send().await?;
for message in rcv_message_output.messages.unwrap_or_default() {
// decode frame
if nexrad_mode == NexradMode::Firehose {
let notification: firehose::S3Notification = serde_json::from_str(&message.body.unwrap())?;
let frame: firehose::S3NEXRADData = serde_json::from_str(&notification.message)?;
total_frames += 1;
let total_seconds: f64 = (SystemTime::now().duration_since(start_time).unwrap().as_millis() as f64) / 1000f64;
info!("New NEXRAD frame: {} {} vol {} chunk {} type {} version {} @ {}/{} rate {} frames/second", frame.site_id, frame.date_time, frame.volume_id, frame.chunk_id, frame.chunk_type, frame.l2version, frame.s3bucket, frame.key, (total_frames as f64) / total_seconds);
} else {
info!("Archive mode not available yet");
}
}
}
info!("lightning data not yet available (Blitzortung)");
Ok(())
}