diff --git a/src/audio_receiver/mod.rs b/src/audio_receiver/mod.rs index 45963d5..5032da8 100644 --- a/src/audio_receiver/mod.rs +++ b/src/audio_receiver/mod.rs @@ -7,10 +7,8 @@ use tracing::{error, info, trace, warn}; use tokio::sync::mpsc::Sender; -use crate::task_msg::PcmFrameMessage; - pub fn audio_receiver_main( - tx_for_demod: Sender, + tx_for_demod: Sender>, cpal_input: &cpal::Device, ) -> Result { trace!("audio_receiver started"); @@ -105,7 +103,7 @@ pub fn audio_receiver_main( Ok(input_stream) } -fn take_input_pcm(pcm: &[T], channel: &Sender) where T: Sample + std::fmt::Debug, f64: FromSample { +fn take_input_pcm(pcm: &[T], channel: &Sender>) where T: Sample + std::fmt::Debug, f64: FromSample { let _ = pcm; //if channel.try_send(pcm.iter().map(|&n| n.to_sample::()).collect()).is_err() { // error!("demodulator PCM data queue is full???"); diff --git a/src/audio_transmitter/mod.rs b/src/audio_transmitter/mod.rs index ef49736..3ac1af1 100644 --- a/src/audio_transmitter/mod.rs +++ b/src/audio_transmitter/mod.rs @@ -1,3 +1,5 @@ +use std::collections::VecDeque; + use anyhow::{Context, Result}; use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; @@ -7,12 +9,11 @@ use tracing::{error, info, trace, warn}; use tokio::sync::mpsc::Receiver; - -use crate::task_msg::{PcmFrameMessage, PcmSampleMessage, SampleRateNotifyMessage}; +use crate::dsp_common::carrier::{Carrier, TwoStateCarrier}; pub fn audio_transmitter_main( - mut rx_for_wire: Receiver, - mut tx_sample_rate_notify: tokio::sync::oneshot::Sender, + mut rx_for_mod: Receiver, + tx_sample_rate_notify: tokio::sync::oneshot::Sender, cpal_output: &cpal::Device, ) -> Result { trace!("audio_transmitter started"); @@ -35,7 +36,7 @@ pub fn audio_transmitter_main( } }).expect("output device has no supported configurations available") .with_max_sample_rate(); - //.with_sample_rate(cpal::SampleRate(44100)); + //.with_sample_rate(cpal::SampleRate(44100)); info!( "output samples are {} @ {}hz on {} channels", @@ -48,11 +49,24 @@ pub fn audio_transmitter_main( warn!("dropped some PCM data on output stream?! {}", e); }; + // TODO: from cli + let mut carrier = TwoStateCarrier::new( + 3200, // freq_low + 3840, // freq_high + cpal_output_config.sample_rate().0, // sample_rate + 48, // symbol_rate + ); + + let mut leftovers: Vec = vec![]; + + // TODO: config .clone()s should be obsolete let output_stream = match cpal_output_config.sample_format() { cpal::SampleFormat::U8 => cpal_output .build_output_stream( &cpal_output_config.clone().into(), - move |pcm, _: &_| give_output_pcm::(pcm, &mut rx_for_wire), + move |pcm, _: &_| { + give_output_pcm::(pcm, &mut rx_for_mod, &mut carrier, &mut leftovers) + }, failed_pcm_give, None, ) @@ -60,7 +74,9 @@ pub fn audio_transmitter_main( cpal::SampleFormat::I8 => cpal_output .build_output_stream( &cpal_output_config.clone().into(), - move |pcm, _: &_| give_output_pcm::(pcm, &mut rx_for_wire), + move |pcm, _: &_| { + give_output_pcm::(pcm, &mut rx_for_mod, &mut carrier, &mut leftovers) + }, failed_pcm_give, None, ) @@ -68,7 +84,9 @@ pub fn audio_transmitter_main( cpal::SampleFormat::U16 => cpal_output .build_output_stream( &cpal_output_config.clone().into(), - move |pcm, _: &_| give_output_pcm::(pcm, &mut rx_for_wire), + move |pcm, _: &_| { + give_output_pcm::(pcm, &mut rx_for_mod, &mut carrier, &mut leftovers) + }, failed_pcm_give, None, ) @@ -76,7 +94,9 @@ pub fn audio_transmitter_main( cpal::SampleFormat::I16 => cpal_output .build_output_stream( &cpal_output_config.clone().into(), - move |pcm, _: &_| give_output_pcm::(pcm, &mut rx_for_wire), + move |pcm, _: &_| { + give_output_pcm::(pcm, &mut rx_for_mod, &mut carrier, &mut leftovers) + }, failed_pcm_give, None, ) @@ -84,7 +104,9 @@ pub fn audio_transmitter_main( cpal::SampleFormat::U32 => cpal_output .build_output_stream( &cpal_output_config.clone().into(), - move |pcm, _: &_| give_output_pcm::(pcm, &mut rx_for_wire), + move |pcm, _: &_| { + give_output_pcm::(pcm, &mut rx_for_mod, &mut carrier, &mut leftovers) + }, failed_pcm_give, None, ) @@ -92,7 +114,9 @@ pub fn audio_transmitter_main( cpal::SampleFormat::I32 => cpal_output .build_output_stream( &cpal_output_config.clone().into(), - move |pcm, _: &_| give_output_pcm::(pcm, &mut rx_for_wire), + move |pcm, _: &_| { + give_output_pcm::(pcm, &mut rx_for_mod, &mut carrier, &mut leftovers) + }, failed_pcm_give, None, ) @@ -100,7 +124,9 @@ pub fn audio_transmitter_main( cpal::SampleFormat::F32 => cpal_output .build_output_stream( &cpal_output_config.clone().into(), - move |pcm, _: &_| give_output_pcm::(pcm, &mut rx_for_wire), + move |pcm, _: &_| { + give_output_pcm::(pcm, &mut rx_for_mod, &mut carrier, &mut leftovers) + }, failed_pcm_give, None, ) @@ -108,7 +134,9 @@ pub fn audio_transmitter_main( cpal::SampleFormat::F64 => cpal_output .build_output_stream( &cpal_output_config.clone().into(), - move |pcm, _: &_| give_output_pcm::(pcm, &mut rx_for_wire), + move |pcm, _: &_| { + give_output_pcm::(pcm, &mut rx_for_mod, &mut carrier, &mut leftovers) + }, failed_pcm_give, None, ) @@ -116,7 +144,9 @@ pub fn audio_transmitter_main( cpal::SampleFormat::U64 => cpal_output .build_output_stream( &cpal_output_config.clone().into(), - move |pcm, _: &_| give_output_pcm::(pcm, &mut rx_for_wire), + move |pcm, _: &_| { + give_output_pcm::(pcm, &mut rx_for_mod, &mut carrier, &mut leftovers) + }, failed_pcm_give, None, ) @@ -124,7 +154,9 @@ pub fn audio_transmitter_main( cpal::SampleFormat::I64 => cpal_output .build_output_stream( &cpal_output_config.clone().into(), - move |pcm, _: &_| give_output_pcm::(pcm, &mut rx_for_wire), + move |pcm, _: &_| { + give_output_pcm::(pcm, &mut rx_for_mod, &mut carrier, &mut leftovers) + }, failed_pcm_give, None, ) @@ -135,10 +167,6 @@ pub fn audio_transmitter_main( )))?, }; - // Inform dsp_outb about sample rate - tx_sample_rate_notify.send(cpal_output_config.sample_rate().0) - .expect("dsp_outb has dropped srn rx; it has probably panicked"); - output_stream .play() .context("failed to start output audio stream")?; @@ -146,24 +174,64 @@ pub fn audio_transmitter_main( Ok(output_stream) } -fn give_output_pcm(pcm: &mut [T], channel: &mut Receiver) -where +fn give_output_pcm( + pcm: &mut [T], + rx_for_mod: &mut Receiver, + carrier: &mut TwoStateCarrier, + leftovers: &mut Vec, +) where T: Sample + std::fmt::Debug + FromSample, { trace!("ALSA demands {} samples...", pcm.len()); - let mut have_complained_no_messages = false; - for outgoing in pcm.iter_mut() { - *outgoing = match channel.try_recv() { - Ok(sample) => sample.to_sample::(), - Err(_) => { - //if have_complained_no_messages == true { - warn!("rx_for_wire has no samples for us! carrier dead!"); - warn!("are we able to produce samples quickly enough? CPU OK?"); - //have_complained_no_messages = false; - //} - T::EQUILIBRIUM - } - }; + let mut output = pcm.iter_mut().peekable(); + let mut leftovers_owned = leftovers.clone(); + let mut leftovers_iter = leftovers_owned.into_iter(); + + while let Some(samp) = leftovers_iter.next() { + if let Some(sample_to_write) = output.next() { + *sample_to_write = samp.to_sample::(); + } else { + *leftovers = leftovers_iter.collect(); + //leftovers.extend(leftovers_iter); + trace!("ALSA demand fulfilled (last samp with leftovers)! collected {} leftovers", leftovers.len()); + return; + } } - trace!("gave ALSA the samples"); + + while let Ok(byte) = rx_for_mod.try_recv() { + //trace!("recv byte {}", byte); + let samps = carrier.byte_into_fsk_samples(&byte); + let mut samps_iter = samps.iter(); + //trace!("Samples for this byte: {:?}", samps_iter); + while let Some(samp) = samps_iter.next() { + if let Some(sample_to_write) = output.next() { + *sample_to_write = samp.to_sample::(); + } else { + *leftovers = leftovers_iter.collect(); + //leftovers.extend(samps_iter); + trace!("{:?}", leftovers); + trace!("ALSA demand fulfilled (last samp with modulated eth data)! collected {} leftovers", leftovers.len()); + return; + } + } + } + //trace!("No more eth frame bytes, idling carrier"); + while let Some(_) = output.peek() { + let idle_samps = carrier.idle(); + let mut idle_samps_iter = idle_samps.iter(); + while let Some(samp) = idle_samps_iter.next() { + if let Some(sample_to_write) = output.next() { + *sample_to_write = samp.to_sample::(); + } else { + *leftovers = leftovers_iter.collect(); + //leftovers.extend(idle_samps_iter); + trace!("ALSA demand fulfilled (last samp with empty carrier)! collected {} leftovers", leftovers.len()); + return; + } + } + } + + //*leftovers = leftovers_iter.collect(); + leftovers.extend(leftovers_iter); + trace!("ALSA demand fulfilled! collected {} leftovers", leftovers.len()); } diff --git a/src/dsp_common/carrier.rs b/src/dsp_common/carrier.rs index 5f224e4..cc8867a 100644 --- a/src/dsp_common/carrier.rs +++ b/src/dsp_common/carrier.rs @@ -10,8 +10,7 @@ pub struct TwoStateCarrier { pub symbol_rate: f64, pub freq_low: f64, pub freq_high: f64, - phase_low: f64, - phase_high: f64, + phase: f64, } pub trait Carrier { @@ -20,7 +19,7 @@ pub trait Carrier { } impl Carrier for TwoStateCarrier { - /// turn a u8 into bits &do FSK on the carrier for each, returning samples + /// turn a u8 into bits & do FSK on the carrier for each, returning samples fn byte_into_fsk_samples(&mut self, byte: &u8) -> Vec { //let mut bits: [u8; 8] = [0, 0, 0, 0, 0, 0, 0, 0]; let mut out: Vec = vec![]; @@ -40,34 +39,31 @@ impl Carrier for TwoStateCarrier { // if the bit is high... if (byte >> (bit_place - 1)) & 1 == 0 { // while the signal hasn't yet completed a single period... - while (self.phase_low) < self.sample_rate / self.symbol_rate { + while self.phase < self.sample_rate / self.symbol_rate { + //trace!("generating lo sample for bit {bit_place}"); // push the next sample of the relative sine wave out.push( - (self.phase_low * TAU * self.freq_low - / self.sample_rate) - .sin() - ); - // next sample - self.phase_low += 1.0; - } - // if phase counter exceeds 1 period, drop it down again - self.phase_low = - self.phase_low % (self.sample_rate / self.symbol_rate); - } else if (byte >> (bit_place - 1)) & 1 == 1 { - while (self.phase_high) < self.sample_rate / self.symbol_rate { - out.push( - (self.phase_high * TAU * self.freq_high - / self.sample_rate) + (self.phase * TAU * self.freq_low / self.sample_rate) .sin(), ); - self.phase_high += 1.0; + // next sample + self.phase += 1.0; + } + // if phase counter exceeds 1 period, drop it down again + } else if (byte >> (bit_place - 1)) & 1 == 1 { + while self.phase < self.sample_rate / self.symbol_rate { + //trace!("generating hi sample for bit {bit_place}"); + out.push( + (self.phase * TAU * self.freq_high / self.sample_rate) + .sin(), + ); + self.phase += 1.0; } - self.phase_high = - self.phase_high % (self.sample_rate / self.symbol_rate); } else { // this keeps happening unreachable!("FSK bit math returned a value that isn't 0 or 1"); } + self.phase = self.phase % (self.sample_rate / self.symbol_rate); } //trace!("FSK calculations turned up {:?}", &out); out @@ -77,10 +73,10 @@ impl Carrier for TwoStateCarrier { let mut out: [PcmSample; 64] = [0.0; 64]; for i in 0..=63 { out[i] = - (self.phase_low * TAU * self.freq_low/ self.sample_rate).sin(); - self.phase_low += 1.0; + (self.phase * TAU * self.freq_low/ self.sample_rate).sin(); + self.phase += 1.0; } - self.phase_low = self.phase_low % (self.sample_rate / self.freq_low); + self.phase = self.phase % (self.sample_rate / self.freq_low); out } } @@ -92,8 +88,7 @@ impl TwoStateCarrier { freq_high: freq_high.into(), sample_rate: sample_rate.into(), symbol_rate: symbol_rate.into(), - phase_low: 0_f64, - phase_high: 0_f64, + phase: 0_f64, } } // setters @@ -103,10 +98,10 @@ impl TwoStateCarrier { fn freq_high(mut self, freq_high: u32) { self.freq_high = freq_high.into(); } - fn phase_low(mut self, phase_low: u32) { - self.phase_low = phase_low.into(); + fn sample_rate(mut self, sample_rate: u32) { + self.sample_rate = sample_rate.into(); } - fn phase_high(mut self, phase_high: u32) { - self.phase_high = phase_high.into(); + fn symbol_rate(mut self, symbol_rate: u32) { + self.symbol_rate = symbol_rate.into(); } } diff --git a/src/dsp_inb/mod.rs b/src/dsp_inb/mod.rs index 0c12e98..ee0c8e8 100644 --- a/src/dsp_inb/mod.rs +++ b/src/dsp_inb/mod.rs @@ -4,11 +4,9 @@ use tracing::{error, info, trace, warn}; use tokio::sync::mpsc::{Receiver, UnboundedSender}; -use crate::task_msg::{RawEthFrameMessage, PcmFrameMessage}; - pub async fn dsp_inb_main( - tx_for_eth_inb: UnboundedSender, - rx_for_demod: Receiver, + tx_for_eth_inb: UnboundedSender>, + rx_for_demod: Receiver>, ) -> Result<()> { trace!("pcm_inb task started"); diff --git a/src/main.rs b/src/main.rs index d862263..7765c02 100644 --- a/src/main.rs +++ b/src/main.rs @@ -27,15 +27,12 @@ mod dsp_common; mod dsp_inb; use crate::dsp_inb::dsp_inb_main; -mod dsp_outb; -use crate::dsp_outb::dsp_outb_main; +//mod dsp_outb; +//use crate::dsp_outb::dsp_outb_main; mod tap_junction; use crate::tap_junction::tap_junction_main; -mod task_msg; -use crate::task_msg::{PcmFrameMessage, PcmSampleMessage, RawEthFrameMessage, SampleRateNotifyMessage}; - mod wire_fmt; //#[derive(argh::FromArgs)] @@ -55,14 +52,14 @@ async fn main() -> Result<()> { "no audio output could be found; is a sound card installed?" )?; - let (tx_for_demod, rx_for_demod) = channel::(1024); - let (tx_for_eth_inb, rx_for_eth_inb) = - unbounded_channel::(); - let (tx_for_mod, rx_for_mod) = channel::(1024); - let (tx_for_wire, rx_for_wire) = channel::(256); + let (tx_for_demod, rx_for_demod) = channel::>(1024); + let (tx_for_eth_inb, rx_for_eth_inb) = channel::>(8192); + //let (tx_for_mod, rx_for_mod) = channel::(1024); + let (tx_for_mod, rx_for_mod) = channel::(8192); + //let (tx_for_wire, rx_for_wire) = channel::(256); let (tx_sample_rate_notify, rx_sample_rate_notify) = - tokio::sync::oneshot::channel::(); + tokio::sync::oneshot::channel::(); trace!("starting listener for audio on wire"); let audio_in_stream = audio_receiver_main(tx_for_demod, &cpal_input) @@ -78,12 +75,12 @@ async fn main() -> Result<()> { trace!("Starting tap manager"); tap_junction_main(tx_for_mod, rx_for_eth_inb).await }); - let task_dsp_outb = tasks.spawn(async move { - trace!("Starting outbound audio modulator"); - dsp_outb_main(tx_for_wire, rx_for_mod, rx_sample_rate_notify).await - }); + //let task_dsp_outb = tasks.spawn(async move { + // trace!("Starting outbound audio modulator"); + // dsp_outb_main(tx_for_wire, rx_for_mod, rx_sample_rate_notify).await + //}); trace!("Starting audio sender"); - let audio_out_stream = audio_transmitter_main(rx_for_wire, tx_sample_rate_notify, &cpal_output) + let audio_out_stream = audio_transmitter_main(rx_for_mod, tx_sample_rate_notify, &cpal_output) .context("transmitter for audio on wire has failed to start")?; while let Some(task_result) = tasks.join_next().await { diff --git a/src/tap_junction/mod.rs b/src/tap_junction/mod.rs index b2e3d5f..743e098 100644 --- a/src/tap_junction/mod.rs +++ b/src/tap_junction/mod.rs @@ -1,10 +1,8 @@ use anyhow::{Context, Result}; -use crate::task_msg::RawEthFrameMessage; - pub async fn tap_junction_main( - tx_for_mod: tokio::sync::mpsc::Sender, - rx_for_eth_inb: tokio::sync::mpsc::UnboundedReceiver, + tx_for_mod: tokio::sync::mpsc::Sender, + rx_for_eth_inb: tokio::sync::mpsc::Receiver>, ) -> Result<()> { const MTU: i32 = 1500; let tap_dev = tokio_tun::Tun::builder() @@ -21,6 +19,10 @@ pub async fn tap_junction_main( loop { let frame_length = tap_dev.recv(&mut buf).await?; println!("{:?}", &buf[..frame_length]); - tx_for_mod.send((&buf[..frame_length]).to_vec()).await?; + //tx_for_mod.send((&buf[..frame_length]).to_vec()).await?; // by frame + for byte in buf[..frame_length].iter() { + tx_for_mod.send(*byte).await + .expect("tap_junction channel tx failed. probable panic at dest"); + } } } diff --git a/src/task_msg.rs b/src/task_msg.rs deleted file mode 100644 index a82cf95..0000000 --- a/src/task_msg.rs +++ /dev/null @@ -1,6 +0,0 @@ -use crate::dsp_common::sample::PcmSample; - -pub type PcmFrameMessage = Vec; -pub type PcmSampleMessage = PcmSample; -pub type RawEthFrameMessage = Vec; -pub type SampleRateNotifyMessage = u32;