From 47e47345cca2f8d7b3eb87968deb0ba9c5015c3f Mon Sep 17 00:00:00 2001 From: Shagnor Date: Fri, 16 Dec 2022 20:48:24 +1100 Subject: [PATCH] Implement echo test --- .gitignore | 2 + Cargo.lock | 30 +++++++++++ blastmud_game/Cargo.toml | 1 + blastmud_game/src/listener.rs | 80 ++++++++++++++++++++-------- blastmud_game/src/main.rs | 6 ++- blastmud_game/src/message_handler.rs | 35 ++++++++++-- blastmud_game/src/version_cutover.rs | 39 ++++++++++++++ blastmud_listener/src/main.rs | 32 +++++------ 8 files changed, 184 insertions(+), 41 deletions(-) create mode 100644 blastmud_game/src/version_cutover.rs diff --git a/.gitignore b/.gitignore index dc734822..61a04d21 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,4 @@ /target config +docs/private + diff --git a/Cargo.lock b/Cargo.lock index cfe491b9..8fb57f76 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -50,6 +50,7 @@ dependencies = [ "deadpool-postgres", "futures", "log", + "nix", "serde", "serde_yaml", "simple_logger", @@ -425,6 +426,15 @@ version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" +[[package]] +name = "memoffset" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5de893c32cde5f383baa4c04c5d6dbdd735cfd4a794b0debdb2bb1b421da5ff4" +dependencies = [ + "autocfg", +] + [[package]] name = "mio" version = "0.8.5" @@ -437,6 +447,20 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "nix" +version = "0.26.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46a58d1d356c6597d08cde02c2f09d785b09e28711837b1ed667dc652c08a694" +dependencies = [ + "bitflags", + "cfg-if", + "libc", + "memoffset", + "pin-utils", + "static_assertions", +] + [[package]] name = "num-bigint" version = "0.4.3" @@ -804,6 +828,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "static_assertions" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + [[package]] name = "stringprep" version = "0.1.2" diff --git a/blastmud_game/Cargo.toml b/blastmud_game/Cargo.toml index 85b9a39e..19f56d4f 100644 --- a/blastmud_game/Cargo.toml +++ b/blastmud_game/Cargo.toml @@ -10,6 +10,7 @@ blastmud_interfaces = { path = "../blastmud_interfaces" } deadpool-postgres = { version = "0.10.3", features = ["serde"] } futures = "0.3.25" log = "0.4.17" +nix = "0.26.1" serde = { version = "1.0.150", features = ["derive", "serde_derive"] } serde_yaml = "0.9.14" simple_logger = "4.0.0" diff --git a/blastmud_game/src/listener.rs b/blastmud_game/src/listener.rs index 9efc754c..55f6f207 100644 --- a/blastmud_game/src/listener.rs +++ b/blastmud_game/src/listener.rs @@ -1,6 +1,6 @@ use std::error::Error; use tokio::task; -use tokio::net::{TcpStream, TcpListener}; +use tokio::net::{TcpSocket, TcpStream, lookup_host}; use log::{info, warn}; use tokio_util::codec; use tokio_util::codec::length_delimited::LengthDelimitedCodec; @@ -8,13 +8,15 @@ use tokio_serde::formats::Cbor; use blastmud_interfaces::*; use futures::prelude::*; use tokio::sync::{Mutex, mpsc, oneshot}; +use std::net::SocketAddr; use std::sync::Arc; use uuid::Uuid; use std::collections::BTreeMap; +#[derive(Debug)] pub struct ListenerSend { - message: MessageToListener, - ack_notify: oneshot::Sender<()> + pub message: MessageToListener, + pub ack_notify: oneshot::Sender<()> } pub type ListenerMap = Arc>>>; @@ -23,15 +25,24 @@ async fn handle_from_listener( message_handler: FHandler, listener_map: ListenerMap) where - FHandler: Fn(MessageFromListener) -> HandlerFut + Send + 'static, - HandlerFut: Future + Send + 'static { + FHandler: Fn(Uuid, MessageFromListener) -> HandlerFut + Send + 'static, + HandlerFut: Future>> + Send + 'static { let mut conn_framed = tokio_serde::Framed::new( codec::Framed::new(conn, LengthDelimitedCodec::new()), Cbor::::default() ); - let session = match conn_framed.try_next().await { - Ok(Some(MessageFromListener::ListenerPing { uuid })) => uuid, + let listener_id = match conn_framed.try_next().await { + Ok(Some(ref msg@MessageFromListener::ListenerPing { uuid })) => { + let handle_fut = message_handler(uuid.clone(), msg.clone()); + match handle_fut.await { + Ok(_) => {} + Err(e) => { + warn!("Error processing initial ListenerPing: {}", e); + } + }; + uuid + }, Ok(Some(msg)) => { warn!("Got non-ping first message from listener: {:?}", msg); return; @@ -55,7 +66,7 @@ where } let (sender, mut receiver) = mpsc::channel(1); - listener_map.lock().await.insert(session, sender); + listener_map.lock().await.insert(listener_id, sender); 'listener_loop: loop { tokio::select!( @@ -65,8 +76,17 @@ where warn!("Unexpected acknowledge from listener - bug in listener?"); } Ok(Some(msg)) => { - let handle_fut = message_handler(msg); - handle_fut.await; + let handle_fut = message_handler(listener_id, msg); + match handle_fut.await { + Ok(_) => {} + Err(e) => { + // On the assumption errors that get here are bad enough that they are a + // problem with the system rather than the message, so we want to log and + // retry later. + warn!("Error from message handler - closing listener connection: {}", e); + break 'listener_loop; + } + } match conn_framed.send( MessageToListener::AcknowledgeMessage ).await { @@ -79,12 +99,12 @@ where } Ok(None) => { warn!("Lost connection to listener {} due to end-of-stream", - session); + listener_id); break 'listener_loop; } Err(e) => { warn!("Lost connection to listener {} due to error {}", - session, e); + listener_id, e); break 'listener_loop; } } @@ -105,8 +125,18 @@ where break 'ack_wait_loop; } Ok(Some(msg)) => { - let handle_fut = message_handler(msg); - handle_fut.await; + let handle_fut = message_handler(listener_id, msg); + match handle_fut.await { + Ok(_) => {} + Err(e) => { + // On the assumption errors that get here are bad enough that they are a + // problem with the system rather than the message, so we want to log and + // retry later. + warn!("Error from message handler - closing listener connection: {}", e); + break 'listener_loop; + } + } + match conn_framed.send( MessageToListener::AcknowledgeMessage ).await { @@ -119,12 +149,12 @@ where } Ok(None) => { warn!("Lost connection to listener {} due to end-of-stream", - session); + listener_id); break 'listener_loop; } Err(e) => { warn!("Lost connection to listener {} due to error {}", - session, e); + listener_id, e); break 'listener_loop; } } @@ -133,7 +163,7 @@ where ); } - listener_map.lock().await.remove(&session); + listener_map.lock().await.remove(&listener_id); } pub fn make_listener_map() -> ListenerMap { @@ -146,12 +176,20 @@ pub async fn start_listener( handle_message: FHandler ) -> Result<(), Box> where - FHandler: Fn(MessageFromListener) -> HandlerFut + Send + Clone + 'static, - HandlerFut: Future + Send + 'static + FHandler: Fn(Uuid, MessageFromListener) -> HandlerFut + Send + Clone + 'static, + HandlerFut: Future>> + Send + 'static { info!("Starting listener on {}", bind_to); - let listener = TcpListener::bind(bind_to).await?; - + let addr = lookup_host(bind_to).await?.next().expect("listener address didn't resolve"); + let socket = match addr { + SocketAddr::V4 {..} => TcpSocket::new_v4()?, + SocketAddr::V6 {..} => TcpSocket::new_v6()? + }; + socket.set_reuseaddr(true)?; + socket.set_reuseport(true)?; + socket.bind(addr)?; + let listener = socket.listen(5)?; + let listener_map_for_task = listener_map.clone(); task::spawn(async move { loop { diff --git a/blastmud_game/src/main.rs b/blastmud_game/src/main.rs index 13e63cc8..59c668bd 100644 --- a/blastmud_game/src/main.rs +++ b/blastmud_game/src/main.rs @@ -8,6 +8,7 @@ use tokio::signal::unix::{signal, SignalKind}; mod db; mod listener; mod message_handler; +mod version_cutover; #[derive(Deserialize, Debug)] struct Config { @@ -35,11 +36,12 @@ async fn main() -> Result<(), Box> { let listener_map = listener::make_listener_map(); listener::start_listener(config.listener, listener_map.clone(), - move |msg| { - message_handler::handle(msg, pool.clone(), listener_map.clone()) + move |listener_id, msg| { + message_handler::handle(listener_id, msg, pool.clone(), listener_map.clone()) } ).await?; + version_cutover::replace_old_gameserver(&config.pidfile)?; let mut sigusr1 = signal(SignalKind::user_defined1())?; sigusr1.recv().await; diff --git a/blastmud_game/src/message_handler.rs b/blastmud_game/src/message_handler.rs index b5b3e8b5..dd4ffcc1 100644 --- a/blastmud_game/src/message_handler.rs +++ b/blastmud_game/src/message_handler.rs @@ -1,8 +1,37 @@ -use log::info; use blastmud_interfaces::*; use deadpool_postgres::Pool; use crate::listener::ListenerMap; +use MessageFromListener::*; +use uuid::Uuid; +use tokio::{sync::oneshot, task}; +use crate::listener::ListenerSend; +use std::error::Error; -pub async fn handle(msg: MessageFromListener, _pool: Pool, _listener_map: ListenerMap) { - info!("Processing message: {:?}", msg) +pub async fn handle(listener: Uuid, msg: MessageFromListener, _pool: Pool, listener_map: ListenerMap) + -> Result<(), Box> { + match msg { + ListenerPing { uuid: _ } => {} + SessionConnected { session: _, source: _ } => {} + SessionDisconnected { session: _ } => {} + SessionSentLine { session, msg } => { + let lmlock = listener_map.lock().await; + let opt_sender = lmlock.get(&listener).map(|v| v.clone()); + drop(lmlock); + match opt_sender { + None => {} + Some(sender) => { + task::spawn(async move { + let (tx, rx) = oneshot::channel(); + sender.send(ListenerSend { message: MessageToListener::SendToSession { + session, + msg: format!("You hear an echo saying: \x1b[31m{}\x1b[0m\r\n", msg) }, + ack_notify: tx }).await.unwrap_or(()); + rx.await.unwrap_or(()); + }); + } + } + } + AcknowledgeMessage => {} + } + Ok(()) } diff --git a/blastmud_game/src/version_cutover.rs b/blastmud_game/src/version_cutover.rs new file mode 100644 index 00000000..759693c6 --- /dev/null +++ b/blastmud_game/src/version_cutover.rs @@ -0,0 +1,39 @@ +use std::fs::{read_to_string, write}; +use std::path::Path; +use std::error::Error; +use log::info; +use nix::{sys::signal::{kill, Signal}, unistd::Pid}; + +pub fn replace_old_gameserver(pidfile: &str) -> Result<(), Box> { + match read_to_string(pidfile) { + Err(e) => + if e.kind() == std::io::ErrorKind::NotFound { + info!("pidfile not found, assuming not already running"); + Ok(()) + } else { + info!("Error reading pidfile (other than NotFound): {}", e); + Err(Box::new(e) as Box::) + } + Ok(f) => { + let pid: Pid = Pid::from_raw(f.parse().map_err(|e| Box::new(e) as Box::)?); + match read_to_string(format!("/proc/{}/cmdline", pid)) { + Ok(content) => + if content.contains("blastmud_game") { + info!("pid in pidfile references blastmud_game; starting cutover"); + kill(pid, Signal::SIGUSR1) + .map_err(|e| Box::new(e) as Box) + } else { + info!("Pid in pidfile is for process not including blastmud_game - ignoring pidfile"); + Ok(()) + } + Err(_) => { + info!("Pid in pidfile is gone - ignoring pidfile"); + Ok(()) + } + } + } + }?; + info!("Writing new pidfile"); + write(Path::new(pidfile), format!("{}", std::process::id())) + .map_err(|e| Box::new(e) as Box::) +} diff --git a/blastmud_listener/src/main.rs b/blastmud_listener/src/main.rs index f57298cd..7c382160 100644 --- a/blastmud_listener/src/main.rs +++ b/blastmud_listener/src/main.rs @@ -10,7 +10,7 @@ use tokio::net::{TcpStream, TcpListener}; use tokio::signal::unix::{signal, SignalKind}; use tokio::sync::{mpsc, Mutex}; use tokio::io::{BufReader, AsyncWriteExt}; -use log::{warn, info}; +use log::{warn, info, LevelFilter}; use simple_logger::SimpleLogger; use std::sync::Arc; use blastmud_interfaces::*; @@ -48,7 +48,7 @@ fn run_server_task( ) where FHandler: Fn(MessageToListener) -> HandlerFut + Send + 'static, - HandlerFut: Future + HandlerFut: Future + Send + 'static { task::spawn(async move { let conn = loop { @@ -104,17 +104,18 @@ where warn!("Unexpected AcknowledgeMessage from gameserver. This suggests a bug in the gameserver"); } Ok(Some(msg)) => { - message_handler(msg); - } - } + let mhfut = message_handler(msg); + mhfut.await; - match conn_framed.send(MessageFromListener::AcknowledgeMessage).await { - Ok(_) => {} - Err(e) => { - warn!("Can't send acknowledgement to {}: {}", server, e); - run_server_task(None, listener_id, receiver, sender, server, - message_handler); - break 'full_select; + match conn_framed.send(MessageFromListener::AcknowledgeMessage).await { + Ok(_) => {} + Err(e) => { + warn!("Can't send acknowledgement to {}: {}", server, e); + run_server_task(None, listener_id, receiver, sender, server, + message_handler); + break 'full_select; + } + } } } }, @@ -156,7 +157,8 @@ where break 'wait_for_ack; } Ok(Some(msg)) => { - message_handler(msg); + let mhfut = message_handler(msg); + mhfut.await; } } } @@ -294,7 +296,7 @@ async fn handle_client_socket( break 'client_loop; } SessionCommand::SendString { message } => - match wstream.write_all((message + "\r\n").as_bytes()).await { + match wstream.write_all(message.as_bytes()).await { Err(e) => { info!("Client connection {} got error {}", session, e); } @@ -341,7 +343,7 @@ fn start_pinger(listener: Uuid, server: mpsc::Sender) { #[tokio::main] async fn main() -> Result<(), Box> { - SimpleLogger::new().init().unwrap(); + SimpleLogger::new().with_level(LevelFilter::Info).init().unwrap(); let listener_id = Uuid::new_v4(); let mut config = read_latest_config()?;