From dda1b9e50e823af10d1d19ce27bf8a5296c84b9e Mon Sep 17 00:00:00 2001
From: core <core@coredoes.dev>
Date: Sun, 9 Apr 2023 12:58:44 -0400
Subject: [PATCH] hell is over?

---
 Cargo.lock                         | 71 +++++++++++++++++++++---
 client/Cargo.toml                  |  9 ++--
 client/src/lib.rs                  | 75 +++++++++++++++++++++-----
 {protocol => client}/src/macros.rs | 31 +++++++++--
 protocol/Cargo.toml                |  5 +-
 protocol/src/lib.rs                |  5 +-
 server/src/client_handler.rs       |  5 +-
 server/src/handler.rs              |  2 +-
 server/src/macros.rs               | 86 ++++++++++++++++++++++++++++++
 server/src/main.rs                 | 25 ++++++---
 server/src/timer.rs                |  4 +-
 web/play.html                      |  2 +-
 12 files changed, 272 insertions(+), 48 deletions(-)
 rename {protocol => client}/src/macros.rs (67%)
 create mode 100644 server/src/macros.rs

diff --git a/Cargo.lock b/Cargo.lock
index 22ce3cd..d85ab87 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2,6 +2,17 @@
 # It is not intended for manual editing.
 version = 3
 
+[[package]]
+name = "async_io_stream"
+version = "0.3.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b6d7b9decdf35d8908a7e3ef02f64c5e9b1695e230154c0e8de3969142d9b94c"
+dependencies = [
+ "futures",
+ "pharos",
+ "rustc_version",
+]
+
 [[package]]
 name = "atty"
 version = "0.2.14"
@@ -66,13 +77,14 @@ dependencies = [
  "futures",
  "js-sys",
  "log",
- "tokio",
- "tokio-tungstenite",
- "tungstenite",
+ "protocol",
+ "rmp-serde",
+ "serde",
  "url",
  "wasm-bindgen",
  "wasm-bindgen-futures",
  "web-sys",
+ "ws_stream_wasm",
 ]
 
 [[package]]
@@ -472,6 +484,16 @@ version = "2.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e"
 
+[[package]]
+name = "pharos"
+version = "0.5.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e9567389417feee6ce15dd6527a8a1ecac205ef62c2932bcf3d9f6fc5b78b414"
+dependencies = [
+ "futures",
+ "rustc_version",
+]
+
 [[package]]
 name = "pin-project-lite"
 version = "0.2.9"
@@ -503,11 +525,8 @@ dependencies = [
 name = "protocol"
 version = "0.1.0"
 dependencies = [
- "futures",
  "rmp-serde",
  "serde",
- "tokio-tungstenite",
- "tungstenite",
 ]
 
 [[package]]
@@ -571,6 +590,27 @@ dependencies = [
  "serde",
 ]
 
+[[package]]
+name = "rustc_version"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366"
+dependencies = [
+ "semver",
+]
+
+[[package]]
+name = "semver"
+version = "1.0.17"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bebd363326d05ec3e2f532ab7660680f3b02130d780c299bca73469d521bc0ed"
+
+[[package]]
+name = "send_wrapper"
+version = "0.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cd0b0ec5f1c1ca621c432a25813d8d60c88abe6d3e08a3eb9cf37d97a0fe3d73"
+
 [[package]]
 name = "serde"
 version = "1.0.159"
@@ -1086,3 +1126,22 @@ name = "windows_x86_64_msvc"
 version = "0.42.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0"
+
+[[package]]
+name = "ws_stream_wasm"
+version = "0.7.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7999f5f4217fe3818726b66257a4475f71e74ffd190776ad053fa159e50737f5"
+dependencies = [
+ "async_io_stream",
+ "futures",
+ "js-sys",
+ "log",
+ "pharos",
+ "rustc_version",
+ "send_wrapper",
+ "thiserror",
+ "wasm-bindgen",
+ "wasm-bindgen-futures",
+ "web-sys",
+]
diff --git a/client/Cargo.toml b/client/Cargo.toml
index 194332d..b04aa0c 100644
--- a/client/Cargo.toml
+++ b/client/Cargo.toml
@@ -14,9 +14,10 @@ js-sys = "0.3"
 web-sys = { version = "0.3", features = ["CanvasRenderingContext2d", "Document", "Element", "HtmlCanvasElement", "Window"]}
 console_log = { version = "1", features = ["color"] }
 log = "0.4"
-tungstenite = { version = "0.18", default-features = false }
-tokio-tungstenite = { version = "0.18" }
-tokio = { version = "1.27", features = ["macros", "sync", "rt-multi-thread"] }
 futures = { version = "0.3", default-features = false }
 wasm-bindgen-futures = "0.4"
-url = "2.3"
\ No newline at end of file
+url = "2.3"
+protocol = { version = "0.1.0", path = "../protocol" }
+rmp-serde = "1.1"
+ws_stream_wasm = "0.7"
+serde = { version = "1", features = ["derive"] }
\ No newline at end of file
diff --git a/client/src/lib.rs b/client/src/lib.rs
index 7f504a1..bed4846 100644
--- a/client/src/lib.rs
+++ b/client/src/lib.rs
@@ -1,8 +1,17 @@
 use std::error::Error;
+use futures::stream::{SplitSink, SplitStream};
 use futures::StreamExt;
-use log::{debug, error, info, Level};
-use tokio_tungstenite::connect_async;
+use log::{debug, error, info, Level, trace};
 use wasm_bindgen::prelude::*;
+use ws_stream_wasm::{WsMessage, WsMeta, WsStream};
+use protocol::State;
+use protocol::PROTOCOL_VERSION;
+use protocol::MessageS2C;
+use protocol::MessageC2S;
+use futures::SinkExt;
+
+#[macro_use]
+pub mod macros;
 
 #[wasm_bindgen]
 extern {
@@ -15,30 +24,70 @@ pub fn send_chat(chat: &str) {
 }
 
 #[wasm_bindgen]
-pub async fn rust_init(gateway: &str, username: &str) {
+pub async fn rust_init(gateway: &str, username: &str) -> Result<(), JsError> {
     console_log::init_with_level(Level::Debug).unwrap();
 
     info!("Logger setup successfully");
 
-    match init(gateway, username).await {
-        Ok(_) => (),
+    match main(gateway, username).await {
+        Ok(c) => c,
         Err(e) => {
             error!("Error initializing gateway client: {}", e);
-            return;
+            return Err(JsError::new(&e.to_string()));
         }
-    }
+    };
 
-    info!("Gateway client initialized successfully");
+    info!("Gateway client exited");
+
+    Ok(())
 }
 
-pub async fn init(gateway: &str, username: &str) -> Result<(), Box<dyn Error>> {
+pub struct Client {
+    pub state: State,
+    pub tx: SplitSink<WsStream, WsMessage>,
+    pub rx: SplitStream<WsStream>
+}
+
+pub async fn main(gateway: &str, username: &str) -> Result<(), Box<dyn Error>> {
     info!("FAST CONNECT: {}", gateway);
     let gateway_url = url::Url::parse(gateway)?;
-    debug!("Gateway URL parsed");
-    let (ws_stream, _) = connect_async(gateway_url).await?;
-    debug!("Connected to gateway socket");
+    trace!("Gateway URL parsed");
+    let (_ws, ws_stream) = WsMeta::connect(gateway_url, None).await?;
+    trace!("Connected to gateway socket");
     let (tx, rx) = ws_stream.split();
-    debug!("Split stream, handshaking with server");
+
+    let mut client = Client {
+        state: State::Handshake,
+        tx,
+        rx
+    };
+
+    trace!("Split stream, handshaking with server");
+
+    send!(client.tx, &MessageC2S::Hello {
+        next_state: State::Play,
+        version: PROTOCOL_VERSION,
+        requested_username: username.to_string()
+    }).await?;
+
+    trace!("Sent handshake start packet");
+
+    if let Some(msg) = recv_now!(client.rx)? {
+        let typed_msg: MessageS2C = msg;
+
+        match typed_msg {
+            MessageS2C::Hello { version, given_username, next_state } => {
+                info!("FAST CONNECT - connected to server protocol {} given username {}, switching to state {:?}", version, given_username, next_state);
+                client.state = next_state;
+            },
+            MessageS2C::Goodbye { reason } => {
+                error!("server disconnected before finishing handshake: {:?}", reason);
+                return Err(format!("disconnected by server: {:?}", reason).into());
+            }
+        }
+    } else {
+        error!("Server closed the connection")
+    }
 
     Ok(())
 }
\ No newline at end of file
diff --git a/protocol/src/macros.rs b/client/src/macros.rs
similarity index 67%
rename from protocol/src/macros.rs
rename to client/src/macros.rs
index cd4f7cb..41ce9db 100644
--- a/protocol/src/macros.rs
+++ b/client/src/macros.rs
@@ -3,8 +3,7 @@ use std::io;
 use futures::{AsyncRead, AsyncWrite, FutureExt, Stream, StreamExt};
 use futures::stream::SplitStream;
 use serde::{Deserialize, Serialize};
-use tokio_tungstenite::WebSocketStream;
-use tungstenite::Message;
+use ws_stream_wasm::WsMessage;
 
 #[macro_export]
 macro_rules! send {
@@ -49,6 +48,30 @@ macro_rules! recv {
     }
 }
 
-pub fn __generic_packet_to_message<T: Serialize>(pkt: &T) -> Result<Message, rmp_serde::encode::Error> {
-    rmp_serde::to_vec(&pkt).map(Message::from)
+#[macro_export]
+macro_rules! recv_now {
+    ($reader:expr) => {
+        {
+            if let Some(msg) = $reader.next().await {
+                if let WsMessage::Binary(msg) = msg {
+                    match rmp_serde::from_slice(&msg) {
+                        Ok(d) => Ok(Some(d)),
+                        Err(e) => {
+                            log::error!("error deserializing message: {}", e);
+                            Ok(None)
+                        }
+                    }
+                } else {
+                    Ok(None)
+                }
+            } else {
+                log::error!("pipe closed");
+                Err("Pipe closed")
+            }
+        }
+    };
+}
+
+pub fn __generic_packet_to_message<T: Serialize>(pkt: &T) -> Result<WsMessage, rmp_serde::encode::Error> {
+    rmp_serde::to_vec(&pkt).map(WsMessage::from)
 }
\ No newline at end of file
diff --git a/protocol/Cargo.toml b/protocol/Cargo.toml
index 8e1771d..bb9fd77 100644
--- a/protocol/Cargo.toml
+++ b/protocol/Cargo.toml
@@ -7,7 +7,4 @@ edition = "2021"
 
 [dependencies]
 serde = { version = "1", features = ["derive"] }
-rmp-serde = "1.1"
-tungstenite = { version = "0.18", default-features = false }
-tokio-tungstenite = { version = "0.18" }
-futures = "0.3"
\ No newline at end of file
+rmp-serde = "1.1"
\ No newline at end of file
diff --git a/protocol/src/lib.rs b/protocol/src/lib.rs
index 34b3766..0fe3fcd 100644
--- a/protocol/src/lib.rs
+++ b/protocol/src/lib.rs
@@ -1,7 +1,4 @@
-use serde::{Serialize, Deserialize};
-
-#[macro_use]
-pub mod macros;
+use serde::{Deserialize, Serialize};
 
 pub const PROTOCOL_VERSION: u32 = 1;
 
diff --git a/server/src/client_handler.rs b/server/src/client_handler.rs
index 51093fe..095982e 100644
--- a/server/src/client_handler.rs
+++ b/server/src/client_handler.rs
@@ -7,8 +7,9 @@ use log::{error, info};
 use tokio::sync::mpsc::Receiver;
 use tokio_tungstenite::WebSocketStream;
 use tungstenite::Message;
-use protocol::{GoodbyeReason, MessageC2S, MessageS2C, PROTOCOL_VERSION, ps2c, recv, send, State};
+use protocol::{GoodbyeReason, MessageC2S, MessageS2C, PROTOCOL_VERSION, ps2c, State};
 use crate::handler::{ClientHandlerMessage, ClientManager};
+use crate::{send, recv};
 
 pub async fn handle_client(mgr: ClientManager, remote_addr: SocketAddr, mut rx: Receiver<ClientHandlerMessage>, mut client_tx: SplitSink<WebSocketStream<Upgraded>, Message>, mut client_rx: SplitStream<WebSocketStream<Upgraded>>) -> Result<(), Box<dyn Error>> {
     let mut state = State::Handshake;
@@ -23,6 +24,8 @@ pub async fn handle_client(mgr: ClientManager, remote_addr: SocketAddr, mut rx:
             break;
         }
 
+        info!("here");
+
         if let Some(pkt) = recv!(client_rx)? {
             match state {
                 State::Handshake => {
diff --git a/server/src/handler.rs b/server/src/handler.rs
index 92b0418..a90a3c9 100644
--- a/server/src/handler.rs
+++ b/server/src/handler.rs
@@ -10,7 +10,7 @@ use protocol::State;
 
 #[derive(Clone)]
 pub struct ClientManager {
-    pub clients: Arc<RwLock<HashMap<SocketAddr, ClientHandler>>>,
+    pub handlers: Arc<RwLock<HashMap<SocketAddr, ClientHandler>>>,
     pub usernames: Arc<RwLock<HashMap<SocketAddr, String>>>
 }
 
diff --git a/server/src/macros.rs b/server/src/macros.rs
new file mode 100644
index 0000000..9f25629
--- /dev/null
+++ b/server/src/macros.rs
@@ -0,0 +1,86 @@
+use std::error::Error;
+use std::io;
+use futures::{AsyncRead, AsyncWrite, FutureExt, Stream, StreamExt};
+use futures::stream::SplitStream;
+use serde::{Deserialize, Serialize};
+use tokio_tungstenite::WebSocketStream;
+use tungstenite::Message;
+
+#[macro_export]
+macro_rules! send {
+    ($writer:expr,$pkt:expr) => {
+        $writer.send($crate::macros::__generic_packet_to_message($pkt).unwrap())
+    };
+}
+
+#[macro_export]
+macro_rules! recv {
+    ($reader:expr) => {
+        {
+            if let Some(future_result) = $reader.next().now_or_never() {
+                if let Some(msg) = future_result {
+                    match msg {
+                        Ok(msg) => {
+                            if msg.is_binary() {
+                                match rmp_serde::from_slice(&msg.into_data()) {
+                                    Ok(d) => Ok(Some(d)),
+                                    Err(e) => {
+                                        log::error!("error deserializing message: {}", e);
+                                        Ok(None)
+                                    }
+                                }
+                            } else {
+                                Ok(None)
+                            }
+                        },
+                        Err(e) => {
+                            log::error!("error receiving message: {}", e);
+                            Ok(None)
+                        }
+                    }
+                } else {
+                    log::error!("pipe closed");
+                    Err("Pipe closed")
+                }
+            } else {
+                Ok(None)
+            }
+        }
+    }
+}
+
+#[macro_export]
+macro_rules! recv_now {
+    ($reader:expr) => {
+        {
+            if let Some(msg) = $reader.next().await {
+                match msg {
+                    Ok(msg) => {
+                        if msg.is_binary() {
+                            match rmp_serde::from_slice(&msg.into_data()) {
+                                Ok(d) => Ok(Some(d)),
+                                Err(e) => {
+                                    log::error!("error deserializing message: {}", e);
+                                    Ok(None)
+                                }
+                            }
+                        } else {
+                            Ok(None)
+                        }
+                    },
+                    Err(e) => {
+                        log::error!("error receiving message: {}", e);
+                        Ok(None)
+                    }
+                }
+            } else {
+                log::error!("pipe closed");
+                Err("Pipe closed")
+            }
+        }
+    };
+}
+
+pub fn __generic_packet_to_message<T: Serialize>(pkt: &T) -> Result<Message, rmp_serde::encode::Error> {
+    rmp_serde::to_vec(&pkt).map(Message::from)
+}
\ No newline at end of file
diff --git a/server/src/main.rs b/server/src/main.rs
index d5cc3e1..8915f6a 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -1,10 +1,10 @@
 use std::convert::Infallible;
 use std::net::SocketAddr;
 use std::sync::Arc;
-use hyper::{header, upgrade, StatusCode, Body, Request, Response, Server, server::conn::AddrStream};
+use hyper::{Body, header, Request, Response, Server, server::conn::AddrStream, StatusCode, upgrade};
 use hyper::service::{make_service_fn, service_fn};
 use tokio_tungstenite::WebSocketStream;
-use tungstenite::{handshake, Error};
+use tungstenite::{Error, handshake};
 use futures::stream::StreamExt;
 use lazy_static::lazy_static;
 use log::{error, info};
@@ -12,15 +12,19 @@ use tokio::sync::RwLock;
 use protocol::State;
 use crate::handler::{ClientHandler, ClientManager};
 use crate::client_handler::handle_client;
+use crate::timer::timer_main;
 
 pub mod client_handler;
 pub mod handler;
 pub mod timer;
+#[macro_use]
+pub mod macros;
 
 async fn handle_request(mut request: Request<Body>, remote_addr: SocketAddr, mgr: ClientManager) -> Result<Response<Body>, Infallible> {
     match (request.uri().path(), request.headers().contains_key(header::UPGRADE)) {
         //if the request is ws_echo and the request headers contains an Upgrade key
         ("/ws", true) => {
+            info!("received connection from {}", remote_addr);
             //assume request is a handshake, so create the handshake response
             let response =
                 match handshake::server::create_response_with_body(&request, || Body::empty()) {
@@ -32,7 +36,7 @@ async fn handle_request(mut request: Request<Body>, remote_addr: SocketAddr, mgr
                             match upgrade::on(&mut request).await {
                                 //if successfully upgraded
                                 Ok(upgraded) => {
-
+                                    info!("[{}] connection upgraded", remote_addr);
                                     //create a websocket stream from the upgraded object
                                     let ws_stream = WebSocketStream::from_raw_socket(
                                         //pass the upgraded object
@@ -53,9 +57,11 @@ async fn handle_request(mut request: Request<Body>, remote_addr: SocketAddr, mgr
 
                                     // Acquire the write lock in a small scope, so it's dropped as quickly as possible
                                     {
-                                        mgr.clients.write().await.insert(remote_addr, client);
+                                        mgr.handlers.write().await.insert(remote_addr, client);
                                     }
 
+                                    info!("[{}] passing to client handler", remote_addr);
+
                                     //forward the stream to the sink to achieve echo
                                     match handle_client(mgr.clone(), remote_addr, rx, ws_write, ws_read).await {
                                         Ok(_) => {},
@@ -64,7 +70,7 @@ async fn handle_request(mut request: Request<Body>, remote_addr: SocketAddr, mgr
 
                                     // clean up values left over
                                     {
-                                        mgr.clients.write().await.remove(&remote_addr);
+                                        mgr.handlers.write().await.remove(&remote_addr);
                                         mgr.usernames.write().await.remove(&remote_addr);
                                     }
                                 },
@@ -104,7 +110,7 @@ async fn handle_request(mut request: Request<Body>, remote_addr: SocketAddr, mgr
 
 lazy_static! {
     static ref cmgr: ClientManager = ClientManager {
-        clients: Arc::new(RwLock::new(Default::default())),
+        handlers: Arc::new(RwLock::new(Default::default())),
         usernames: Arc::new(RwLock::new(Default::default())),
     };
 }
@@ -129,11 +135,14 @@ async fn main() {
         }
     });
 
-
+    let mgr_timer = cmgr.clone();
+    let timer_thread = tokio::spawn(async move {
+        timer_main(mgr_timer).await;
+    });
 
     let server = Server::bind(&addr).serve(make_svc);
 
     if let Err(e) = server.await {
         error!("error in server thread: {}", e);
     }
-}
\ No newline at end of file
+}
diff --git a/server/src/timer.rs b/server/src/timer.rs
index 301313b..8e45af9 100644
--- a/server/src/timer.rs
+++ b/server/src/timer.rs
@@ -1,6 +1,6 @@
 use std::error::Error;
 use std::time::Duration;
-use log::error;
+use log::{error, trace};
 use tokio::sync::mpsc::Receiver;
 use tokio::time::sleep;
 use crate::handler::{ClientHandlerMessage, ClientManager};
@@ -9,7 +9,7 @@ pub async fn timer_main(mgr: ClientManager) {
     loop {
         sleep(Duration::from_millis(5)).await;
 
-        for (addr, client_thread) in mgr.clients.read().await.iter() {
+        for (addr, client_thread) in mgr.handlers.read().await.iter() {
             match client_thread.tx.send(ClientHandlerMessage::Tick).await {
                 Ok(_) => (),
                 Err(e) => {
diff --git a/web/play.html b/web/play.html
index 052e3cf..b3a44c9 100644
--- a/web/play.html
+++ b/web/play.html
@@ -22,7 +22,7 @@
             //                                     v
             import init, { rust_init, send_chat } from "./dist/client.js";
             init().then(() => {
-                rust_init();
+                rust_init("ws://localhost:3000/ws", "core");
 
                 document.getElementById("chat-submit").addEventListener("click", e => {
                     send_chat(document.getElementById("chat-value").value);