From 9060a0fa0befd06174f514bdcefdaf8797b3beac Mon Sep 17 00:00:00 2001 From: core Date: Sun, 9 Apr 2023 11:25:42 -0400 Subject: [PATCH] hell pt3 --- Cargo.lock | 67 ++++++++++++++++++++++ client/Cargo.toml | 10 +++- client/src/lib.rs | 34 ++++++++++- protocol/Cargo.toml | 5 +- protocol/src/lib.rs | 3 + protocol/src/macros.rs | 54 ++++++++++++++++++ server/src/client_handler.rs | 99 ++++++++++++++++++++++++++++++++ server/src/handler.rs | 3 + server/src/main.rs | 5 +- server/src/wsserver.rs | 106 ----------------------------------- web/index.html | 4 +- web/play.html | 8 ++- 12 files changed, 282 insertions(+), 116 deletions(-) create mode 100644 protocol/src/macros.rs create mode 100644 server/src/client_handler.rs delete mode 100644 server/src/wsserver.rs diff --git a/Cargo.lock b/Cargo.lock index cda70f7..22ce3cd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -62,8 +62,16 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" name = "client" version = "0.1.0" dependencies = [ + "console_log", + "futures", "js-sys", + "log", + "tokio", + "tokio-tungstenite", + "tungstenite", + "url", "wasm-bindgen", + "wasm-bindgen-futures", "web-sys", ] @@ -78,6 +86,17 @@ dependencies = [ "winapi", ] +[[package]] +name = "console_log" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be8aed40e4edbf4d3b4431ab260b63fdc40f5780a4766824329ea0f1eefe3c0f" +dependencies = [ + "log", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "cpufeatures" version = "0.2.6" @@ -130,6 +149,7 @@ checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40" dependencies = [ "futures-channel", "futures-core", + "futures-executor", "futures-io", "futures-sink", "futures-task", @@ -152,12 +172,34 @@ version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c" +[[package]] +name = "futures-executor" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + [[package]] name = "futures-io" version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" +[[package]] +name = "futures-macro" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.13", +] + [[package]] name = "futures-sink" version = "0.3.28" @@ -176,9 +218,13 @@ version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533" dependencies = [ + "futures-channel", "futures-core", + "futures-io", + "futures-macro", "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", "slab", @@ -362,6 +408,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "memchr" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" + [[package]] name = "mio" version = "0.8.6" @@ -451,8 +503,11 @@ dependencies = [ name = "protocol" version = "0.1.0" dependencies = [ + "futures", "rmp-serde", "serde", + "tokio-tungstenite", + "tungstenite", ] [[package]] @@ -878,6 +933,18 @@ dependencies = [ "wasm-bindgen-shared", ] +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f219e0d211ba40266969f6dbdd90636da12f75bee4fc9d6c23d1260dadb51454" +dependencies = [ + "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "wasm-bindgen-macro" version = "0.2.84" diff --git a/client/Cargo.toml b/client/Cargo.toml index 54d3e1a..194332d 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -11,4 +11,12 @@ crate-type = ["cdylib"] [dependencies] wasm-bindgen = "0.2" js-sys = "0.3" -web-sys = { version = "0.3", features = ["CanvasRenderingContext2d", "Document", "Element", "HtmlCanvasElement", "Window"]} \ No newline at end of file +web-sys = { version = "0.3", features = ["CanvasRenderingContext2d", "Document", "Element", "HtmlCanvasElement", "Window"]} +console_log = { version = "1", features = ["color"] } +log = "0.4" +tungstenite = { version = "0.18", default-features = false } +tokio-tungstenite = { version = "0.18" } +tokio = { version = "1.27", features = ["macros", "sync", "rt-multi-thread"] } +futures = { version = "0.3", default-features = false } +wasm-bindgen-futures = "0.4" +url = "2.3" \ No newline at end of file diff --git a/client/src/lib.rs b/client/src/lib.rs index 71e5564..7f504a1 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -1,3 +1,7 @@ +use std::error::Error; +use futures::StreamExt; +use log::{debug, error, info, Level}; +use tokio_tungstenite::connect_async; use wasm_bindgen::prelude::*; #[wasm_bindgen] @@ -7,6 +11,34 @@ extern { #[wasm_bindgen] pub fn send_chat(chat: &str) { - println!("sending chat: {}", chat); + info!("sending chat: {}", chat); } +#[wasm_bindgen] +pub async fn rust_init(gateway: &str, username: &str) { + console_log::init_with_level(Level::Debug).unwrap(); + + info!("Logger setup successfully"); + + match init(gateway, username).await { + Ok(_) => (), + Err(e) => { + error!("Error initializing gateway client: {}", e); + return; + } + } + + info!("Gateway client initialized successfully"); +} + +pub async fn init(gateway: &str, username: &str) -> Result<(), Box> { + info!("FAST CONNECT: {}", gateway); + let gateway_url = url::Url::parse(gateway)?; + debug!("Gateway URL parsed"); + let (ws_stream, _) = connect_async(gateway_url).await?; + debug!("Connected to gateway socket"); + let (tx, rx) = ws_stream.split(); + debug!("Split stream, handshaking with server"); + + Ok(()) +} \ No newline at end of file diff --git a/protocol/Cargo.toml b/protocol/Cargo.toml index bb9fd77..8e1771d 100644 --- a/protocol/Cargo.toml +++ b/protocol/Cargo.toml @@ -7,4 +7,7 @@ edition = "2021" [dependencies] serde = { version = "1", features = ["derive"] } -rmp-serde = "1.1" \ No newline at end of file +rmp-serde = "1.1" +tungstenite = { version = "0.18", default-features = false } +tokio-tungstenite = { version = "0.18" } +futures = "0.3" \ No newline at end of file diff --git a/protocol/src/lib.rs b/protocol/src/lib.rs index 6caf186..34b3766 100644 --- a/protocol/src/lib.rs +++ b/protocol/src/lib.rs @@ -1,5 +1,8 @@ use serde::{Serialize, Deserialize}; +#[macro_use] +pub mod macros; + pub const PROTOCOL_VERSION: u32 = 1; #[derive(Serialize, Deserialize, Debug, Clone)] diff --git a/protocol/src/macros.rs b/protocol/src/macros.rs new file mode 100644 index 0000000..cd4f7cb --- /dev/null +++ b/protocol/src/macros.rs @@ -0,0 +1,54 @@ +use std::error::Error; +use std::io; +use futures::{AsyncRead, AsyncWrite, FutureExt, Stream, StreamExt}; +use futures::stream::SplitStream; +use serde::{Deserialize, Serialize}; +use tokio_tungstenite::WebSocketStream; +use tungstenite::Message; + +#[macro_export] +macro_rules! send { + ($writer:expr,$pkt:expr) => { + $writer.send($crate::macros::__generic_packet_to_message($pkt).unwrap()) + }; +} + +#[macro_export] +macro_rules! recv { + ($reader:expr) => { + { + if let Some(future_result) = $reader.next().now_or_never() { + if let Some(msg) = future_result { + match msg { + Ok(msg) => { + if msg.is_binary() { + match rmp_serde::from_slice(&msg.into_data()) { + Ok(d) => Ok(Some(d)), + Err(e) => { + log::error!("error deserializing message: {}", e); + Ok(None) + } + } + } else { + Ok(None) + } + }, + Err(e) => { + log::error!("error receiving message: {}", e); + Ok(None) + } + } + } else { + log::error!("pipe closed"); + Err("Pipe closed") + } + } else { + Ok(None) + } + } + } +} + +pub fn __generic_packet_to_message(pkt: &T) -> Result { + rmp_serde::to_vec(&pkt).map(Message::from) +} \ No newline at end of file diff --git a/server/src/client_handler.rs b/server/src/client_handler.rs new file mode 100644 index 0000000..51093fe --- /dev/null +++ b/server/src/client_handler.rs @@ -0,0 +1,99 @@ +use std::error::Error; +use std::net::SocketAddr; +use futures::stream::{SplitSink, SplitStream}; +use futures::{FutureExt, SinkExt, StreamExt}; +use hyper::upgrade::Upgraded; +use log::{error, info}; +use tokio::sync::mpsc::Receiver; +use tokio_tungstenite::WebSocketStream; +use tungstenite::Message; +use protocol::{GoodbyeReason, MessageC2S, MessageS2C, PROTOCOL_VERSION, ps2c, recv, send, State}; +use crate::handler::{ClientHandlerMessage, ClientManager}; + +pub async fn handle_client(mgr: ClientManager, remote_addr: SocketAddr, mut rx: Receiver, mut client_tx: SplitSink, Message>, mut client_rx: SplitStream>) -> Result<(), Box> { + let mut state = State::Handshake; + + loop { + if let Some(msg) = rx.recv().await { + match msg { + ClientHandlerMessage::Tick => {} // this intentionally does nothing + } + } else { + info!("channel closed, shutting down"); + break; + } + + if let Some(pkt) = recv!(client_rx)? { + match state { + State::Handshake => { + match pkt { + MessageC2S::Hello { version, requested_username, next_state } => { + if !matches!(next_state, State::Play) { + error!("client sent unexpected state {:?} (expected: Play)", next_state); + send!(client_tx, &MessageS2C::Goodbye { + reason: GoodbyeReason::UnexpectedNextState, + }).await?; + break; + } + + // check version + if version != PROTOCOL_VERSION { + error!("client sent incompatible version {} (expected: {})", version, PROTOCOL_VERSION); + send!(client_tx, &MessageS2C::Goodbye { + reason: GoodbyeReason::UnsupportedProtocol { + supported: PROTOCOL_VERSION, + got: version, + }, + }).await?; + break; + } + + // determine if we can give them that username + { + if mgr.usernames.read().await.values().any(|u| *u == requested_username) { + error!("client requested username {} but it is in use", requested_username); + send!(client_tx, &MessageS2C::Goodbye { + reason: GoodbyeReason::UsernameTaken, + }).await?; + break; + } + } + + // username is fine + { + mgr.usernames.write().await.insert(remote_addr, requested_username.clone()); + } + + send!(client_tx, &MessageS2C::Hello { + version, + given_username: requested_username, + next_state, + }).await?; + }, + MessageC2S::Goodbye { reason } => { + info!("client sent goodbye: {:?}", reason); + break; + } + } + } + State::Play => { + match pkt { + MessageC2S::Hello { .. } => { + error!("client sent unexpected packet {:?} for state {:?}", pkt, state); + send!(client_tx, &MessageS2C::Goodbye { + reason: GoodbyeReason::UnexpectedPacket, + }).await?; + break; + } + MessageC2S::Goodbye { reason } => { + info!("client sent goodbye: {:?}", reason); + break; + } + } + } + } + } + } + + Ok(()) +} \ No newline at end of file diff --git a/server/src/handler.rs b/server/src/handler.rs index 7fd75df..92b0418 100644 --- a/server/src/handler.rs +++ b/server/src/handler.rs @@ -1,8 +1,11 @@ use std::collections::HashMap; +use std::error::Error; use std::net::SocketAddr; use std::sync::Arc; +use serde::Serialize; use tokio::sync::mpsc::Sender; use tokio::sync::RwLock; +use tungstenite::Message; use protocol::State; #[derive(Clone)] diff --git a/server/src/main.rs b/server/src/main.rs index e0b73dc..d5cc3e1 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -11,9 +11,9 @@ use log::{error, info}; use tokio::sync::RwLock; use protocol::State; use crate::handler::{ClientHandler, ClientManager}; -use crate::wsserver::handle_client; +use crate::client_handler::handle_client; -pub mod wsserver; +pub mod client_handler; pub mod handler; pub mod timer; @@ -32,6 +32,7 @@ async fn handle_request(mut request: Request, remote_addr: SocketAddr, mgr match upgrade::on(&mut request).await { //if successfully upgraded Ok(upgraded) => { + //create a websocket stream from the upgraded object let ws_stream = WebSocketStream::from_raw_socket( //pass the upgraded object diff --git a/server/src/wsserver.rs b/server/src/wsserver.rs deleted file mode 100644 index 2985df7..0000000 --- a/server/src/wsserver.rs +++ /dev/null @@ -1,106 +0,0 @@ -use std::error::Error; -use std::net::SocketAddr; -use futures::stream::{SplitSink, SplitStream}; -use futures::{SinkExt, StreamExt}; -use hyper::upgrade::Upgraded; -use log::{error, info}; -use tokio::sync::mpsc::Receiver; -use tokio_tungstenite::WebSocketStream; -use tungstenite::Message; -use protocol::{GoodbyeReason, MessageC2S, MessageS2C, PROTOCOL_VERSION, ps2c, State}; -use crate::handler::{ClientHandler, ClientHandlerMessage, ClientManager}; - -pub async fn handle_client(mgr: ClientManager, remote_addr: SocketAddr, mut rx: Receiver, mut write: SplitSink, Message>, mut read: SplitStream>) -> Result<(), Box> { - let mut state = State::Handshake; - - loop { - if let Some(msg) = rx.recv().await { - match msg { - ClientHandlerMessage::Tick => {} // this intentionally does nothing - } - } else { - info!("channel closed, shutting down"); - break; - } - - if let Some(msg) = read.next().await { - let msg = msg?; - - if msg.is_binary() { - // try to deserialize the msg - let pkt: MessageC2S = rmp_serde::from_slice(&msg.into_data())?; - - match state { - State::Handshake => { - match pkt { - MessageC2S::Hello { version, requested_username, next_state } => { - if !matches!(next_state, State::Play) { - error!("client sent unexpected state {:?} (expected: Play)", next_state); - write.send(Message::from(ps2c(&MessageS2C::Goodbye { - reason: GoodbyeReason::UnexpectedNextState, - }))).await?; - break; - } - - // check version - if version != PROTOCOL_VERSION { - error!("client sent incompatible version {} (expected: {})", version, PROTOCOL_VERSION); - write.send(Message::from(ps2c(&MessageS2C::Goodbye { - reason: GoodbyeReason::UnsupportedProtocol { - supported: PROTOCOL_VERSION, - got: version, - }, - }))).await?; - break; - } - - // determine if we can give them that username - { - if mgr.usernames.read().await.values().into_iter().any(|u| *u == requested_username) { - error!("client requested username {} but it is in use", requested_username); - write.send(Message::from(ps2c(&MessageS2C::Goodbye { - reason: GoodbyeReason::UsernameTaken, - }))).await?; - break; - } - } - - // username is fine - { - mgr.usernames.write().await.insert(remote_addr, requested_username.clone()); - } - - write.send(Message::from(ps2c(&MessageS2C::Hello { - version, - given_username: requested_username, - next_state, - }))).await?; - }, - MessageC2S::Goodbye { reason } => { - info!("client sent goodbye: {:?}", reason); - break; - } - } - } - State::Play => { - match pkt { - MessageC2S::Hello { .. } => { - error!("client sent unexpected packet {:?} for state {:?}", pkt, state); - write.send(Message::from(ps2c(&MessageS2C::Goodbye { - reason: GoodbyeReason::UnexpectedPacket, - }))).await?; - break; - } - MessageC2S::Goodbye { reason } => { - info!("client sent goodbye: {:?}", reason); - break; - } - } - } - } - } - } - } - - Ok(()) -} \ No newline at end of file diff --git a/web/index.html b/web/index.html index 65e5e57..c2aebc2 100644 --- a/web/index.html +++ b/web/index.html @@ -9,10 +9,10 @@ diff --git a/web/play.html b/web/play.html index b84c57e..052e3cf 100644 --- a/web/play.html +++ b/web/play.html @@ -13,17 +13,19 @@

hello: blsdkjf

- +