From 420e85a64d4e25aa84f67f691e6c12800ef0c846 Mon Sep 17 00:00:00 2001 From: core Date: Wed, 19 Jul 2023 23:15:06 -0400 Subject: [PATCH] grable --- Cargo.toml | 6 + api/config.toml | 13 +++ api/src/config.rs | 34 ++++++ api/src/gateway.rs | 199 +++++++++++++++++++++++++++++++++ api/src/status.rs | 13 +++ common/src/message.rs | 37 ++++++ common/src/status.rs | 11 ++ websocket-worker/config.toml | 6 + websocket-worker/src/config.rs | 10 ++ 9 files changed, 329 insertions(+) create mode 100644 Cargo.toml create mode 100644 api/config.toml create mode 100644 api/src/config.rs create mode 100644 api/src/gateway.rs create mode 100644 api/src/status.rs create mode 100644 common/src/message.rs create mode 100644 common/src/status.rs create mode 100644 websocket-worker/config.toml create mode 100644 websocket-worker/src/config.rs diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..d4b0580 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,6 @@ +[workspace] +members = [ + "api", + "common", + "websocket-worker" +] \ No newline at end of file diff --git a/api/config.toml b/api/config.toml new file mode 100644 index 0000000..92207ad --- /dev/null +++ b/api/config.toml @@ -0,0 +1,13 @@ +[server] +bind = "0.0.0.0" +port = 8171 +base_url = "http://localhost:8171/" + +[auth] +tokens = [ + { "token" = "test-ro-token", can_write = false }, + { "token" = "test-rw-token", can_write = true } +] + +[log] +log_file = "chat_log.log" \ No newline at end of file diff --git a/api/src/config.rs b/api/src/config.rs new file mode 100644 index 0000000..1d542bf --- /dev/null +++ b/api/src/config.rs @@ -0,0 +1,34 @@ +use std::net::IpAddr; +use serde::{Deserialize, Serialize}; +use url::Url; + + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct Config { + pub server: ServerConfig, + pub auth: AuthConfig, + pub log: LogConfig +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct ServerConfig { + pub bind: IpAddr, + pub port: u16, + pub base_url: Url +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct AuthConfig { + pub tokens: Vec +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct Token { + pub token: String, + pub can_write: bool +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct LogConfig { + pub log_file: String +} \ No newline at end of file diff --git a/api/src/gateway.rs b/api/src/gateway.rs new file mode 100644 index 0000000..252a8d8 --- /dev/null +++ b/api/src/gateway.rs @@ -0,0 +1,199 @@ +use std::fs::File; +use std::io::Write; +use std::net::SocketAddr; +use std::sync::mpsc::{channel, Receiver, Sender}; +use std::sync::{Arc, Mutex, RwLock}; +use actix::{Actor, ActorContext, Addr, AsyncContext, Handler, StreamHandler}; +use actix_web::{Error, HttpRequest, HttpResponse, web}; +use actix_web::web::Data; +use actix_web_actors::ws; +use actix_web_actors::ws::{CloseCode, CloseReason, Message, ProtocolError}; +use log::{debug, error, info, warn}; +use common::message::{DisconnectReason, GatewayChatMessage, GatewayPacketC2S, GatewayPacketS2C}; +use crate::config::{Config, Token}; +use crate::gateway::WsState::Handshaking; + +struct GatewayInterthreadMessage(GatewayChatMessage); + +pub enum WsState { + Handshaking, + Relay +} + +pub struct WsClient { + remote_addr: SocketAddr, + addr: Addr +} + +pub struct WsActor { + config: Config, + clients: Arc>>, + remote_addr: SocketAddr, + state: WsState, + write_perms: bool, + log: Arc> +} + +impl Actor for WsActor { + type Context = ws::WebsocketContext; +} + +impl actix::Message for GatewayInterthreadMessage { + type Result = Result<(), ()>; +} + +impl Handler for WsActor { + type Result = Result<(), ()>; + + fn handle(&mut self, msg: GatewayInterthreadMessage, ctx: &mut Self::Context) -> Self::Result { + info!("received message from another thread, relaying it"); + ctx.text(serde_json::to_string(&GatewayPacketS2C::Relayed { + msg: msg.0 + }).unwrap()); + Ok(()) + } +} + +// {"Authenticate":{"token":"test-rw-token","request_write_perms":true}} +// {"Relay":{"msg":{"source":"Discord","username":"@realcore","message":"ello","timestamp":"2023-07-20T01:58:36+00:00"}}} + +impl StreamHandler> for WsActor { + fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { + match msg { + Ok(ws::Message::Ping(msg)) => ctx.pong(&msg), + Ok(ws::Message::Text(text)) => { + // attempt to decode the packet + let packet: GatewayPacketC2S = match serde_json::from_str(&text) { + Ok(p) => p, + Err(e) => { + error!("packet decode error: {}", e); + info!("disconnecting client for an illegal packet"); + ctx.text(serde_json::to_string(&GatewayPacketS2C::Disconnect { + reason: DisconnectReason::IllegalPacket + }).unwrap()); + ctx.close(Some(CloseReason::from(CloseCode::Abnormal))); + ctx.stop(); + return; + } + }; + + match self.state { + Handshaking => { + match packet { + GatewayPacketC2S::Authenticate { token, request_write_perms } => { + info!("client authenticating with {}", token); + + if !self.config.auth.tokens.iter().any(|u| u.token == token) { + warn!("client authenticated with invalid token {}", token); + info!("disconnecting client for failed authentication"); + ctx.text(serde_json::to_string(&GatewayPacketS2C::Disconnect { + reason: DisconnectReason::AuthenticationRejected + }).unwrap()); + ctx.close(Some(CloseReason::from(CloseCode::Abnormal))); + ctx.stop(); + } + + let can_write = self.config.auth.tokens.iter().find(|u| u.token == token).unwrap().can_write; + + if request_write_perms && !can_write { + warn!("client requested write perms on token {} that is read-only, ignoring", token); + } + + self.write_perms = request_write_perms && can_write; + self.state = WsState::Relay; + + ctx.text(serde_json::to_string(&GatewayPacketS2C::AuthenticationAccepted { + write_perms: self.write_perms, + }).unwrap()); + } + _ => { + error!("recieved non-Authenticate packet during Handshaking stage"); + info!("disconnecting client for an illegal packet"); + ctx.text(serde_json::to_string(&GatewayPacketS2C::Disconnect { + reason: DisconnectReason::IllegalPacket + }).unwrap()); + ctx.close(Some(CloseReason::from(CloseCode::Abnormal))); + ctx.stop(); + } + } + } + WsState::Relay => { + match packet { + GatewayPacketC2S::Relay { msg } => { + info!("relaying message {:?}", msg); + { + let mut buf = serde_json::to_vec(&msg).unwrap(); + buf.push(b'\n'); + self.log.write().unwrap().write_all(&buf).unwrap(); + } + { + for client in self.clients.read().unwrap().iter() { + if client.remote_addr == self.remote_addr { continue; } + client.addr.try_send(GatewayInterthreadMessage(msg.clone())).unwrap(); + } + } + } + GatewayPacketC2S::Disconnect { .. } => { + info!("client disconnecting"); + ctx.close(Some(CloseReason::from(CloseCode::Normal))); + ctx.stop(); + }, + _ => { + error!("received non-Relay/Disconnect packet during Handshaking stage"); + info!("disconnecting client for an illegal packet"); + ctx.text(serde_json::to_string(&GatewayPacketS2C::Disconnect { + reason: DisconnectReason::IllegalPacket + }).unwrap()); + ctx.close(Some(CloseReason::from(CloseCode::Abnormal))); + ctx.stop(); + } + } + } + } + }, + Ok(ws::Message::Close(_)) => { + ctx.stop(); + }, + _ => { + warn!("recv unknown websocket type: {:?}", msg); + info!("disconnecting client for an illegal packet"); + ctx.text(serde_json::to_string(&GatewayPacketS2C::Disconnect { + reason: DisconnectReason::IllegalPacket + }).unwrap()); + ctx.close(Some(CloseReason::from(CloseCode::Abnormal))); + ctx.stop(); + } + } + } + + fn started(&mut self, ctx: &mut Self::Context) { + self.clients.write().unwrap().push(WsClient { + remote_addr: self.remote_addr, + addr: ctx.address(), + }) + } + + fn finished(&mut self, ctx: &mut Self::Context) { + let pos = { self.clients.write().unwrap().iter().position(|u| u.remote_addr == self.remote_addr) }.unwrap(); + self.clients.write().unwrap().remove(pos); + } +} + +pub async fn gateway(req: HttpRequest, stream: web::Payload) -> Result { + let config: &Data = req.app_data().unwrap(); + let clients: &Data>>> = req.app_data().unwrap(); + let log: &Data>> = req.app_data().unwrap(); + + let resp = ws::start(WsActor { + config: config.as_ref().clone(), + clients: clients.get_ref().clone(), + remote_addr: req.peer_addr().unwrap(), + state: Handshaking, + write_perms: false, + log: log.as_ref().clone() + }, &req, stream); + + debug!("{:?}", resp); + + resp +} \ No newline at end of file diff --git a/api/src/status.rs b/api/src/status.rs new file mode 100644 index 0000000..ac80a72 --- /dev/null +++ b/api/src/status.rs @@ -0,0 +1,13 @@ +use actix_web::get; +use actix_web::web::{Data, Json}; +use common::status::{DATA_API_VERSION, Status}; +use crate::config::Config; + +#[get("/status")] +pub async fn status_endpoint(config: Data) -> Json { + Json(Status { + data_api_version: DATA_API_VERSION, + gateway_url: config.server.base_url.join("/gateway").unwrap(), + status_url: config.server.base_url.join("/status").unwrap() + }) +} \ No newline at end of file diff --git a/common/src/message.rs b/common/src/message.rs new file mode 100644 index 0000000..a4d5403 --- /dev/null +++ b/common/src/message.rs @@ -0,0 +1,37 @@ +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct GatewayChatMessage { + pub source: GatewayChatSource, + pub username: String, + pub message: String, + pub timestamp: DateTime +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub enum GatewayChatSource { + Discord, + Minecraft +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub enum GatewayPacketC2S { + Authenticate { token: String, request_write_perms: bool }, + Relay { msg: GatewayChatMessage }, + Disconnect { reason: DisconnectReason } +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub enum GatewayPacketS2C { + AuthenticationAccepted { write_perms: bool }, + Disconnect { reason: DisconnectReason }, + Relayed { msg: GatewayChatMessage } +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub enum DisconnectReason { + AuthenticationRejected, + IllegalPacket, + AllDone +} \ No newline at end of file diff --git a/common/src/status.rs b/common/src/status.rs new file mode 100644 index 0000000..ac84701 --- /dev/null +++ b/common/src/status.rs @@ -0,0 +1,11 @@ +use serde::{Deserialize, Serialize}; +use url::Url; + +pub const DATA_API_VERSION: u64 = 1; + +#[derive(Serialize, Deserialize, Debug)] +pub struct Status { + pub data_api_version: u64, + pub status_url: Url, + pub gateway_url: Url +} \ No newline at end of file diff --git a/websocket-worker/config.toml b/websocket-worker/config.toml new file mode 100644 index 0000000..97f246b --- /dev/null +++ b/websocket-worker/config.toml @@ -0,0 +1,6 @@ +websockets = [ + "https://discord.com/api/webhooks/1130340476797583380/0RBp48jK4x3qjKYcmypjw4ydm3GoK2r_D-yzz95b5cnBHtq8lsFx66kJnmniIVAe8H4u" +] +api_status_url = "http://localhost:8171/status" +workers = 8 +token = "test-ro-token" \ No newline at end of file diff --git a/websocket-worker/src/config.rs b/websocket-worker/src/config.rs new file mode 100644 index 0000000..12bdaeb --- /dev/null +++ b/websocket-worker/src/config.rs @@ -0,0 +1,10 @@ +use serde::{Deserialize, Serialize}; +use url::Url; + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct Config { + pub websockets: Vec, + pub api_status_url: Url, + pub workers: usize, + pub token: String +} \ No newline at end of file