use std::error::Error; use std::{fs, thread}; use std::net::SocketAddr; use std::sync::Arc; use std::time::SystemTime; use async_tungstenite::tokio::{connect_async, ConnectStream}; use async_tungstenite::tungstenite::Message; use async_tungstenite::WebSocketStream; use azalea_client::{Account, Client, DefaultPlugins, Event, start_ecs}; use azalea_client::packet_handling::DeathEvent; use azalea_client::respawn::{perform_respawn, PerformRespawnEvent}; use azalea_protocol::packets::game::ClientboundGamePacket; use azalea_protocol::packets::game::serverbound_client_information_packet::{ChatVisibility, ServerboundClientInformationPacket}; use azalea_protocol::ServerAddress; use bevy_app::{App, Plugin, Update}; use bevy_ecs::event::{EventReader, EventWriter}; use bevy_ecs::prelude::IntoSystemConfigs; use chrono::DateTime; use futures::{SinkExt, Stream, StreamExt}; use log::{debug, error, info, Level}; use regex::Regex; use tokio::select; use tokio::sync::broadcast::channel; use tokio::sync::mpsc; use tokio::sync::mpsc::Sender; use tokio_threadpool::ThreadPool; use url::Url; use common::message::{GatewayChatMessage, GatewayChatSource, GatewayPacketC2S, GatewayPacketS2C}; use common::status::{DATA_API_VERSION, Status}; use crate::bacon::handle_message; use crate::config::Config; pub mod bacon; pub mod config; #[tokio::main] async fn main() -> Result<(), Box<dyn Error>> { simple_logger::init_with_level(Level::Info).unwrap(); info!("Loading config"); let mut args = std::env::args(); if args.len() != 2 { eprintln!("usage: ./azalea-worker <config>"); std::process::exit(1); } let file = args.nth(1).unwrap(); let config_str = fs::read_to_string(&file)?; let config: Config = toml::from_str(&config_str)?; info!("Config loaded from {}", file); info!("Loading status from the API ({})...", config.api_status_url); let status: Status = reqwest::get(config.api_status_url.clone()).await?.json().await?; debug!("{:?}", status); if status.data_api_version != DATA_API_VERSION { error!("Data API is incompatible. This version of the websocket worker was compiled with Data API v{}, but your Data API is Data API v{}", DATA_API_VERSION, status.data_api_version); std::process::exit(1); } info!("Connecting to gateway uri {}", status.gateway_url.to_string()); let mut gateway_uri = status.gateway_url; if gateway_uri.scheme() == "http" { gateway_uri.set_scheme("ws").unwrap(); } else if gateway_uri.scheme() == "https" { gateway_uri.set_scheme("wss").unwrap(); } let (mut ws_stream, _) = connect_async(&gateway_uri).await?; debug!("websocket connection opened - sending Authenticate"); ws_stream.send(Message::text(serde_json::to_string(&GatewayPacketC2S::Authenticate { token: config.token.clone(), request_write_perms: true, // we *need* write perms for the relay })?)).await?; debug!("starting handshake loop (waiting for AuthenticationAccepted)"); loop { let msg = ws_stream.next().await; match msg { Some(msg) => match msg? { Message::Text(packet) => { debug!("recv gateway packet {}", packet); let packet: GatewayPacketS2C = serde_json::from_str(&packet)?; match packet { GatewayPacketS2C::AuthenticationAccepted { write_perms } => { info!("Gateway accepted our token."); if !write_perms { error!("Gateway server did not grant us write perms to the chat stream socket."); error!("This is REQUIRED for azalea-worker to function correctly."); error!("Check to ensure your token has `allow-write = true` set in the API config file."); error!("AuthenticationAccepted with write_perms:false"); std::process::exit(1); } break; } GatewayPacketS2C::Disconnect { reason } => { error!("Disconnected by server: {:?}", reason); std::process::exit(1); } GatewayPacketS2C::Relayed { .. } => {} // ignore messages for now } }, msg => { debug!("unknown message {:?}", msg); } }, None => { debug!("didn't receive anything"); continue; } } } info!("Connected to gateway, starting the minecraft client"); info!("Connecting to the Minecraft server..."); let account = Account::microsoft(&config.server.microsoft_email).await?; let (run_schedule_sender, run_schedule_receiver) = mpsc::unbounded_channel(); let mut app = App::new(); app.add_plugins((DefaultPlugins, AutoRespawnPlugin)); let ecs_lock = start_ecs(app, run_schedule_receiver, run_schedule_sender.clone()); let (mut client, mut rx) = Client::start_client( ecs_lock, &account, &ServerAddress::from(SocketAddr::new(config.server.ip.into(), config.server.port)), &SocketAddr::new(config.server.ip.into(), config.server.port), run_schedule_sender ).await?; client.chat("Hello!"); client.set_client_information(ServerboundClientInformationPacket { language: "en_US".to_string(), view_distance: 16, chat_visibility: ChatVisibility::Full, chat_colors: true, model_customization: Default::default(), main_hand: Default::default(), text_filtering_enabled: false, allows_listing: true, }).await?; loop { select! { msg = ws_stream.next() => handle_ws_message(msg, &mut client).await?, e = rx.recv() => { if let Some(e) = e { handle_packet(e, &config, &mut client, &mut ws_stream).await? } } } } /* for msg in client.incoming_messages() { match msg { Ok(msg) => { match msg { OwnedMessage::Text(txt) => { // decode debug!("{}", txt); let packet: GatewayPacketS2C = serde_json::from_str(&txt)?; match packet { GatewayPacketS2C::AuthenticationAccepted { .. } => { info!("auth accepted by server"); } GatewayPacketS2C::Disconnect { reason } => { info!("disconnected by server: {:?}", reason); return Err("Disconnected by server")?; } GatewayPacketS2C::Relayed { msg } => { info!("msg: {:?}", msg); } } } OwnedMessage::Close(_) => { info!("closing connection"); return Ok(()); } _ => { debug!("ignoring unknown type"); } } } Err(e) => { error!("rx error: {}", e); return Err(e.into()); } } } */ Ok(()) } #[derive(Clone, Default)] pub struct AutoRespawnPlugin; impl Plugin for AutoRespawnPlugin { fn build(&self, app: &mut App) { app.add_systems(Update, auto_respawn.before(perform_respawn)); } } fn auto_respawn( mut events: EventReader<DeathEvent>, mut perform_respawn_events: EventWriter<PerformRespawnEvent>, ) { for event in events.iter() { perform_respawn_events.send(PerformRespawnEvent { entity: event.entity, }); } } async fn handle_ws_message(msg: Option<Result<Message, async_tungstenite::tungstenite::Error>>, client: &mut Client) -> Result<(), Box<dyn Error>> { match msg { Some(msg) => match msg? { Message::Text(packet) => { debug!("recv gateway packet {}", packet); let packet: GatewayPacketS2C = serde_json::from_str(&packet)?; match packet { GatewayPacketS2C::AuthenticationAccepted { .. } => { Ok(()) }, GatewayPacketS2C::Disconnect { reason } => { error!("Disconnected by server: {:?}", reason); std::process::exit(1); } GatewayPacketS2C::Relayed { msg } => { if matches!(msg.source, GatewayChatSource::Minecraft) { return Ok(()); } client.chat(&format!("{}: {}", msg.username, msg.message)); Ok(()) } } }, msg => { debug!("unknown message {:?}", msg); Ok(()) } }, None => { debug!("didn't receive anything"); Ok(()) } } } async fn handle_packet(e: Event, config: &Config, client: &mut Client, ws_stream: &mut WebSocketStream<ConnectStream>) -> Result<(), Box<dyn Error>> { match e { Event::Init => {} Event::Login => {} Event::Chat(pkt) => { if pkt.content().starts_with("/skill") { return Ok(()); } info!("<{} ({})> {}", pkt.username().unwrap_or("SYSTEM".to_string()), pkt.uuid().map(|u| u.to_string()).unwrap_or("SYSTEM".to_string()), pkt.message()); let re = Regex::new(r"@[a-z0-9_-]+:").unwrap(); if !(re.is_match(&pkt.content()) && pkt.uuid() == Some(config.server.uuid)) { info!("sending message"); ws_stream.send(Message::Text(serde_json::to_string(&GatewayPacketC2S::Relay { msg: GatewayChatMessage { source: GatewayChatSource::Minecraft, username: pkt.username().unwrap_or("SYSTEM".to_string()), message: pkt.content(), timestamp: DateTime::from(SystemTime::now()), }})?)).await?; } handle_message(pkt.clone(), client, config).await; /* if pkt.content() == "!disconnect" { if let Some(u) = pkt.uuid() { if !config.read().await.permissions.admin.uuid_members.contains(&u) { client.chat("Nice try. You need to be an 'admin' to do that."); continue; } } else { client.chat("Nice try. You need to be an 'admin' to do that."); continue; } client.chat("Okay, disconnecting. Cya!"); client.disconnect(); return Err("Disconnected.".into()); } if pkt.content() == "!reconnect" { if let Some(u) = pkt.uuid() { if !config.read().await.permissions.admin.uuid_members.contains(&u) { client.chat("Nice try. You need to be an 'admin' to do that."); continue; } } else { client.chat("Nice try. You need to be an 'admin' to do that."); continue; } client.chat("Okay, reconnecting. Cya!"); client.disconnect(); break; } if pkt.content() == "!help" { client.chat("Hi, I'm CuberCore! I am owned by CoreCuber/@realcore."); client.chat("I function as the chat relay for this server and also do other useful things."); client.chat("My commands:"); client.chat("!sleep - Sleep in the nearest bed"); client.chat("!disconnect - Disconnect from the server (admin only)"); client.chat("!reconnect - Reconnect to the server (admin only)"); client.chat("!location - List all locations I know about"); client.chat("!location <location> - Get the coords of a location"); } if pkt.content() == "!bot" { client.chat("[iambot]"); } if pkt.content().starts_with("!location") { let content = pkt.content(); let split = content.split(' ').collect::<Vec<_>>(); if split.len() == 1 { client.chat(&format!("[bot response] Locations: {:?}", config.read().await.locations.keys().collect::<Vec<_>>())); } else { let location = split[1]; if let Some(coords) = config.read().await.locations.get(location) { client.chat(&format!("[bot response] Location '{}' is at {}", location, coords)); } else { client.chat(&format!("[bot response] I don't know where '{}' is.", location)); } } } */ } Event::Tick => {} Event::Packet(p) => { match p.as_ref() { ClientboundGamePacket::Disconnect(_) => { info!("Disconnected, restarting"); return Err("Disconnected".into()); } _ => () } } Event::AddPlayer(_) => {} Event::RemovePlayer(_) => {} Event::UpdatePlayer(_) => {} Event::Death(e) => {} Event::KeepAlive(_) => {} }; Ok(()) }