diff --git a/Cargo.lock b/Cargo.lock index d85ab87..7a0b9ab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -76,6 +76,7 @@ dependencies = [ "console_log", "futures", "js-sys", + "lazy_static", "log", "protocol", "rmp-serde", diff --git a/client/Cargo.toml b/client/Cargo.toml index b04aa0c..e5b99e2 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -20,4 +20,5 @@ url = "2.3" protocol = { version = "0.1.0", path = "../protocol" } rmp-serde = "1.1" ws_stream_wasm = "0.7" -serde = { version = "1", features = ["derive"] } \ No newline at end of file +serde = { version = "1", features = ["derive"] } +lazy_static = "1.4" \ No newline at end of file diff --git a/client/src/chat.rs b/client/src/chat.rs new file mode 100644 index 0000000..0bf0dfc --- /dev/null +++ b/client/src/chat.rs @@ -0,0 +1,19 @@ +use wasm_bindgen::prelude::*; +use protocol::MessageC2S; +use crate::CLIENT; +use futures::SinkExt; + +#[wasm_bindgen] +pub async fn send_chat(message: &str) -> Result<(), JsError> { + let client_data = &mut CLIENT.write()?.client_data; + + if let Some(data) = client_data { + send!(data.tx, &MessageC2S::Chat { + message: message.to_string() + }).await?; + } else { + return Err(JsError::new("Client not yet connected to server")); + } + + Ok(()) +} \ No newline at end of file diff --git a/client/src/lib.rs b/client/src/lib.rs index bed4846..ec59ab2 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -1,7 +1,7 @@ use std::error::Error; use futures::stream::{SplitSink, SplitStream}; use futures::StreamExt; -use log::{debug, error, info, Level, trace}; +use log::{debug, error, info, Level, trace, warn}; use wasm_bindgen::prelude::*; use ws_stream_wasm::{WsMessage, WsMeta, WsStream}; use protocol::State; @@ -9,20 +9,20 @@ use protocol::PROTOCOL_VERSION; use protocol::MessageS2C; use protocol::MessageC2S; use futures::SinkExt; +use lazy_static::lazy_static; +use std::sync::Arc; +use std::sync::RwLock; +use futures::FutureExt; #[macro_use] pub mod macros; +pub mod chat; #[wasm_bindgen] extern { pub fn alert(s: &str); } -#[wasm_bindgen] -pub fn send_chat(chat: &str) { - info!("sending chat: {}", chat); -} - #[wasm_bindgen] pub async fn rust_init(gateway: &str, username: &str) -> Result<(), JsError> { console_log::init_with_level(Level::Debug).unwrap(); @@ -37,17 +37,27 @@ pub async fn rust_init(gateway: &str, username: &str) -> Result<(), JsError> { } }; - info!("Gateway client exited"); + info!("Gateway client set up successfully"); Ok(()) } pub struct Client { + pub client_data: Option, +} + +pub struct ClientData { pub state: State, pub tx: SplitSink, pub rx: SplitStream } +lazy_static! { + pub static ref CLIENT: Arc> = Arc::new(RwLock::new(Client { + client_data: None + })); +} + pub async fn main(gateway: &str, username: &str) -> Result<(), Box> { info!("FAST CONNECT: {}", gateway); let gateway_url = url::Url::parse(gateway)?; @@ -56,7 +66,7 @@ pub async fn main(gateway: &str, username: &str) -> Result<(), Box> { trace!("Connected to gateway socket"); let (tx, rx) = ws_stream.split(); - let mut client = Client { + let mut client_data = ClientData { state: State::Handshake, tx, rx @@ -64,7 +74,7 @@ pub async fn main(gateway: &str, username: &str) -> Result<(), Box> { trace!("Split stream, handshaking with server"); - send!(client.tx, &MessageC2S::Hello { + send!(client_data.tx, &MessageC2S::Hello { next_state: State::Play, version: PROTOCOL_VERSION, requested_username: username.to_string() @@ -72,22 +82,64 @@ pub async fn main(gateway: &str, username: &str) -> Result<(), Box> { trace!("Sent handshake start packet"); - if let Some(msg) = recv_now!(client.rx)? { + if let Some(msg) = recv_now!(client_data.rx)? { let typed_msg: MessageS2C = msg; match typed_msg { MessageS2C::Hello { version, given_username, next_state } => { info!("FAST CONNECT - connected to server protocol {} given username {}, switching to state {:?}", version, given_username, next_state); - client.state = next_state; + client_data.state = next_state; }, MessageS2C::Goodbye { reason } => { error!("server disconnected before finishing handshake: {:?}", reason); return Err(format!("disconnected by server: {:?}", reason).into()); + }, + _ => { + warn!("received unexpected packet from server: {:?}", typed_msg); } } } else { error!("Server closed the connection") } + CLIENT.write()?.client_data = Some(client_data); + + Ok(()) +} + +#[wasm_bindgen] +pub async fn update_socket() -> Result<(), JsError> { + let mut client = CLIENT.write()?; + + if client.client_data.is_none() { + return Err(JsError::new("Client not yet initialized")); + } + + let client_data = client.client_data.as_mut().unwrap(); + + let maybe_msg: Option = match recv!(client_data.rx) { + Ok(r) => r, + Err(e) => { + return Err(JsError::new(e)) + } + }; + + if let Some(msg) = maybe_msg { + match msg { + MessageS2C::Goodbye { reason } => { + info!("server sent disconnect: {:?}", reason); + client.client_data = None; + return Err(JsError::new("disconnected by server")); + } + MessageS2C::Chat { from, message } => { + info!("[CHAT] {}: {}", from, message); + // TODO: Handle + }, + _ => { + warn!("server sent unexpected packet {:?}, ignoring", msg); + } + } + } + Ok(()) } \ No newline at end of file diff --git a/client/src/macros.rs b/client/src/macros.rs index 41ce9db..10fecc8 100644 --- a/client/src/macros.rs +++ b/client/src/macros.rs @@ -18,24 +18,16 @@ macro_rules! recv { { if let Some(future_result) = $reader.next().now_or_never() { if let Some(msg) = future_result { - match msg { - Ok(msg) => { - if msg.is_binary() { - match rmp_serde::from_slice(&msg.into_data()) { - Ok(d) => Ok(Some(d)), - Err(e) => { - log::error!("error deserializing message: {}", e); - Ok(None) - } - } - } else { + if let WsMessage::Binary(msg) = msg { + match rmp_serde::from_slice(&msg) { + Ok(d) => Ok(Some(d)), + Err(e) => { + log::error!("error deserializing message: {}", e); Ok(None) } - }, - Err(e) => { - log::error!("error receiving message: {}", e); - Ok(None) } + } else { + Ok(None) } } else { log::error!("pipe closed"); diff --git a/protocol/src/lib.rs b/protocol/src/lib.rs index 0fe3fcd..c21e066 100644 --- a/protocol/src/lib.rs +++ b/protocol/src/lib.rs @@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize}; pub const PROTOCOL_VERSION: u32 = 1; -#[derive(Serialize, Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone, Copy)] pub enum State { Handshake, Play @@ -18,6 +18,10 @@ pub enum MessageC2S { Goodbye { reason: GoodbyeReason + }, + + Chat { + message: String } } @@ -31,6 +35,11 @@ pub enum MessageS2C { Goodbye { reason: GoodbyeReason + }, + + Chat { + from: String, + message: String } } diff --git a/server/src/client_handler.rs b/server/src/client_handler.rs index 095982e..7d15034 100644 --- a/server/src/client_handler.rs +++ b/server/src/client_handler.rs @@ -13,19 +13,24 @@ use crate::{send, recv}; pub async fn handle_client(mgr: ClientManager, remote_addr: SocketAddr, mut rx: Receiver, mut client_tx: SplitSink, Message>, mut client_rx: SplitStream>) -> Result<(), Box> { let mut state = State::Handshake; + let mut username = String::new(); loop { if let Some(msg) = rx.recv().await { match msg { - ClientHandlerMessage::Tick => {} // this intentionally does nothing + ClientHandlerMessage::Tick => {} // this intentionally does nothing, + ClientHandlerMessage::ChatMessage { from, message } => { + send!(client_tx, &MessageS2C::Chat { + message, + from + }).await?; + } } } else { info!("channel closed, shutting down"); break; } - info!("here"); - if let Some(pkt) = recv!(client_rx)? { match state { State::Handshake => { @@ -69,13 +74,22 @@ pub async fn handle_client(mgr: ClientManager, remote_addr: SocketAddr, mut rx: send!(client_tx, &MessageS2C::Hello { version, - given_username: requested_username, + given_username: requested_username.clone(), next_state, }).await?; + state = next_state; + username = requested_username; }, MessageC2S::Goodbye { reason } => { info!("client sent goodbye: {:?}", reason); break; + }, + MessageC2S::Chat { .. } => { + error!("client sent unexpected packet {:?} for state {:?}", pkt, state); + send!(client_tx, &MessageS2C::Goodbye { + reason: GoodbyeReason::UnexpectedPacket, + }).await?; + break; } } } @@ -87,10 +101,22 @@ pub async fn handle_client(mgr: ClientManager, remote_addr: SocketAddr, mut rx: reason: GoodbyeReason::UnexpectedPacket, }).await?; break; - } + }, MessageC2S::Goodbye { reason } => { info!("client sent goodbye: {:?}", reason); break; + }, + MessageC2S::Chat { message } => { + info!("[{}] CHAT: [{}] {}", remote_addr, username, message); + + for (addr, client_thread) in mgr.handlers.read().await.iter() { + match client_thread.tx.send(ClientHandlerMessage::ChatMessage { from: username.clone(), message: message.clone() }).await { + Ok(_) => (), + Err(e) => { + error!("unable to update a client thread: {}", e); + } + } + } } } } diff --git a/server/src/handler.rs b/server/src/handler.rs index a90a3c9..b98dd16 100644 --- a/server/src/handler.rs +++ b/server/src/handler.rs @@ -20,5 +20,6 @@ pub struct ClientHandler { } pub enum ClientHandlerMessage { - Tick + Tick, + ChatMessage { from: String, message: String } } \ No newline at end of file diff --git a/server/src/main.rs b/server/src/main.rs index 8915f6a..e48032c 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -7,7 +7,7 @@ use tokio_tungstenite::WebSocketStream; use tungstenite::{Error, handshake}; use futures::stream::StreamExt; use lazy_static::lazy_static; -use log::{error, info}; +use log::{error, info, Level}; use tokio::sync::RwLock; use protocol::State; use crate::handler::{ClientHandler, ClientManager}; @@ -117,7 +117,7 @@ lazy_static! { #[tokio::main] async fn main() { - simple_logger::init_with_env().expect("Unable to start logging service"); + simple_logger::init_with_level(Level::Debug).expect("Unable to start logging service"); let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); diff --git a/web/index.html b/web/index.html index c2aebc2..dc51831 100644 --- a/web/index.html +++ b/web/index.html @@ -6,14 +6,14 @@ - +
+ + +
+ + +
+ +
\ No newline at end of file diff --git a/web/play.html b/web/play.html index b3a44c9..9a35007 100644 --- a/web/play.html +++ b/web/play.html @@ -20,13 +20,19 @@