hell is over!

This commit is contained in:
core 2023-04-09 13:40:44 -04:00
parent dda1b9e50e
commit 5c97ea362e
Signed by: core
GPG Key ID: FDBF740DADDCEECF
11 changed files with 157 additions and 50 deletions

1
Cargo.lock generated
View File

@ -76,6 +76,7 @@ dependencies = [
"console_log", "console_log",
"futures", "futures",
"js-sys", "js-sys",
"lazy_static",
"log", "log",
"protocol", "protocol",
"rmp-serde", "rmp-serde",

View File

@ -21,3 +21,4 @@ protocol = { version = "0.1.0", path = "../protocol" }
rmp-serde = "1.1" rmp-serde = "1.1"
ws_stream_wasm = "0.7" ws_stream_wasm = "0.7"
serde = { version = "1", features = ["derive"] } serde = { version = "1", features = ["derive"] }
lazy_static = "1.4"

19
client/src/chat.rs Normal file
View File

@ -0,0 +1,19 @@
use wasm_bindgen::prelude::*;
use protocol::MessageC2S;
use crate::CLIENT;
use futures::SinkExt;
#[wasm_bindgen]
pub async fn send_chat(message: &str) -> Result<(), JsError> {
let client_data = &mut CLIENT.write()?.client_data;
if let Some(data) = client_data {
send!(data.tx, &MessageC2S::Chat {
message: message.to_string()
}).await?;
} else {
return Err(JsError::new("Client not yet connected to server"));
}
Ok(())
}

View File

@ -1,7 +1,7 @@
use std::error::Error; use std::error::Error;
use futures::stream::{SplitSink, SplitStream}; use futures::stream::{SplitSink, SplitStream};
use futures::StreamExt; use futures::StreamExt;
use log::{debug, error, info, Level, trace}; use log::{debug, error, info, Level, trace, warn};
use wasm_bindgen::prelude::*; use wasm_bindgen::prelude::*;
use ws_stream_wasm::{WsMessage, WsMeta, WsStream}; use ws_stream_wasm::{WsMessage, WsMeta, WsStream};
use protocol::State; use protocol::State;
@ -9,20 +9,20 @@ use protocol::PROTOCOL_VERSION;
use protocol::MessageS2C; use protocol::MessageS2C;
use protocol::MessageC2S; use protocol::MessageC2S;
use futures::SinkExt; use futures::SinkExt;
use lazy_static::lazy_static;
use std::sync::Arc;
use std::sync::RwLock;
use futures::FutureExt;
#[macro_use] #[macro_use]
pub mod macros; pub mod macros;
pub mod chat;
#[wasm_bindgen] #[wasm_bindgen]
extern { extern {
pub fn alert(s: &str); pub fn alert(s: &str);
} }
#[wasm_bindgen]
pub fn send_chat(chat: &str) {
info!("sending chat: {}", chat);
}
#[wasm_bindgen] #[wasm_bindgen]
pub async fn rust_init(gateway: &str, username: &str) -> Result<(), JsError> { pub async fn rust_init(gateway: &str, username: &str) -> Result<(), JsError> {
console_log::init_with_level(Level::Debug).unwrap(); console_log::init_with_level(Level::Debug).unwrap();
@ -37,17 +37,27 @@ pub async fn rust_init(gateway: &str, username: &str) -> Result<(), JsError> {
} }
}; };
info!("Gateway client exited"); info!("Gateway client set up successfully");
Ok(()) Ok(())
} }
pub struct Client { pub struct Client {
pub client_data: Option<ClientData>,
}
pub struct ClientData {
pub state: State, pub state: State,
pub tx: SplitSink<WsStream, WsMessage>, pub tx: SplitSink<WsStream, WsMessage>,
pub rx: SplitStream<WsStream> pub rx: SplitStream<WsStream>
} }
lazy_static! {
pub static ref CLIENT: Arc<RwLock<Client>> = Arc::new(RwLock::new(Client {
client_data: None
}));
}
pub async fn main(gateway: &str, username: &str) -> Result<(), Box<dyn Error>> { pub async fn main(gateway: &str, username: &str) -> Result<(), Box<dyn Error>> {
info!("FAST CONNECT: {}", gateway); info!("FAST CONNECT: {}", gateway);
let gateway_url = url::Url::parse(gateway)?; let gateway_url = url::Url::parse(gateway)?;
@ -56,7 +66,7 @@ pub async fn main(gateway: &str, username: &str) -> Result<(), Box<dyn Error>> {
trace!("Connected to gateway socket"); trace!("Connected to gateway socket");
let (tx, rx) = ws_stream.split(); let (tx, rx) = ws_stream.split();
let mut client = Client { let mut client_data = ClientData {
state: State::Handshake, state: State::Handshake,
tx, tx,
rx rx
@ -64,7 +74,7 @@ pub async fn main(gateway: &str, username: &str) -> Result<(), Box<dyn Error>> {
trace!("Split stream, handshaking with server"); trace!("Split stream, handshaking with server");
send!(client.tx, &MessageC2S::Hello { send!(client_data.tx, &MessageC2S::Hello {
next_state: State::Play, next_state: State::Play,
version: PROTOCOL_VERSION, version: PROTOCOL_VERSION,
requested_username: username.to_string() requested_username: username.to_string()
@ -72,22 +82,64 @@ pub async fn main(gateway: &str, username: &str) -> Result<(), Box<dyn Error>> {
trace!("Sent handshake start packet"); trace!("Sent handshake start packet");
if let Some(msg) = recv_now!(client.rx)? { if let Some(msg) = recv_now!(client_data.rx)? {
let typed_msg: MessageS2C = msg; let typed_msg: MessageS2C = msg;
match typed_msg { match typed_msg {
MessageS2C::Hello { version, given_username, next_state } => { MessageS2C::Hello { version, given_username, next_state } => {
info!("FAST CONNECT - connected to server protocol {} given username {}, switching to state {:?}", version, given_username, next_state); info!("FAST CONNECT - connected to server protocol {} given username {}, switching to state {:?}", version, given_username, next_state);
client.state = next_state; client_data.state = next_state;
}, },
MessageS2C::Goodbye { reason } => { MessageS2C::Goodbye { reason } => {
error!("server disconnected before finishing handshake: {:?}", reason); error!("server disconnected before finishing handshake: {:?}", reason);
return Err(format!("disconnected by server: {:?}", reason).into()); return Err(format!("disconnected by server: {:?}", reason).into());
},
_ => {
warn!("received unexpected packet from server: {:?}", typed_msg);
} }
} }
} else { } else {
error!("Server closed the connection") error!("Server closed the connection")
} }
CLIENT.write()?.client_data = Some(client_data);
Ok(())
}
#[wasm_bindgen]
pub async fn update_socket() -> Result<(), JsError> {
let mut client = CLIENT.write()?;
if client.client_data.is_none() {
return Err(JsError::new("Client not yet initialized"));
}
let client_data = client.client_data.as_mut().unwrap();
let maybe_msg: Option<MessageS2C> = match recv!(client_data.rx) {
Ok(r) => r,
Err(e) => {
return Err(JsError::new(e))
}
};
if let Some(msg) = maybe_msg {
match msg {
MessageS2C::Goodbye { reason } => {
info!("server sent disconnect: {:?}", reason);
client.client_data = None;
return Err(JsError::new("disconnected by server"));
}
MessageS2C::Chat { from, message } => {
info!("[CHAT] {}: {}", from, message);
// TODO: Handle
},
_ => {
warn!("server sent unexpected packet {:?}, ignoring", msg);
}
}
}
Ok(()) Ok(())
} }

View File

@ -18,10 +18,8 @@ macro_rules! recv {
{ {
if let Some(future_result) = $reader.next().now_or_never() { if let Some(future_result) = $reader.next().now_or_never() {
if let Some(msg) = future_result { if let Some(msg) = future_result {
match msg { if let WsMessage::Binary(msg) = msg {
Ok(msg) => { match rmp_serde::from_slice(&msg) {
if msg.is_binary() {
match rmp_serde::from_slice(&msg.into_data()) {
Ok(d) => Ok(Some(d)), Ok(d) => Ok(Some(d)),
Err(e) => { Err(e) => {
log::error!("error deserializing message: {}", e); log::error!("error deserializing message: {}", e);
@ -31,12 +29,6 @@ macro_rules! recv {
} else { } else {
Ok(None) Ok(None)
} }
},
Err(e) => {
log::error!("error receiving message: {}", e);
Ok(None)
}
}
} else { } else {
log::error!("pipe closed"); log::error!("pipe closed");
Err("Pipe closed") Err("Pipe closed")

View File

@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize};
pub const PROTOCOL_VERSION: u32 = 1; pub const PROTOCOL_VERSION: u32 = 1;
#[derive(Serialize, Deserialize, Debug, Clone)] #[derive(Serialize, Deserialize, Debug, Clone, Copy)]
pub enum State { pub enum State {
Handshake, Handshake,
Play Play
@ -18,6 +18,10 @@ pub enum MessageC2S {
Goodbye { Goodbye {
reason: GoodbyeReason reason: GoodbyeReason
},
Chat {
message: String
} }
} }
@ -31,6 +35,11 @@ pub enum MessageS2C {
Goodbye { Goodbye {
reason: GoodbyeReason reason: GoodbyeReason
},
Chat {
from: String,
message: String
} }
} }

View File

@ -13,19 +13,24 @@ use crate::{send, recv};
pub async fn handle_client(mgr: ClientManager, remote_addr: SocketAddr, mut rx: Receiver<ClientHandlerMessage>, mut client_tx: SplitSink<WebSocketStream<Upgraded>, Message>, mut client_rx: SplitStream<WebSocketStream<Upgraded>>) -> Result<(), Box<dyn Error>> { pub async fn handle_client(mgr: ClientManager, remote_addr: SocketAddr, mut rx: Receiver<ClientHandlerMessage>, mut client_tx: SplitSink<WebSocketStream<Upgraded>, Message>, mut client_rx: SplitStream<WebSocketStream<Upgraded>>) -> Result<(), Box<dyn Error>> {
let mut state = State::Handshake; let mut state = State::Handshake;
let mut username = String::new();
loop { loop {
if let Some(msg) = rx.recv().await { if let Some(msg) = rx.recv().await {
match msg { match msg {
ClientHandlerMessage::Tick => {} // this intentionally does nothing ClientHandlerMessage::Tick => {} // this intentionally does nothing,
ClientHandlerMessage::ChatMessage { from, message } => {
send!(client_tx, &MessageS2C::Chat {
message,
from
}).await?;
}
} }
} else { } else {
info!("channel closed, shutting down"); info!("channel closed, shutting down");
break; break;
} }
info!("here");
if let Some(pkt) = recv!(client_rx)? { if let Some(pkt) = recv!(client_rx)? {
match state { match state {
State::Handshake => { State::Handshake => {
@ -69,13 +74,22 @@ pub async fn handle_client(mgr: ClientManager, remote_addr: SocketAddr, mut rx:
send!(client_tx, &MessageS2C::Hello { send!(client_tx, &MessageS2C::Hello {
version, version,
given_username: requested_username, given_username: requested_username.clone(),
next_state, next_state,
}).await?; }).await?;
state = next_state;
username = requested_username;
}, },
MessageC2S::Goodbye { reason } => { MessageC2S::Goodbye { reason } => {
info!("client sent goodbye: {:?}", reason); info!("client sent goodbye: {:?}", reason);
break; break;
},
MessageC2S::Chat { .. } => {
error!("client sent unexpected packet {:?} for state {:?}", pkt, state);
send!(client_tx, &MessageS2C::Goodbye {
reason: GoodbyeReason::UnexpectedPacket,
}).await?;
break;
} }
} }
} }
@ -87,10 +101,22 @@ pub async fn handle_client(mgr: ClientManager, remote_addr: SocketAddr, mut rx:
reason: GoodbyeReason::UnexpectedPacket, reason: GoodbyeReason::UnexpectedPacket,
}).await?; }).await?;
break; break;
} },
MessageC2S::Goodbye { reason } => { MessageC2S::Goodbye { reason } => {
info!("client sent goodbye: {:?}", reason); info!("client sent goodbye: {:?}", reason);
break; break;
},
MessageC2S::Chat { message } => {
info!("[{}] CHAT: [{}] {}", remote_addr, username, message);
for (addr, client_thread) in mgr.handlers.read().await.iter() {
match client_thread.tx.send(ClientHandlerMessage::ChatMessage { from: username.clone(), message: message.clone() }).await {
Ok(_) => (),
Err(e) => {
error!("unable to update a client thread: {}", e);
}
}
}
} }
} }
} }

View File

@ -20,5 +20,6 @@ pub struct ClientHandler {
} }
pub enum ClientHandlerMessage { pub enum ClientHandlerMessage {
Tick Tick,
ChatMessage { from: String, message: String }
} }

View File

@ -7,7 +7,7 @@ use tokio_tungstenite::WebSocketStream;
use tungstenite::{Error, handshake}; use tungstenite::{Error, handshake};
use futures::stream::StreamExt; use futures::stream::StreamExt;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use log::{error, info}; use log::{error, info, Level};
use tokio::sync::RwLock; use tokio::sync::RwLock;
use protocol::State; use protocol::State;
use crate::handler::{ClientHandler, ClientManager}; use crate::handler::{ClientHandler, ClientManager};
@ -117,7 +117,7 @@ lazy_static! {
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
simple_logger::init_with_env().expect("Unable to start logging service"); simple_logger::init_with_level(Level::Debug).expect("Unable to start logging service");
let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); let addr = SocketAddr::from(([127, 0, 0, 1], 3000));

View File

@ -6,14 +6,14 @@
</head> </head>
<body> <body>
<script type="module"> <form target="/play.html" method="GET">
// If you're getting build errors here | you need to run `just build_client_bundle` first, to compile client code <label for="server">Gateway server</label>
// v <input type="text" name="server" id="server" value="ws://localhost:3000/ws" required />
import init from "./dist/client.js"; <br>
<label for="username">Username</label>
init().then(() => { <input type="text" name="username" id="username" required />
// wasm-pack code here <br>
}) <button>Launch!</button>
</script> </form>
</body> </body>
</html> </html>

View File

@ -20,13 +20,19 @@
<script type="module"> <script type="module">
// If you're getting build errors here | you need to run `just build_client_bundle` first, to compile client code // If you're getting build errors here | you need to run `just build_client_bundle` first, to compile client code
// v // v
import init, { rust_init, send_chat } from "./dist/client.js"; import init, { rust_init, send_chat, update_socket } from "./dist/client.js";
init().then(() => { init().then(() => {
rust_init("ws://localhost:3000/ws", "core"); const urlSearchParams = new URLSearchParams(window.location.search);
rust_init(urlSearchParams.get("server"), urlSearchParams.get("username")).then(() => {
document.getElementById("chat-submit").addEventListener("click", e => { document.getElementById("chat-submit").addEventListener("click", e => {
send_chat(document.getElementById("chat-value").value); send_chat(document.getElementById("chat-value").value);
}) });
setInterval(() => {
update_socket();
}, 5);
});
}) })
</script> </script>
</body> </body>