From b092907a8a21b4cc0b4217fdb70408b7c9497030 Mon Sep 17 00:00:00 2001 From: TerraMaster85 Date: Fri, 20 Dec 2024 22:44:24 -0500 Subject: [PATCH] carrier now uses live sample rate --- src/audio_receiver/mod.rs | 9 ++++--- src/audio_transmitter/mod.rs | 39 +++++++++++++++++---------- src/dsp_common/carrier.rs | 40 ++++++++++++++-------------- src/dsp_common/sample.rs | 23 ++++++++-------- src/dsp_outb/mod.rs | 51 ++++++++++++++++++++---------------- src/main.rs | 13 ++++++--- src/task_msg.rs | 7 +++-- 7 files changed, 104 insertions(+), 78 deletions(-) diff --git a/src/audio_receiver/mod.rs b/src/audio_receiver/mod.rs index 256fea0..45963d5 100644 --- a/src/audio_receiver/mod.rs +++ b/src/audio_receiver/mod.rs @@ -105,8 +105,9 @@ pub fn audio_receiver_main( Ok(input_stream) } -fn take_input_pcm(pcm: &[T], channel: &Sender) where T: Sample + std::fmt::Debug, i64: FromSample { - if channel.try_send(pcm.iter().map(|&n| n.to_sample::()).collect()).is_err() { - error!("demodulator PCM data queue is full???"); - } +fn take_input_pcm(pcm: &[T], channel: &Sender) where T: Sample + std::fmt::Debug, f64: FromSample { + let _ = pcm; + //if channel.try_send(pcm.iter().map(|&n| n.to_sample::()).collect()).is_err() { + // error!("demodulator PCM data queue is full???"); + //} } diff --git a/src/audio_transmitter/mod.rs b/src/audio_transmitter/mod.rs index 230f4c4..ef49736 100644 --- a/src/audio_transmitter/mod.rs +++ b/src/audio_transmitter/mod.rs @@ -7,10 +7,12 @@ use tracing::{error, info, trace, warn}; use tokio::sync::mpsc::Receiver; -use crate::task_msg::{PcmFrameMessage, PcmSampleMessage}; + +use crate::task_msg::{PcmFrameMessage, PcmSampleMessage, SampleRateNotifyMessage}; pub fn audio_transmitter_main( mut rx_for_wire: Receiver, + mut tx_sample_rate_notify: tokio::sync::oneshot::Sender, cpal_output: &cpal::Device, ) -> Result { trace!("audio_transmitter started"); @@ -33,6 +35,7 @@ pub fn audio_transmitter_main( } }).expect("output device has no supported configurations available") .with_max_sample_rate(); + //.with_sample_rate(cpal::SampleRate(44100)); info!( "output samples are {} @ {}hz on {} channels", @@ -48,7 +51,7 @@ pub fn audio_transmitter_main( let output_stream = match cpal_output_config.sample_format() { cpal::SampleFormat::U8 => cpal_output .build_output_stream( - &cpal_output_config.into(), + &cpal_output_config.clone().into(), move |pcm, _: &_| give_output_pcm::(pcm, &mut rx_for_wire), failed_pcm_give, None, @@ -56,7 +59,7 @@ pub fn audio_transmitter_main( .context("failed to build output stream")?, cpal::SampleFormat::I8 => cpal_output .build_output_stream( - &cpal_output_config.into(), + &cpal_output_config.clone().into(), move |pcm, _: &_| give_output_pcm::(pcm, &mut rx_for_wire), failed_pcm_give, None, @@ -64,7 +67,7 @@ pub fn audio_transmitter_main( .context("failed to build output stream")?, cpal::SampleFormat::U16 => cpal_output .build_output_stream( - &cpal_output_config.into(), + &cpal_output_config.clone().into(), move |pcm, _: &_| give_output_pcm::(pcm, &mut rx_for_wire), failed_pcm_give, None, @@ -72,7 +75,7 @@ pub fn audio_transmitter_main( .context("failed to build output stream")?, cpal::SampleFormat::I16 => cpal_output .build_output_stream( - &cpal_output_config.into(), + &cpal_output_config.clone().into(), move |pcm, _: &_| give_output_pcm::(pcm, &mut rx_for_wire), failed_pcm_give, None, @@ -80,7 +83,7 @@ pub fn audio_transmitter_main( .context("failed to build output stream")?, cpal::SampleFormat::U32 => cpal_output .build_output_stream( - &cpal_output_config.into(), + &cpal_output_config.clone().into(), move |pcm, _: &_| give_output_pcm::(pcm, &mut rx_for_wire), failed_pcm_give, None, @@ -88,7 +91,7 @@ pub fn audio_transmitter_main( .context("failed to build output stream")?, cpal::SampleFormat::I32 => cpal_output .build_output_stream( - &cpal_output_config.into(), + &cpal_output_config.clone().into(), move |pcm, _: &_| give_output_pcm::(pcm, &mut rx_for_wire), failed_pcm_give, None, @@ -96,7 +99,7 @@ pub fn audio_transmitter_main( .context("failed to build output stream")?, cpal::SampleFormat::F32 => cpal_output .build_output_stream( - &cpal_output_config.into(), + &cpal_output_config.clone().into(), move |pcm, _: &_| give_output_pcm::(pcm, &mut rx_for_wire), failed_pcm_give, None, @@ -104,7 +107,7 @@ pub fn audio_transmitter_main( .context("failed to build output stream")?, cpal::SampleFormat::F64 => cpal_output .build_output_stream( - &cpal_output_config.into(), + &cpal_output_config.clone().into(), move |pcm, _: &_| give_output_pcm::(pcm, &mut rx_for_wire), failed_pcm_give, None, @@ -112,7 +115,7 @@ pub fn audio_transmitter_main( .context("failed to build output stream")?, cpal::SampleFormat::U64 => cpal_output .build_output_stream( - &cpal_output_config.into(), + &cpal_output_config.clone().into(), move |pcm, _: &_| give_output_pcm::(pcm, &mut rx_for_wire), failed_pcm_give, None, @@ -120,7 +123,7 @@ pub fn audio_transmitter_main( .context("failed to build output stream")?, cpal::SampleFormat::I64 => cpal_output .build_output_stream( - &cpal_output_config.into(), + &cpal_output_config.clone().into(), move |pcm, _: &_| give_output_pcm::(pcm, &mut rx_for_wire), failed_pcm_give, None, @@ -132,6 +135,10 @@ pub fn audio_transmitter_main( )))?, }; + // Inform dsp_outb about sample rate + tx_sample_rate_notify.send(cpal_output_config.sample_rate().0) + .expect("dsp_outb has dropped srn rx; it has probably panicked"); + output_stream .play() .context("failed to start output audio stream")?; @@ -141,15 +148,19 @@ pub fn audio_transmitter_main( fn give_output_pcm(pcm: &mut [T], channel: &mut Receiver) where - T: Sample + std::fmt::Debug + FromSample + FromSample, + T: Sample + std::fmt::Debug + FromSample, { trace!("ALSA demands {} samples...", pcm.len()); + let mut have_complained_no_messages = false; for outgoing in pcm.iter_mut() { *outgoing = match channel.try_recv() { Ok(sample) => sample.to_sample::(), Err(_) => { - warn!("rx_for_wire has no samples for us! carrier dead!"); - warn!("are we able to produce samples quickly enough? CPU OK?"); + //if have_complained_no_messages == true { + warn!("rx_for_wire has no samples for us! carrier dead!"); + warn!("are we able to produce samples quickly enough? CPU OK?"); + //have_complained_no_messages = false; + //} T::EQUILIBRIUM } }; diff --git a/src/dsp_common/carrier.rs b/src/dsp_common/carrier.rs index 37b98c0..5f224e4 100644 --- a/src/dsp_common/carrier.rs +++ b/src/dsp_common/carrier.rs @@ -7,6 +7,7 @@ const TAU: f64 = 2.0 * std::f64::consts::PI; pub struct TwoStateCarrier { pub sample_rate: f64, + pub symbol_rate: f64, pub freq_low: f64, pub freq_high: f64, phase_low: f64, @@ -15,7 +16,7 @@ pub struct TwoStateCarrier { pub trait Carrier { fn byte_into_fsk_samples(&mut self, byte: &u8) -> Vec; - fn idle(&mut self) -> [PcmSample; 8]; + fn idle(&mut self) -> [PcmSample; 64]; } impl Carrier for TwoStateCarrier { @@ -37,32 +38,32 @@ impl Carrier for TwoStateCarrier { // for each bit in the byte (LSb..=MSb)... for bit_place in 1..=8 { // if the bit is high... - if (byte >> bit_place) & 1 == 0 { + if (byte >> (bit_place - 1)) & 1 == 0 { // while the signal hasn't yet completed a single period... - while (self.phase_low) < self.sample_rate / self.freq_low { + while (self.phase_low) < self.sample_rate / self.symbol_rate { // push the next sample of the relative sine wave - out.push(PcmSample::F64( + out.push( (self.phase_low * TAU * self.freq_low / self.sample_rate) - .sin(), - )); + .sin() + ); // next sample self.phase_low += 1.0; } // if phase counter exceeds 1 period, drop it down again self.phase_low = - self.phase_low % (self.sample_rate / self.freq_low); - } else if (byte >> bit_place) & 1 == 1 { - while (self.phase_high) < self.sample_rate / self.freq_high { - out.push(PcmSample::F64( + self.phase_low % (self.sample_rate / self.symbol_rate); + } else if (byte >> (bit_place - 1)) & 1 == 1 { + while (self.phase_high) < self.sample_rate / self.symbol_rate { + out.push( (self.phase_high * TAU * self.freq_high / self.sample_rate) .sin(), - )); + ); self.phase_high += 1.0; } self.phase_high = - self.phase_high % (self.sample_rate / self.freq_high); + self.phase_high % (self.sample_rate / self.symbol_rate); } else { // this keeps happening unreachable!("FSK bit math returned a value that isn't 0 or 1"); @@ -72,13 +73,11 @@ impl Carrier for TwoStateCarrier { out } - fn idle(&mut self) -> [PcmSample; 8] { - const INIT_VALUE: PcmSample = PcmSample::F64(0.0); - let mut out: [PcmSample; 8] = [INIT_VALUE; 8]; - for i in 0..=7 { - out[i] = PcmSample::F64( - (self.phase_low * TAU * self.freq_low/ self.sample_rate).sin() - ); + fn idle(&mut self) -> [PcmSample; 64] { + let mut out: [PcmSample; 64] = [0.0; 64]; + for i in 0..=63 { + out[i] = + (self.phase_low * TAU * self.freq_low/ self.sample_rate).sin(); self.phase_low += 1.0; } self.phase_low = self.phase_low % (self.sample_rate / self.freq_low); @@ -87,11 +86,12 @@ impl Carrier for TwoStateCarrier { } impl TwoStateCarrier { - pub fn new(freq_low: u32, freq_high: u32, sample_rate: u32) -> Self { + pub fn new(freq_low: u32, freq_high: u32, sample_rate: u32, symbol_rate: u32) -> Self { TwoStateCarrier { freq_low: freq_low.into(), freq_high: freq_high.into(), sample_rate: sample_rate.into(), + symbol_rate: symbol_rate.into(), phase_low: 0_f64, phase_high: 0_f64, } diff --git a/src/dsp_common/sample.rs b/src/dsp_common/sample.rs index 042cd31..d29cf5b 100644 --- a/src/dsp_common/sample.rs +++ b/src/dsp_common/sample.rs @@ -1,11 +1,12 @@ -#[derive(Copy)] -pub enum PcmSample { - U8(u8), - I8(i8), - U16(u16), - I32(i32), - F32(f32), - F64(f64), - U64(u64), - I64(i64), -} +//#[derive(Copy)] +//pub enum PcmSample { +// U8(u8), +// I8(i8), +// U16(u16), +// I32(i32), +// F32(f32), +// F64(f64), +// U64(u64), +// I64(i64), +//} +pub type PcmSample = f64; diff --git a/src/dsp_outb/mod.rs b/src/dsp_outb/mod.rs index 0c46e0f..d7ba573 100644 --- a/src/dsp_outb/mod.rs +++ b/src/dsp_outb/mod.rs @@ -7,30 +7,49 @@ use tracing::{error, info, trace, warn}; use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::mpsc::error::TryRecvError; -use crate::task_msg::{RawEthFrameMessage, PcmSampleMessage}; +use crate::task_msg::{RawEthFrameMessage, PcmSampleMessage, SampleRateNotifyMessage}; use crate::dsp_common::carrier::{Carrier, TwoStateCarrier}; pub async fn dsp_outb_main( tx_for_wire: Sender, mut rx_for_mod: Receiver, + mut rx_sample_rate_notify: tokio::sync::oneshot::Receiver, ) -> Result<()> { trace!("pcm_outb task started"); - let carrier_phase = 0_f64; - let carrier_low = 240; - let carrier_high = 250; - // TODO: fetch real sample rate - let mut carrier = TwoStateCarrier::new(carrier_low, carrier_high, 384000); + let mut carrier = TwoStateCarrier::new( + 320, // freq_low + 384, // freq_high + rx_sample_rate_notify.await?, // sample_rate + 4, // symbol_rate + ); loop { - let eth_frame = match rx_for_mod.try_recv() { - Ok(x) => x, + match rx_for_mod.try_recv() { + Ok(eth_frame) => { + trace!("network frame came in"); + let mut txed = 0; + let mut bytes = 0; + + for byte in eth_frame { + for samp in carrier.byte_into_fsk_samples(&byte) { + tx_for_wire.send(samp).await?; + txed += 1; + } + bytes += 1; + } + + trace!("eth frame modulated and sent ({} bytes, {} samps)", bytes, txed); + //(*carrier_phase * TAU * freq / sample_rate).sin() + }, Err(TryRecvError::Empty) => { trace!("no eth frames to modulate!"); //tx_for_wire.send(i64::EQUILIBRIUM).await?; //continue; - vec![0, 0, 0, 0, 0, 0, 0, 0,] + for samp in carrier.idle() { + tx_for_wire.send(samp).await?; + } } Err(TryRecvError::Disconnected) => { Err(anyhow::Error::msg( @@ -38,19 +57,5 @@ pub async fn dsp_outb_main( ))? } }; - trace!("network frame came in"); - let mut txed = 0; - let mut bytes = 0; - - for byte in eth_frame { - for samp in carrier.byte_into_fsk_samples(&byte) { - tx_for_wire.send(samp).await?; - txed += 1; - } - bytes += 1; - } - - trace!("eth frame modulated and sent ({} bytes, {} samps)", bytes, txed); - //(*carrier_phase * TAU * freq / sample_rate).sin() } } diff --git a/src/main.rs b/src/main.rs index fc2228c..d862263 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,6 +10,8 @@ use cpal::traits::HostTrait; use tracing::{trace}; +use tracing_subscriber::layer::SubscriberExt; + use tokio::sync::mpsc::{channel, unbounded_channel}; //use ringbuf:: @@ -32,7 +34,7 @@ mod tap_junction; use crate::tap_junction::tap_junction_main; mod task_msg; -use crate::task_msg::{PcmFrameMessage, PcmSampleMessage, RawEthFrameMessage}; +use crate::task_msg::{PcmFrameMessage, PcmSampleMessage, RawEthFrameMessage, SampleRateNotifyMessage}; mod wire_fmt; @@ -57,7 +59,10 @@ async fn main() -> Result<()> { let (tx_for_eth_inb, rx_for_eth_inb) = unbounded_channel::(); let (tx_for_mod, rx_for_mod) = channel::(1024); - let (tx_for_wire, rx_for_wire) = channel::(1024); + let (tx_for_wire, rx_for_wire) = channel::(256); + + let (tx_sample_rate_notify, rx_sample_rate_notify) = + tokio::sync::oneshot::channel::(); trace!("starting listener for audio on wire"); let audio_in_stream = audio_receiver_main(tx_for_demod, &cpal_input) @@ -75,10 +80,10 @@ async fn main() -> Result<()> { }); let task_dsp_outb = tasks.spawn(async move { trace!("Starting outbound audio modulator"); - dsp_outb_main(tx_for_wire, rx_for_mod).await + dsp_outb_main(tx_for_wire, rx_for_mod, rx_sample_rate_notify).await }); trace!("Starting audio sender"); - let audio_out_stream = audio_transmitter_main(rx_for_wire, &cpal_output) + let audio_out_stream = audio_transmitter_main(rx_for_wire, tx_sample_rate_notify, &cpal_output) .context("transmitter for audio on wire has failed to start")?; while let Some(task_result) = tasks.join_next().await { diff --git a/src/task_msg.rs b/src/task_msg.rs index 69e8058..a82cf95 100644 --- a/src/task_msg.rs +++ b/src/task_msg.rs @@ -1,3 +1,6 @@ -pub type PcmFrameMessage = Vec; -pub type PcmSampleMessage = i64; +use crate::dsp_common::sample::PcmSample; + +pub type PcmFrameMessage = Vec; +pub type PcmSampleMessage = PcmSample; pub type RawEthFrameMessage = Vec; +pub type SampleRateNotifyMessage = u32;