mein Gott ,

This commit is contained in:
TerraMaster85 2024-12-21 23:19:10 -05:00
parent b092907a8a
commit e632ac3c44
7 changed files with 153 additions and 101 deletions

View file

@ -7,10 +7,8 @@ use tracing::{error, info, trace, warn};
use tokio::sync::mpsc::Sender;
use crate::task_msg::PcmFrameMessage;
pub fn audio_receiver_main(
tx_for_demod: Sender<PcmFrameMessage>,
tx_for_demod: Sender<Vec<f64>>,
cpal_input: &cpal::Device,
) -> Result<cpal::Stream> {
trace!("audio_receiver started");
@ -105,7 +103,7 @@ pub fn audio_receiver_main(
Ok(input_stream)
}
fn take_input_pcm<T>(pcm: &[T], channel: &Sender<PcmFrameMessage>) where T: Sample + std::fmt::Debug, f64: FromSample<T> {
fn take_input_pcm<T>(pcm: &[T], channel: &Sender<Vec<f64>>) where T: Sample + std::fmt::Debug, f64: FromSample<T> {
let _ = pcm;
//if channel.try_send(pcm.iter().map(|&n| n.to_sample::<f64>()).collect()).is_err() {
// error!("demodulator PCM data queue is full???");

View file

@ -1,3 +1,5 @@
use std::collections::VecDeque;
use anyhow::{Context, Result};
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
@ -7,12 +9,11 @@ use tracing::{error, info, trace, warn};
use tokio::sync::mpsc::Receiver;
use crate::task_msg::{PcmFrameMessage, PcmSampleMessage, SampleRateNotifyMessage};
use crate::dsp_common::carrier::{Carrier, TwoStateCarrier};
pub fn audio_transmitter_main(
mut rx_for_wire: Receiver<PcmSampleMessage>,
mut tx_sample_rate_notify: tokio::sync::oneshot::Sender<SampleRateNotifyMessage>,
mut rx_for_mod: Receiver<u8>,
tx_sample_rate_notify: tokio::sync::oneshot::Sender<u32>,
cpal_output: &cpal::Device,
) -> Result<cpal::Stream> {
trace!("audio_transmitter started");
@ -48,11 +49,24 @@ pub fn audio_transmitter_main(
warn!("dropped some PCM data on output stream?! {}", e);
};
// TODO: from cli
let mut carrier = TwoStateCarrier::new(
3200, // freq_low
3840, // freq_high
cpal_output_config.sample_rate().0, // sample_rate
48, // symbol_rate
);
let mut leftovers: Vec<f64> = vec![];
// TODO: config .clone()s should be obsolete
let output_stream = match cpal_output_config.sample_format() {
cpal::SampleFormat::U8 => cpal_output
.build_output_stream(
&cpal_output_config.clone().into(),
move |pcm, _: &_| give_output_pcm::<u8>(pcm, &mut rx_for_wire),
move |pcm, _: &_| {
give_output_pcm::<u8>(pcm, &mut rx_for_mod, &mut carrier, &mut leftovers)
},
failed_pcm_give,
None,
)
@ -60,7 +74,9 @@ pub fn audio_transmitter_main(
cpal::SampleFormat::I8 => cpal_output
.build_output_stream(
&cpal_output_config.clone().into(),
move |pcm, _: &_| give_output_pcm::<i8>(pcm, &mut rx_for_wire),
move |pcm, _: &_| {
give_output_pcm::<i8>(pcm, &mut rx_for_mod, &mut carrier, &mut leftovers)
},
failed_pcm_give,
None,
)
@ -68,7 +84,9 @@ pub fn audio_transmitter_main(
cpal::SampleFormat::U16 => cpal_output
.build_output_stream(
&cpal_output_config.clone().into(),
move |pcm, _: &_| give_output_pcm::<u16>(pcm, &mut rx_for_wire),
move |pcm, _: &_| {
give_output_pcm::<u16>(pcm, &mut rx_for_mod, &mut carrier, &mut leftovers)
},
failed_pcm_give,
None,
)
@ -76,7 +94,9 @@ pub fn audio_transmitter_main(
cpal::SampleFormat::I16 => cpal_output
.build_output_stream(
&cpal_output_config.clone().into(),
move |pcm, _: &_| give_output_pcm::<i16>(pcm, &mut rx_for_wire),
move |pcm, _: &_| {
give_output_pcm::<i16>(pcm, &mut rx_for_mod, &mut carrier, &mut leftovers)
},
failed_pcm_give,
None,
)
@ -84,7 +104,9 @@ pub fn audio_transmitter_main(
cpal::SampleFormat::U32 => cpal_output
.build_output_stream(
&cpal_output_config.clone().into(),
move |pcm, _: &_| give_output_pcm::<u32>(pcm, &mut rx_for_wire),
move |pcm, _: &_| {
give_output_pcm::<u32>(pcm, &mut rx_for_mod, &mut carrier, &mut leftovers)
},
failed_pcm_give,
None,
)
@ -92,7 +114,9 @@ pub fn audio_transmitter_main(
cpal::SampleFormat::I32 => cpal_output
.build_output_stream(
&cpal_output_config.clone().into(),
move |pcm, _: &_| give_output_pcm::<i32>(pcm, &mut rx_for_wire),
move |pcm, _: &_| {
give_output_pcm::<i32>(pcm, &mut rx_for_mod, &mut carrier, &mut leftovers)
},
failed_pcm_give,
None,
)
@ -100,7 +124,9 @@ pub fn audio_transmitter_main(
cpal::SampleFormat::F32 => cpal_output
.build_output_stream(
&cpal_output_config.clone().into(),
move |pcm, _: &_| give_output_pcm::<f32>(pcm, &mut rx_for_wire),
move |pcm, _: &_| {
give_output_pcm::<f32>(pcm, &mut rx_for_mod, &mut carrier, &mut leftovers)
},
failed_pcm_give,
None,
)
@ -108,7 +134,9 @@ pub fn audio_transmitter_main(
cpal::SampleFormat::F64 => cpal_output
.build_output_stream(
&cpal_output_config.clone().into(),
move |pcm, _: &_| give_output_pcm::<f64>(pcm, &mut rx_for_wire),
move |pcm, _: &_| {
give_output_pcm::<f64>(pcm, &mut rx_for_mod, &mut carrier, &mut leftovers)
},
failed_pcm_give,
None,
)
@ -116,7 +144,9 @@ pub fn audio_transmitter_main(
cpal::SampleFormat::U64 => cpal_output
.build_output_stream(
&cpal_output_config.clone().into(),
move |pcm, _: &_| give_output_pcm::<u64>(pcm, &mut rx_for_wire),
move |pcm, _: &_| {
give_output_pcm::<u64>(pcm, &mut rx_for_mod, &mut carrier, &mut leftovers)
},
failed_pcm_give,
None,
)
@ -124,7 +154,9 @@ pub fn audio_transmitter_main(
cpal::SampleFormat::I64 => cpal_output
.build_output_stream(
&cpal_output_config.clone().into(),
move |pcm, _: &_| give_output_pcm::<i64>(pcm, &mut rx_for_wire),
move |pcm, _: &_| {
give_output_pcm::<i64>(pcm, &mut rx_for_mod, &mut carrier, &mut leftovers)
},
failed_pcm_give,
None,
)
@ -135,10 +167,6 @@ pub fn audio_transmitter_main(
)))?,
};
// Inform dsp_outb about sample rate
tx_sample_rate_notify.send(cpal_output_config.sample_rate().0)
.expect("dsp_outb has dropped srn rx; it has probably panicked");
output_stream
.play()
.context("failed to start output audio stream")?;
@ -146,24 +174,64 @@ pub fn audio_transmitter_main(
Ok(output_stream)
}
fn give_output_pcm<T>(pcm: &mut [T], channel: &mut Receiver<PcmSampleMessage>)
where
fn give_output_pcm<T>(
pcm: &mut [T],
rx_for_mod: &mut Receiver<u8>,
carrier: &mut TwoStateCarrier,
leftovers: &mut Vec<f64>,
) where
T: Sample + std::fmt::Debug + FromSample<f64>,
{
trace!("ALSA demands {} samples...", pcm.len());
let mut have_complained_no_messages = false;
for outgoing in pcm.iter_mut() {
*outgoing = match channel.try_recv() {
Ok(sample) => sample.to_sample::<T>(),
Err(_) => {
//if have_complained_no_messages == true {
warn!("rx_for_wire has no samples for us! carrier dead!");
warn!("are we able to produce samples quickly enough? CPU OK?");
//have_complained_no_messages = false;
//}
T::EQUILIBRIUM
let mut output = pcm.iter_mut().peekable();
let mut leftovers_owned = leftovers.clone();
let mut leftovers_iter = leftovers_owned.into_iter();
while let Some(samp) = leftovers_iter.next() {
if let Some(sample_to_write) = output.next() {
*sample_to_write = samp.to_sample::<T>();
} else {
*leftovers = leftovers_iter.collect();
//leftovers.extend(leftovers_iter);
trace!("ALSA demand fulfilled (last samp with leftovers)! collected {} leftovers", leftovers.len());
return;
}
};
}
trace!("gave ALSA the samples");
while let Ok(byte) = rx_for_mod.try_recv() {
//trace!("recv byte {}", byte);
let samps = carrier.byte_into_fsk_samples(&byte);
let mut samps_iter = samps.iter();
//trace!("Samples for this byte: {:?}", samps_iter);
while let Some(samp) = samps_iter.next() {
if let Some(sample_to_write) = output.next() {
*sample_to_write = samp.to_sample::<T>();
} else {
*leftovers = leftovers_iter.collect();
//leftovers.extend(samps_iter);
trace!("{:?}", leftovers);
trace!("ALSA demand fulfilled (last samp with modulated eth data)! collected {} leftovers", leftovers.len());
return;
}
}
}
//trace!("No more eth frame bytes, idling carrier");
while let Some(_) = output.peek() {
let idle_samps = carrier.idle();
let mut idle_samps_iter = idle_samps.iter();
while let Some(samp) = idle_samps_iter.next() {
if let Some(sample_to_write) = output.next() {
*sample_to_write = samp.to_sample::<T>();
} else {
*leftovers = leftovers_iter.collect();
//leftovers.extend(idle_samps_iter);
trace!("ALSA demand fulfilled (last samp with empty carrier)! collected {} leftovers", leftovers.len());
return;
}
}
}
//*leftovers = leftovers_iter.collect();
leftovers.extend(leftovers_iter);
trace!("ALSA demand fulfilled! collected {} leftovers", leftovers.len());
}

View file

@ -10,8 +10,7 @@ pub struct TwoStateCarrier {
pub symbol_rate: f64,
pub freq_low: f64,
pub freq_high: f64,
phase_low: f64,
phase_high: f64,
phase: f64,
}
pub trait Carrier {
@ -40,34 +39,31 @@ impl Carrier for TwoStateCarrier {
// if the bit is high...
if (byte >> (bit_place - 1)) & 1 == 0 {
// while the signal hasn't yet completed a single period...
while (self.phase_low) < self.sample_rate / self.symbol_rate {
while self.phase < self.sample_rate / self.symbol_rate {
//trace!("generating lo sample for bit {bit_place}");
// push the next sample of the relative sine wave
out.push(
(self.phase_low * TAU * self.freq_low
/ self.sample_rate)
.sin()
);
// next sample
self.phase_low += 1.0;
}
// if phase counter exceeds 1 period, drop it down again
self.phase_low =
self.phase_low % (self.sample_rate / self.symbol_rate);
} else if (byte >> (bit_place - 1)) & 1 == 1 {
while (self.phase_high) < self.sample_rate / self.symbol_rate {
out.push(
(self.phase_high * TAU * self.freq_high
/ self.sample_rate)
(self.phase * TAU * self.freq_low / self.sample_rate)
.sin(),
);
self.phase_high += 1.0;
// next sample
self.phase += 1.0;
}
// if phase counter exceeds 1 period, drop it down again
} else if (byte >> (bit_place - 1)) & 1 == 1 {
while self.phase < self.sample_rate / self.symbol_rate {
//trace!("generating hi sample for bit {bit_place}");
out.push(
(self.phase * TAU * self.freq_high / self.sample_rate)
.sin(),
);
self.phase += 1.0;
}
self.phase_high =
self.phase_high % (self.sample_rate / self.symbol_rate);
} else {
// this keeps happening
unreachable!("FSK bit math returned a value that isn't 0 or 1");
}
self.phase = self.phase % (self.sample_rate / self.symbol_rate);
}
//trace!("FSK calculations turned up {:?}", &out);
out
@ -77,10 +73,10 @@ impl Carrier for TwoStateCarrier {
let mut out: [PcmSample; 64] = [0.0; 64];
for i in 0..=63 {
out[i] =
(self.phase_low * TAU * self.freq_low/ self.sample_rate).sin();
self.phase_low += 1.0;
(self.phase * TAU * self.freq_low/ self.sample_rate).sin();
self.phase += 1.0;
}
self.phase_low = self.phase_low % (self.sample_rate / self.freq_low);
self.phase = self.phase % (self.sample_rate / self.freq_low);
out
}
}
@ -92,8 +88,7 @@ impl TwoStateCarrier {
freq_high: freq_high.into(),
sample_rate: sample_rate.into(),
symbol_rate: symbol_rate.into(),
phase_low: 0_f64,
phase_high: 0_f64,
phase: 0_f64,
}
}
// setters
@ -103,10 +98,10 @@ impl TwoStateCarrier {
fn freq_high(mut self, freq_high: u32) {
self.freq_high = freq_high.into();
}
fn phase_low(mut self, phase_low: u32) {
self.phase_low = phase_low.into();
fn sample_rate(mut self, sample_rate: u32) {
self.sample_rate = sample_rate.into();
}
fn phase_high(mut self, phase_high: u32) {
self.phase_high = phase_high.into();
fn symbol_rate(mut self, symbol_rate: u32) {
self.symbol_rate = symbol_rate.into();
}
}

View file

@ -4,11 +4,9 @@ use tracing::{error, info, trace, warn};
use tokio::sync::mpsc::{Receiver, UnboundedSender};
use crate::task_msg::{RawEthFrameMessage, PcmFrameMessage};
pub async fn dsp_inb_main(
tx_for_eth_inb: UnboundedSender<RawEthFrameMessage>,
rx_for_demod: Receiver<PcmFrameMessage>,
tx_for_eth_inb: UnboundedSender<Vec<u8>>,
rx_for_demod: Receiver<Vec<f64>>,
) -> Result<()> {
trace!("pcm_inb task started");

View file

@ -27,15 +27,12 @@ mod dsp_common;
mod dsp_inb;
use crate::dsp_inb::dsp_inb_main;
mod dsp_outb;
use crate::dsp_outb::dsp_outb_main;
//mod dsp_outb;
//use crate::dsp_outb::dsp_outb_main;
mod tap_junction;
use crate::tap_junction::tap_junction_main;
mod task_msg;
use crate::task_msg::{PcmFrameMessage, PcmSampleMessage, RawEthFrameMessage, SampleRateNotifyMessage};
mod wire_fmt;
//#[derive(argh::FromArgs)]
@ -55,14 +52,14 @@ async fn main() -> Result<()> {
"no audio output could be found; is a sound card installed?"
)?;
let (tx_for_demod, rx_for_demod) = channel::<PcmFrameMessage>(1024);
let (tx_for_eth_inb, rx_for_eth_inb) =
unbounded_channel::<RawEthFrameMessage>();
let (tx_for_mod, rx_for_mod) = channel::<RawEthFrameMessage>(1024);
let (tx_for_wire, rx_for_wire) = channel::<PcmSampleMessage>(256);
let (tx_for_demod, rx_for_demod) = channel::<Vec<f64>>(1024);
let (tx_for_eth_inb, rx_for_eth_inb) = channel::<Vec<u8>>(8192);
//let (tx_for_mod, rx_for_mod) = channel::<RawEthFrameMessage>(1024);
let (tx_for_mod, rx_for_mod) = channel::<u8>(8192);
//let (tx_for_wire, rx_for_wire) = channel::<PcmSampleMessage>(256);
let (tx_sample_rate_notify, rx_sample_rate_notify) =
tokio::sync::oneshot::channel::<SampleRateNotifyMessage>();
tokio::sync::oneshot::channel::<u32>();
trace!("starting listener for audio on wire");
let audio_in_stream = audio_receiver_main(tx_for_demod, &cpal_input)
@ -78,12 +75,12 @@ async fn main() -> Result<()> {
trace!("Starting tap manager");
tap_junction_main(tx_for_mod, rx_for_eth_inb).await
});
let task_dsp_outb = tasks.spawn(async move {
trace!("Starting outbound audio modulator");
dsp_outb_main(tx_for_wire, rx_for_mod, rx_sample_rate_notify).await
});
//let task_dsp_outb = tasks.spawn(async move {
// trace!("Starting outbound audio modulator");
// dsp_outb_main(tx_for_wire, rx_for_mod, rx_sample_rate_notify).await
//});
trace!("Starting audio sender");
let audio_out_stream = audio_transmitter_main(rx_for_wire, tx_sample_rate_notify, &cpal_output)
let audio_out_stream = audio_transmitter_main(rx_for_mod, tx_sample_rate_notify, &cpal_output)
.context("transmitter for audio on wire has failed to start")?;
while let Some(task_result) = tasks.join_next().await {

View file

@ -1,10 +1,8 @@
use anyhow::{Context, Result};
use crate::task_msg::RawEthFrameMessage;
pub async fn tap_junction_main(
tx_for_mod: tokio::sync::mpsc::Sender<RawEthFrameMessage>,
rx_for_eth_inb: tokio::sync::mpsc::UnboundedReceiver<RawEthFrameMessage>,
tx_for_mod: tokio::sync::mpsc::Sender<u8>,
rx_for_eth_inb: tokio::sync::mpsc::Receiver<Vec<u8>>,
) -> Result<()> {
const MTU: i32 = 1500;
let tap_dev = tokio_tun::Tun::builder()
@ -21,6 +19,10 @@ pub async fn tap_junction_main(
loop {
let frame_length = tap_dev.recv(&mut buf).await?;
println!("{:?}", &buf[..frame_length]);
tx_for_mod.send((&buf[..frame_length]).to_vec()).await?;
//tx_for_mod.send((&buf[..frame_length]).to_vec()).await?; // by frame
for byte in buf[..frame_length].iter() {
tx_for_mod.send(*byte).await
.expect("tap_junction channel tx failed. probable panic at dest");
}
}
}

View file

@ -1,6 +0,0 @@
use crate::dsp_common::sample::PcmSample;
pub type PcmFrameMessage = Vec<PcmSample>;
pub type PcmSampleMessage = PcmSample;
pub type RawEthFrameMessage = Vec<u8>;
pub type SampleRateNotifyMessage = u32;