[wip] thread workers

This commit is contained in:
c0repwn3r 2023-03-22 14:34:06 -04:00
parent 1daaf5466f
commit 6c1b8f090f
Signed by: core
GPG Key ID: FDBF740DADDCEECF
18 changed files with 444 additions and 41 deletions

5
Cargo.lock generated
View File

@ -2049,9 +2049,9 @@ dependencies = [
[[package]] [[package]]
name = "serde_json" name = "serde_json"
version = "1.0.91" version = "1.0.94"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "877c235533714907a8c2464236f5c4b2a17262ef1bd71f38f35ea592c8da6883" checksum = "1c533a59c9d8a93a09c6ab31f0fd5e5f4dd1b8fc9434804029839884765d04ea"
dependencies = [ dependencies = [
"itoa", "itoa",
"ryu", "ryu",
@ -2399,6 +2399,7 @@ dependencies = [
"log", "log",
"reqwest", "reqwest",
"serde", "serde",
"serde_json",
"sha2", "sha2",
"simple_logger", "simple_logger",
"tar", "tar",

View File

@ -17,6 +17,7 @@ hex = "0.4.3"
url = "2.3.1" url = "2.3.1"
toml = "0.7.3" toml = "0.7.3"
serde = { version = "1.0.158", features = ["derive"] } serde = { version = "1.0.158", features = ["derive"] }
serde_json = "1.0.94"
[build-dependencies] [build-dependencies]
serde = { version = "1.0.157", features = ["derive"] } serde = { version = "1.0.157", features = ["derive"] }

11
tfclient/src/apiworker.rs Normal file
View File

@ -0,0 +1,11 @@
use std::sync::mpsc::Receiver;
use crate::config::TFClientConfig;
use crate::daemon::ThreadMessageSender;
pub enum APIWorkerMessage {
}
pub fn apiworker_main(config: TFClientConfig, transmitters: ThreadMessageSender, rx: Receiver<APIWorkerMessage>) {
}

View File

@ -7,7 +7,7 @@ use crate::dirs::{get_config_dir, get_config_file};
pub const DEFAULT_PORT: u16 = 8157; pub const DEFAULT_PORT: u16 = 8157;
fn default_port() -> u16 { DEFAULT_PORT } fn default_port() -> u16 { DEFAULT_PORT }
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize, Clone)]
pub struct TFClientConfig { pub struct TFClientConfig {
#[serde(default = "default_port")] #[serde(default = "default_port")]
listen_port: u16 listen_port: u16

View File

@ -1,25 +1,17 @@
use std::sync::mpsc;
use std::sync::mpsc::Sender;
use std::thread;
use log::{error, info, warn}; use log::{error, info, warn};
use url::Url; use url::Url;
use crate::apiworker::{apiworker_main, APIWorkerMessage};
use crate::config::load_config; use crate::config::load_config;
use crate::nebulaworker::{nebulaworker_main, NebulaWorkerMessage};
use crate::socketworker::{socketworker_main, SocketWorkerMessage};
use crate::util::check_server_url;
pub fn daemon_main(name: String, server: String) { pub fn daemon_main(name: String, server: String) {
// Validate the `server` // Validate the `server`
info!("Checking server url..."); check_server_url(&server);
let api_base = match Url::parse(&server) {
Ok(u) => u,
Err(e) => {
error!("Invalid server url `{}`: {}", server, e);
std::process::exit(1);
}
};
match api_base.scheme() {
"http" => { warn!("HTTP api urls are not reccomended. Please switch to HTTPS if possible.") },
"https" => (),
_ => {
error!("Unsupported protocol `{}` (expected one of http, https)", api_base.scheme());
std::process::exit(1);
}
}
info!("Loading config..."); info!("Loading config...");
let config = match load_config(&name) { let config = match load_config(&name) {
@ -29,4 +21,70 @@ pub fn daemon_main(name: String, server: String) {
std::process::exit(1); std::process::exit(1);
} }
}; };
info!("Starting API thread...");
let (tx_api, rx_api) = mpsc::channel::<APIWorkerMessage>();
let (tx_socket, rx_socket) = mpsc::channel::<SocketWorkerMessage>();
let (tx_nebula, rx_nebula) = mpsc::channel::<NebulaWorkerMessage>();
let transmitter = ThreadMessageSender {
socket_thread: tx_socket,
api_thread: tx_api,
nebula_thread: tx_nebula
};
let config_api = config.clone();
let transmitter_api = transmitter.clone();
let api_thread = thread::spawn(move || {
apiworker_main(config_api, transmitter_api, rx_api);
});
info!("Starting Nebula thread...");
let config_nebula = config.clone();
let transmitter_nebula = transmitter.clone();
let nebula_thread = thread::spawn(move || {
nebulaworker_main(config_nebula, transmitter_nebula, rx_nebula);
});
info!("Starting socket worker thread...");
let socket_thread = thread::spawn(move || {
socketworker_main(config, transmitter, rx_socket);
});
info!("Waiting for socket thread to exit...");
match socket_thread.join() {
Ok(_) => (),
Err(_) => {
error!("Error waiting for socket thread to exit.");
std::process::exit(1);
}
}
info!("Waiting for API thread to exit...");
match api_thread.join() {
Ok(_) => (),
Err(_) => {
error!("Error waiting for api thread to exit.");
std::process::exit(1);
}
}
info!("Waiting for Nebula thread to exit...");
match nebula_thread.join() {
Ok(_) => (),
Err(_) => {
error!("Error waiting for nebula thread to exit.");
std::process::exit(1);
}
}
info!("All threads exited");
}
#[derive(Clone)]
pub struct ThreadMessageSender {
socket_thread: Sender<SocketWorkerMessage>,
api_thread: Sender<APIWorkerMessage>,
nebula_thread: Sender<NebulaWorkerMessage>
} }

View File

@ -21,6 +21,8 @@ pub mod nebulaworker;
pub mod daemon; pub mod daemon;
pub mod config; pub mod config;
pub mod service; pub mod service;
pub mod apiworker;
pub mod socketworker;
pub mod nebula_bin { pub mod nebula_bin {
include!(concat!(env!("OUT_DIR"), "/nebula.bin.rs")); include!(concat!(env!("OUT_DIR"), "/nebula.bin.rs"));
@ -37,6 +39,7 @@ use log::{error, info};
use simple_logger::SimpleLogger; use simple_logger::SimpleLogger;
use crate::dirs::get_data_dir; use crate::dirs::get_data_dir;
use crate::embedded_nebula::{run_embedded_nebula, run_embedded_nebula_cert}; use crate::embedded_nebula::{run_embedded_nebula, run_embedded_nebula_cert};
use crate::service::entry::{cli_install, cli_start, cli_stop, cli_uninstall};
#[derive(Parser)] #[derive(Parser)]
#[command(author = "c0repwn3r", version, about, long_about = None)] #[command(author = "c0repwn3r", version, about, long_about = None)]
@ -210,10 +213,18 @@ fn main() {
} }
} }
} }
Commands::Install { .. } => {} Commands::Install { server, name } => {
Commands::Uninstall { .. } => {} cli_install(&name, &server);
Commands::Start { .. } => {} }
Commands::Stop { .. } => {} Commands::Uninstall { name } => {
cli_uninstall(&name);
}
Commands::Start { name } => {
cli_start(&name);
}
Commands::Stop { name } => {
cli_stop(&name);
}
Commands::Run { name, server } => { Commands::Run { name, server } => {
daemon::daemon_main(name, server); daemon::daemon_main(name, server);
} }

View File

@ -1,5 +1,13 @@
// Code to handle the command socket worker // Code to handle the nebula worker
pub fn socketworker_main() { use std::sync::mpsc::Receiver;
use crate::config::TFClientConfig;
use crate::daemon::ThreadMessageSender;
pub enum NebulaWorkerMessage {
}
pub fn nebulaworker_main(config: TFClientConfig, transmitter: ThreadMessageSender, rx: Receiver<NebulaWorkerMessage>) {
} }

View File

@ -4,6 +4,6 @@ use std::error::Error;
use std::path::PathBuf; use std::path::PathBuf;
pub trait ServiceFileGenerator { pub trait ServiceFileGenerator {
fn create_service_files(bin_path: PathBuf, name: &str) -> Result<(), Box<dyn Error>>; fn create_service_files(bin_path: PathBuf, name: &str, server: &str) -> Result<(), Box<dyn Error>>;
fn delete_service_files(name: &str) -> Result<(), Box<dyn Error>>; fn delete_service_files(name: &str) -> Result<(), Box<dyn Error>>;
} }

View File

@ -1,14 +1,48 @@
use std::error::Error; use std::error::Error;
use std::path::PathBuf; use std::path::PathBuf;
use log::debug;
use crate::service::codegen::ServiceFileGenerator; use crate::service::codegen::ServiceFileGenerator;
use std::fmt::Write;
use std::fs;
pub struct SystemDServiceFileGenerator {} pub struct SystemDServiceFileGenerator {}
impl ServiceFileGenerator for SystemDServiceFileGenerator { impl ServiceFileGenerator for SystemDServiceFileGenerator {
fn create_service_files(bin_path: PathBuf, name: &str) -> Result<(), Box<dyn Error>> { fn create_service_files(bin_path: PathBuf, name: &str, server: &str) -> Result<(), Box<dyn Error>> {
todo!() debug!("Generating a unit file...");
let mut unit_file = String::new();
writeln!(unit_file, "[Unit]")?;
writeln!(unit_file, "Description=A client for Defined Networking compatible overlay networks (instance {})", name)?;
writeln!(unit_file, "Wants=basic.target network-online.target")?;
writeln!(unit_file, "After=basic.target network.target network-online.target")?;
writeln!(unit_file)?;
writeln!(unit_file, "[Service]")?;
writeln!(unit_file, "SyslogIdentifier=tfclient-{}", name)?;
writeln!(unit_file, "ExecStart={} run --server {} --name {}", bin_path.as_path().display(), server, name)?;
writeln!(unit_file, "Restart=always")?;
writeln!(unit_file)?;
writeln!(unit_file, "[Install]")?;
writeln!(unit_file, "WantedBy=multi-user.target")?;
fs::write(format!("/usr/lib/systemd/system/{}.service", SystemDServiceFileGenerator::get_service_file_name(name)), unit_file)?;
debug!("Installed unit file");
Ok(())
} }
fn delete_service_files(name: &str) -> Result<(), Box<dyn Error>> { fn delete_service_files(name: &str) -> Result<(), Box<dyn Error>> {
todo!() debug!("Deleting unit file...");
fs::remove_file(format!("/usr/lib/systemd/system/{}.service", SystemDServiceFileGenerator::get_service_file_name(name)))?;
debug!("Removed unit file");
Ok(())
}
}
impl SystemDServiceFileGenerator {
pub fn get_service_file_name(name: &str) -> String {
format!("tfclient_i-{}", name)
} }
} }

View File

@ -0,0 +1,29 @@
use std::path::Path;
use log::info;
use crate::service::macos::OSXServiceManager;
use crate::service::runit::RunitServiceManager;
use crate::service::ServiceManager;
use crate::service::systemd::SystemDServiceManager;
use crate::service::windows::WindowsServiceManager;
pub fn detect_service() -> Option<Box<dyn ServiceManager>> {
if cfg!(windows) {
return Some(Box::new(WindowsServiceManager {}));
}
if cfg!(macos) {
return Some(Box::new(OSXServiceManager {}));
}
detect_unix_service_manager()
}
pub fn detect_unix_service_manager() -> Option<Box<dyn ServiceManager>> {
if Path::new("/etc/runit/1").exists() {
info!("Detected Runit service supervision (confidence: 100%, /etc/runit/1 exists)");
return Some(Box::new(RunitServiceManager {}))
}
if Path::new("/var/lib/systemd").exists() {
info!("Detected SystemD service supervision (confidence: 100%, /var/lib/systemd exists)");
return Some(Box::new(SystemDServiceManager {}));
}
None
}

View File

@ -0,0 +1,82 @@
use std::env::current_exe;
use log::{error, info, warn};
use crate::service::detect::detect_service;
use crate::util::check_server_url;
pub fn cli_start(name: &str) {
info!("Detecting service manager...");
let service = detect_service();
if let Some(sm) = service {
match sm.start(name) {
Ok(_) => (),
Err(e) => {
error!("Error starting service: {}", e);
std::process::exit(1);
}
}
} else {
error!("Unable to determine which service manager to use. Could not start.");
std::process::exit(1);
}
}
pub fn cli_stop(name: &str) {
info!("Detecting service manager...");
let service = detect_service();
if let Some(sm) = service {
match sm.stop(name) {
Ok(_) => (),
Err(e) => {
error!("Error starting service: {}", e);
std::process::exit(1);
}
}
} else {
error!("Unable to determine which service manager to use. Could not stop.");
std::process::exit(1);
}
}
pub fn cli_install(name: &str, server: &str) {
info!("Checking server url...");
check_server_url(server);
info!("Detecting service manager...");
let service = detect_service();
if let Some(sm) = service {
let current_file = match current_exe() {
Ok(e) => e,
Err(e) => {
error!("Unable to get current binary: {}", e);
std::process::exit(1);
}
};
match sm.install(current_file, name, server) {
Ok(_) => (),
Err(e) => {
error!("Error creating service files: {}", e);
std::process::exit(1);
}
}
} else {
error!("Unable to determine which service manager to use. Could not install.");
std::process::exit(1);
}
}
pub fn cli_uninstall(name: &str) {
info!("Detecting service manager...");
let service = detect_service();
if let Some(sm) = service {
match sm.uninstall(name) {
Ok(_) => (),
Err(e) => {
error!("Error removing service files: {}", e);
std::process::exit(1);
}
}
} else {
error!("Unable to determine which service manager to use. Could not install.");
std::process::exit(1);
}
}

View File

@ -0,0 +1,22 @@
use std::error::Error;
use std::path::PathBuf;
use crate::service::ServiceManager;
pub struct OSXServiceManager {}
impl ServiceManager for OSXServiceManager {
fn install(&self, bin_path: PathBuf, name: &str, server_url: &str) -> Result<(), Box<dyn Error>> {
todo!()
}
fn uninstall(&self, name: &str) -> Result<(), Box<dyn Error>> {
todo!()
}
fn start(&self, name: &str) -> Result<(), Box<dyn Error>> {
todo!()
}
fn stop(&self, name: &str) -> Result<(), Box<dyn Error>> {
todo!()
}
}

View File

@ -3,10 +3,15 @@ use std::path::PathBuf;
pub mod codegen; pub mod codegen;
pub mod systemd; pub mod systemd;
pub mod detect;
pub mod entry;
pub mod windows;
pub mod macos;
pub mod runit;
pub trait ServiceManager { pub trait ServiceManager {
fn install(bin_path: PathBuf, name: &str) -> Result<(), Box<dyn Error>>; fn install(&self, bin_path: PathBuf, name: &str, server_url: &str) -> Result<(), Box<dyn Error>>;
fn uninstall(name: &str) -> Result<(), Box<dyn Error>>; fn uninstall(&self, name: &str) -> Result<(), Box<dyn Error>>;
fn start(name: &str) -> Result<(), Box<dyn Error>>; fn start(&self, name: &str) -> Result<(), Box<dyn Error>>;
fn stop(name: &str) -> Result<(), Box<dyn Error>>; fn stop(&self, name: &str) -> Result<(), Box<dyn Error>>;
} }

View File

@ -0,0 +1,22 @@
use std::error::Error;
use std::path::PathBuf;
use crate::service::ServiceManager;
pub struct RunitServiceManager {}
impl ServiceManager for RunitServiceManager {
fn install(&self, bin_path: PathBuf, name: &str, server_url: &str) -> Result<(), Box<dyn Error>> {
todo!()
}
fn uninstall(&self, name: &str) -> Result<(), Box<dyn Error>> {
todo!()
}
fn start(&self, name: &str) -> Result<(), Box<dyn Error>> {
todo!()
}
fn stop(&self, name: &str) -> Result<(), Box<dyn Error>> {
todo!()
}
}

View File

@ -1,22 +1,86 @@
use std::error::Error; use std::error::Error;
use std::path::PathBuf; use std::path::PathBuf;
use std::process::Command;
use log::{error, info};
use crate::service::codegen::ServiceFileGenerator;
use crate::service::codegen::systemd::SystemDServiceFileGenerator;
use crate::service::ServiceManager; use crate::service::ServiceManager;
pub struct SystemDServiceManager {} pub struct SystemDServiceManager {}
impl ServiceManager for SystemDServiceManager { impl ServiceManager for SystemDServiceManager {
fn install(bin_path: PathBuf, name: &str) -> Result<(), Box<dyn Error>> { fn install(&self, bin_path: PathBuf, name: &str, server_url: &str) -> Result<(), Box<dyn Error>> {
todo!() info!("Installing for SystemD");
SystemDServiceFileGenerator::create_service_files(bin_path, name, server_url)?;
info!("Enabling the SystemD service");
let out = Command::new("systemctl").args(["enable", &SystemDServiceFileGenerator::get_service_file_name(name)]).output()?;
if !out.status.success() {
error!("Error enabling the SystemD service (command exited with non-zero exit code)");
error!("stdout:");
error!("{}", String::from_utf8(out.stdout)?);
error!("stderr:");
error!("{}", String::from_utf8(out.stderr)?);
return Err("Command exited with non-zero exit code".into());
}
info!("Installation successful");
Ok(())
} }
fn uninstall(name: &str) -> Result<(), Box<dyn Error>> { fn uninstall(&self, name: &str) -> Result<(), Box<dyn Error>> {
todo!() info!("Uninstalling SystemD service files");
info!("Disabling the SystemD service");
let out = Command::new("systemctl").args(["disable", &SystemDServiceFileGenerator::get_service_file_name(name)]).output()?;
if !out.status.success() {
error!("Error disabling the SystemD service (command exited with non-zero exit code)");
error!("stdout:");
error!("{}", String::from_utf8(out.stdout)?);
error!("stderr:");
error!("{}", String::from_utf8(out.stderr)?);
return Err("Command exited with non-zero exit code".into());
}
info!("Removing the service files");
SystemDServiceFileGenerator::delete_service_files(name)?;
Ok(())
} }
fn start(name: &str) -> Result<(), Box<dyn Error>> { fn start(&self, name: &str) -> Result<(), Box<dyn Error>> {
todo!() info!("Starting the SystemD service");
let out = Command::new("systemctl").args(["start", &SystemDServiceFileGenerator::get_service_file_name(name)]).output()?;
if !out.status.success() {
error!("Error starting the SystemD service (command exited with non-zero exit code)");
error!("stdout:");
error!("{}", String::from_utf8(out.stdout)?);
error!("stderr:");
error!("{}", String::from_utf8(out.stderr)?);
return Err("Command exited with non-zero exit code".into());
}
Ok(())
} }
fn stop(name: &str) -> Result<(), Box<dyn Error>> { fn stop(&self, name: &str) -> Result<(), Box<dyn Error>> {
todo!() info!("Stopping the SystemD service");
let out = Command::new("systemctl").args(["stop", &SystemDServiceFileGenerator::get_service_file_name(name)]).output()?;
if !out.status.success() {
error!("Error stopping the SystemD service (command exited with non-zero exit code)");
error!("stdout:");
error!("{}", String::from_utf8(out.stdout)?);
error!("stderr:");
error!("{}", String::from_utf8(out.stderr)?);
return Err("Command exited with non-zero exit code".into());
}
Ok(())
} }
} }

View File

@ -0,0 +1,22 @@
use std::error::Error;
use std::path::PathBuf;
use crate::service::ServiceManager;
pub struct WindowsServiceManager {}
impl ServiceManager for WindowsServiceManager {
fn install(&self, bin_path: PathBuf, name: &str, server_url: &str) -> Result<(), Box<dyn Error>> {
todo!()
}
fn uninstall(&self, name: &str) -> Result<(), Box<dyn Error>> {
todo!()
}
fn start(&self, name: &str) -> Result<(), Box<dyn Error>> {
todo!()
}
fn stop(&self, name: &str) -> Result<(), Box<dyn Error>> {
todo!()
}
}

View File

@ -0,0 +1,13 @@
// Code to handle the nebula worker
use std::sync::mpsc::Receiver;
use crate::config::TFClientConfig;
use crate::daemon::ThreadMessageSender;
pub enum SocketWorkerMessage {
}
pub fn socketworker_main(config: TFClientConfig, transmitter: ThreadMessageSender, rx: Receiver<SocketWorkerMessage>) {
}

View File

@ -1,9 +1,29 @@
use log::{error, warn};
use sha2::Sha256; use sha2::Sha256;
use sha2::Digest; use sha2::Digest;
use url::Url;
pub fn sha256(bytes: &[u8]) -> String { pub fn sha256(bytes: &[u8]) -> String {
let mut hasher = Sha256::new(); let mut hasher = Sha256::new();
hasher.update(bytes); hasher.update(bytes);
let digest = hasher.finalize(); let digest = hasher.finalize();
hex::encode(digest) hex::encode(digest)
}
pub fn check_server_url(server: &str) {
let api_base = match Url::parse(&server) {
Ok(u) => u,
Err(e) => {
error!("Invalid server url `{}`: {}", server, e);
std::process::exit(1);
}
};
match api_base.scheme() {
"http" => { warn!("HTTP api urls are not reccomended. Please switch to HTTPS if possible.") },
"https" => (),
_ => {
error!("Unsupported protocol `{}` (expected one of http, https)", api_base.scheme());
std::process::exit(1);
}
}
} }