This commit is contained in:
core 2023-07-19 23:15:06 -04:00
commit 420e85a64d
Signed by: core
GPG Key ID: FDBF740DADDCEECF
9 changed files with 329 additions and 0 deletions

6
Cargo.toml Normal file
View File

@ -0,0 +1,6 @@
[workspace]
members = [
"api",
"common",
"websocket-worker"
]

13
api/config.toml Normal file
View File

@ -0,0 +1,13 @@
[server]
bind = "0.0.0.0"
port = 8171
base_url = "http://localhost:8171/"
[auth]
tokens = [
{ "token" = "test-ro-token", can_write = false },
{ "token" = "test-rw-token", can_write = true }
]
[log]
log_file = "chat_log.log"

34
api/src/config.rs Normal file
View File

@ -0,0 +1,34 @@
use std::net::IpAddr;
use serde::{Deserialize, Serialize};
use url::Url;
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Config {
pub server: ServerConfig,
pub auth: AuthConfig,
pub log: LogConfig
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ServerConfig {
pub bind: IpAddr,
pub port: u16,
pub base_url: Url
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct AuthConfig {
pub tokens: Vec<Token>
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Token {
pub token: String,
pub can_write: bool
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct LogConfig {
pub log_file: String
}

199
api/src/gateway.rs Normal file
View File

@ -0,0 +1,199 @@
use std::fs::File;
use std::io::Write;
use std::net::SocketAddr;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex, RwLock};
use actix::{Actor, ActorContext, Addr, AsyncContext, Handler, StreamHandler};
use actix_web::{Error, HttpRequest, HttpResponse, web};
use actix_web::web::Data;
use actix_web_actors::ws;
use actix_web_actors::ws::{CloseCode, CloseReason, Message, ProtocolError};
use log::{debug, error, info, warn};
use common::message::{DisconnectReason, GatewayChatMessage, GatewayPacketC2S, GatewayPacketS2C};
use crate::config::{Config, Token};
use crate::gateway::WsState::Handshaking;
struct GatewayInterthreadMessage(GatewayChatMessage);
pub enum WsState {
Handshaking,
Relay
}
pub struct WsClient {
remote_addr: SocketAddr,
addr: Addr<WsActor>
}
pub struct WsActor {
config: Config,
clients: Arc<RwLock<Vec<WsClient>>>,
remote_addr: SocketAddr,
state: WsState,
write_perms: bool,
log: Arc<RwLock<File>>
}
impl Actor for WsActor {
type Context = ws::WebsocketContext<Self>;
}
impl actix::Message for GatewayInterthreadMessage {
type Result = Result<(), ()>;
}
impl Handler<GatewayInterthreadMessage> for WsActor {
type Result = Result<(), ()>;
fn handle(&mut self, msg: GatewayInterthreadMessage, ctx: &mut Self::Context) -> Self::Result {
info!("received message from another thread, relaying it");
ctx.text(serde_json::to_string(&GatewayPacketS2C::Relayed {
msg: msg.0
}).unwrap());
Ok(())
}
}
// {"Authenticate":{"token":"test-rw-token","request_write_perms":true}}
// {"Relay":{"msg":{"source":"Discord","username":"@realcore","message":"ello","timestamp":"2023-07-20T01:58:36+00:00"}}}
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WsActor {
fn handle(&mut self, msg: Result<Message, ProtocolError>, ctx: &mut Self::Context) {
match msg {
Ok(ws::Message::Ping(msg)) => ctx.pong(&msg),
Ok(ws::Message::Text(text)) => {
// attempt to decode the packet
let packet: GatewayPacketC2S = match serde_json::from_str(&text) {
Ok(p) => p,
Err(e) => {
error!("packet decode error: {}", e);
info!("disconnecting client for an illegal packet");
ctx.text(serde_json::to_string(&GatewayPacketS2C::Disconnect {
reason: DisconnectReason::IllegalPacket
}).unwrap());
ctx.close(Some(CloseReason::from(CloseCode::Abnormal)));
ctx.stop();
return;
}
};
match self.state {
Handshaking => {
match packet {
GatewayPacketC2S::Authenticate { token, request_write_perms } => {
info!("client authenticating with {}", token);
if !self.config.auth.tokens.iter().any(|u| u.token == token) {
warn!("client authenticated with invalid token {}", token);
info!("disconnecting client for failed authentication");
ctx.text(serde_json::to_string(&GatewayPacketS2C::Disconnect {
reason: DisconnectReason::AuthenticationRejected
}).unwrap());
ctx.close(Some(CloseReason::from(CloseCode::Abnormal)));
ctx.stop();
}
let can_write = self.config.auth.tokens.iter().find(|u| u.token == token).unwrap().can_write;
if request_write_perms && !can_write {
warn!("client requested write perms on token {} that is read-only, ignoring", token);
}
self.write_perms = request_write_perms && can_write;
self.state = WsState::Relay;
ctx.text(serde_json::to_string(&GatewayPacketS2C::AuthenticationAccepted {
write_perms: self.write_perms,
}).unwrap());
}
_ => {
error!("recieved non-Authenticate packet during Handshaking stage");
info!("disconnecting client for an illegal packet");
ctx.text(serde_json::to_string(&GatewayPacketS2C::Disconnect {
reason: DisconnectReason::IllegalPacket
}).unwrap());
ctx.close(Some(CloseReason::from(CloseCode::Abnormal)));
ctx.stop();
}
}
}
WsState::Relay => {
match packet {
GatewayPacketC2S::Relay { msg } => {
info!("relaying message {:?}", msg);
{
let mut buf = serde_json::to_vec(&msg).unwrap();
buf.push(b'\n');
self.log.write().unwrap().write_all(&buf).unwrap();
}
{
for client in self.clients.read().unwrap().iter() {
if client.remote_addr == self.remote_addr { continue; }
client.addr.try_send(GatewayInterthreadMessage(msg.clone())).unwrap();
}
}
}
GatewayPacketC2S::Disconnect { .. } => {
info!("client disconnecting");
ctx.close(Some(CloseReason::from(CloseCode::Normal)));
ctx.stop();
},
_ => {
error!("received non-Relay/Disconnect packet during Handshaking stage");
info!("disconnecting client for an illegal packet");
ctx.text(serde_json::to_string(&GatewayPacketS2C::Disconnect {
reason: DisconnectReason::IllegalPacket
}).unwrap());
ctx.close(Some(CloseReason::from(CloseCode::Abnormal)));
ctx.stop();
}
}
}
}
},
Ok(ws::Message::Close(_)) => {
ctx.stop();
},
_ => {
warn!("recv unknown websocket type: {:?}", msg);
info!("disconnecting client for an illegal packet");
ctx.text(serde_json::to_string(&GatewayPacketS2C::Disconnect {
reason: DisconnectReason::IllegalPacket
}).unwrap());
ctx.close(Some(CloseReason::from(CloseCode::Abnormal)));
ctx.stop();
}
}
}
fn started(&mut self, ctx: &mut Self::Context) {
self.clients.write().unwrap().push(WsClient {
remote_addr: self.remote_addr,
addr: ctx.address(),
})
}
fn finished(&mut self, ctx: &mut Self::Context) {
let pos = { self.clients.write().unwrap().iter().position(|u| u.remote_addr == self.remote_addr) }.unwrap();
self.clients.write().unwrap().remove(pos);
}
}
pub async fn gateway(req: HttpRequest, stream: web::Payload) -> Result<HttpResponse, Error> {
let config: &Data<Config> = req.app_data().unwrap();
let clients: &Data<Arc<RwLock<Vec<WsClient>>>> = req.app_data().unwrap();
let log: &Data<Arc<RwLock<File>>> = req.app_data().unwrap();
let resp = ws::start(WsActor {
config: config.as_ref().clone(),
clients: clients.get_ref().clone(),
remote_addr: req.peer_addr().unwrap(),
state: Handshaking,
write_perms: false,
log: log.as_ref().clone()
}, &req, stream);
debug!("{:?}", resp);
resp
}

13
api/src/status.rs Normal file
View File

@ -0,0 +1,13 @@
use actix_web::get;
use actix_web::web::{Data, Json};
use common::status::{DATA_API_VERSION, Status};
use crate::config::Config;
#[get("/status")]
pub async fn status_endpoint(config: Data<Config>) -> Json<Status> {
Json(Status {
data_api_version: DATA_API_VERSION,
gateway_url: config.server.base_url.join("/gateway").unwrap(),
status_url: config.server.base_url.join("/status").unwrap()
})
}

37
common/src/message.rs Normal file
View File

@ -0,0 +1,37 @@
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct GatewayChatMessage {
pub source: GatewayChatSource,
pub username: String,
pub message: String,
pub timestamp: DateTime<Utc>
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum GatewayChatSource {
Discord,
Minecraft
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum GatewayPacketC2S {
Authenticate { token: String, request_write_perms: bool },
Relay { msg: GatewayChatMessage },
Disconnect { reason: DisconnectReason }
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum GatewayPacketS2C {
AuthenticationAccepted { write_perms: bool },
Disconnect { reason: DisconnectReason },
Relayed { msg: GatewayChatMessage }
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum DisconnectReason {
AuthenticationRejected,
IllegalPacket,
AllDone
}

11
common/src/status.rs Normal file
View File

@ -0,0 +1,11 @@
use serde::{Deserialize, Serialize};
use url::Url;
pub const DATA_API_VERSION: u64 = 1;
#[derive(Serialize, Deserialize, Debug)]
pub struct Status {
pub data_api_version: u64,
pub status_url: Url,
pub gateway_url: Url
}

View File

@ -0,0 +1,6 @@
websockets = [
"https://discord.com/api/webhooks/1130340476797583380/0RBp48jK4x3qjKYcmypjw4ydm3GoK2r_D-yzz95b5cnBHtq8lsFx66kJnmniIVAe8H4u"
]
api_status_url = "http://localhost:8171/status"
workers = 8
token = "test-ro-token"

View File

@ -0,0 +1,10 @@
use serde::{Deserialize, Serialize};
use url::Url;
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Config {
pub websockets: Vec<String>,
pub api_status_url: Url,
pub workers: usize,
pub token: String
}