diff --git a/blastmud_game/src/db.rs b/blastmud_game/src/db.rs index ca0d86d..f0047de 100644 --- a/blastmud_game/src/db.rs +++ b/blastmud_game/src/db.rs @@ -29,7 +29,7 @@ pub struct DBTrans { pub struct SendqueueItem { pub item: i64, pub session: ListenerSession, - pub message: String + pub message: Option } impl From for SendqueueItem { fn from(row: Row) -> Self { @@ -110,7 +110,7 @@ impl DBPool { pub async fn queue_for_session(self: &Self, session: &ListenerSession, - message: &str) -> DResult<()> { + message: Option<&str>) -> DResult<()> { let conn = self.get_conn().await?; conn.execute("INSERT INTO sendqueue (session, listener, message) VALUES ($1, $2, $3)", &[&session.session, &session.listener, &message]).await?; @@ -157,7 +157,7 @@ impl DBPool { impl DBTrans { pub async fn queue_for_session(self: &Self, session: &ListenerSession, - message: &str) -> DResult<()> { + message: Option<&str>) -> DResult<()> { self.pg_trans()? .execute("INSERT INTO sendqueue (session, listener, message) VALUES ($1, $2, $3)", &[&session.session, &session.listener, &message]).await?; diff --git a/blastmud_game/src/listener.rs b/blastmud_game/src/listener.rs index 9cca105..af746a4 100644 --- a/blastmud_game/src/listener.rs +++ b/blastmud_game/src/listener.rs @@ -82,11 +82,17 @@ where match handle_fut.await { Ok(_) => {} Err(e) => { - // On the assumption errors that get here are bad enough that they are a - // problem with the system rather than the message, so we want to log and - // retry later. - warn!("Error from message handler - closing listener connection: {}", e); - break 'listener_loop; + if connected_at.elapsed() > std::time::Duration::from_secs(60) { + // On the assumption errors that get here are bad enough that they are a + // problem with the system rather than the message, so we want to log and + // retry later. + warn!("Error from message handler - closing listener connection: {}", e); + break 'listener_loop; + } else { + warn!("Error from message handler, but we only just connected, so \ + acknowledging it anyway as a safety measure against reconnect \ + loops: {}", e); + } } } match conn_framed.send( diff --git a/blastmud_game/src/message_handler/new_session.rs b/blastmud_game/src/message_handler/new_session.rs index c8d9ec4..6c4c326 100644 --- a/blastmud_game/src/message_handler/new_session.rs +++ b/blastmud_game/src/message_handler/new_session.rs @@ -7,12 +7,12 @@ use crate::models::session::Session; pub async fn handle(session: &ListenerSession, source: String, pool: &DBPool) -> DResult<()> { pool.start_session(session, &Session { source, ..Default::default() }).await?; - pool.queue_for_session(&session, &ansi!("\ + pool.queue_for_session(&session, Some(&ansi!("\ Welcome to BlastMud - a text-based post-apocalyptic \ game restricted to adults (18+)\r\n\ Some commands to get you started:\r\n\ \tregister username> password> email> to register as a new user.\r\n\ \tconnect username> password> to log in as an existing user.\r\n\ - \thelp to learn more.\r\n")).await?; + \thelp to learn more.\r\n"))).await?; Ok(()) } diff --git a/blastmud_game/src/message_handler/user_commands.rs b/blastmud_game/src/message_handler/user_commands.rs index 302d3b6..614560f 100644 --- a/blastmud_game/src/message_handler/user_commands.rs +++ b/blastmud_game/src/message_handler/user_commands.rs @@ -5,10 +5,12 @@ use ansi_macro::ansi; use phf::phf_map; use async_trait::async_trait; use crate::models::session::Session; +use log::warn; mod parsing; mod ignore; mod help; +mod quit; pub struct VerbContext<'l> { session: &'l ListenerSession, @@ -44,14 +46,20 @@ type UserVerbRegistry = phf::Map<&'static str, UserVerbRef>; static ALWAYS_AVAILABLE_COMMANDS: UserVerbRegistry = phf_map! { "" => ignore::VERB, - "help" => help::VERB + "help" => help::VERB, + "quit" => quit::VERB, }; pub async fn handle(session: &ListenerSession, msg: &str, pool: &DBPool) -> DResult<()> { let (cmd, params) = parsing::parse_command_name(msg); let trans = pool.start_transaction().await?; let mut session_dat = match trans.get_session_model(session).await? { - None => { return Ok(()) } + None => { + // If the session has been cleaned up from the database, there is + // nowhere to go from here, so just ignore it. + warn!("Got command from session not in database: {}", session.session); + return Ok(()); + } Some(v) => v }; let handler_opt = ALWAYS_AVAILABLE_COMMANDS.get(cmd); @@ -60,16 +68,16 @@ pub async fn handle(session: &ListenerSession, msg: &str, pool: &DBPool) -> DRes match handler_opt { None => { trans.queue_for_session(session, - ansi!( + Some(ansi!( "That's not a command I know. Try help\r\n" - ) + )) ).await?; } Some(handler) => { match handler.handle(&ctx, cmd, params).await { Ok(()) => {} Err(UserError(err_msg)) => { - trans.queue_for_session(session, &(err_msg + "\r\n")).await?; + trans.queue_for_session(session, Some(&(err_msg + "\r\n"))).await?; } Err(SystemError(e)) => Err(e)? } diff --git a/blastmud_game/src/message_handler/user_commands/help.rs b/blastmud_game/src/message_handler/user_commands/help.rs index 2de8fd9..8b0d050 100644 --- a/blastmud_game/src/message_handler/user_commands/help.rs +++ b/blastmud_game/src/message_handler/user_commands/help.rs @@ -36,7 +36,9 @@ impl UserVerb for Verb { help = help.or_else(|| HELP_PAGES.get(remaining)); let help_final = help.ok_or( UserError("No help available on that".to_string()))?; - ctx.trans.queue_for_session(ctx.session, &(help_final.to_string() + "\r\n")).await?; + ctx.trans.queue_for_session(ctx.session, + Some(&(help_final.to_string() + "\r\n")) + ).await?; Ok(()) } } diff --git a/blastmud_game/src/message_handler/user_commands/quit.rs b/blastmud_game/src/message_handler/user_commands/quit.rs new file mode 100644 index 0000000..72142b7 --- /dev/null +++ b/blastmud_game/src/message_handler/user_commands/quit.rs @@ -0,0 +1,18 @@ +use super::{ + VerbContext, UserVerb, UserVerbRef, UResult +}; +use async_trait::async_trait; +use ansi_macro::ansi; + +pub struct Verb; +#[async_trait] +impl UserVerb for Verb { + async fn handle(self: &Self, ctx: &VerbContext, _verb: &str, _remaining: &str) -> UResult<()> { + ctx.trans.queue_for_session(ctx.session, + Some(ansi!("Bye!\r\n"))).await?; + ctx.trans.queue_for_session(ctx.session, None).await?; + Ok(()) + } +} +static VERB_INT: Verb = Verb; +pub static VERB: UserVerbRef = &VERB_INT as UserVerbRef; diff --git a/blastmud_game/src/regular_tasks.rs b/blastmud_game/src/regular_tasks.rs index ac40cac..39452bc 100644 --- a/blastmud_game/src/regular_tasks.rs +++ b/blastmud_game/src/regular_tasks.rs @@ -34,9 +34,14 @@ async fn process_sendqueue_once(pool: db::DBPool, listener_map: ListenerMap) -> let (tx, rx) = oneshot::channel(); listener_sender.send( ListenerSend { - message: MessageToListener::SendToSession { - session: item.session.session.clone(), - msg: item.message.clone() + message: match item.message.clone() { + None => MessageToListener::DisconnectSession { + session: item.session.session.clone() + }, + Some(msg) => MessageToListener::SendToSession { + session: item.session.session.clone(), + msg: msg + } }, ack_notify: tx } diff --git a/schema/schema.sql b/schema/schema.sql index 98281f3..331a080 100644 --- a/schema/schema.sql +++ b/schema/schema.sql @@ -40,12 +40,13 @@ CREATE UNLOGGED TABLE sendqueue ( item BIGSERIAL NOT NULL PRIMARY KEY, session UUID NOT NULL REFERENCES sessions(session), listener UUID REFERENCES listeners(listener), - message TEXT NOT NULL + message TEXT /* Nullable, null means disconnect */ ); CREATE TABLE tasks ( task_code TEXT NOT NULL, task_type TEXT NOT NULL, + is_static BOOL NOT NULL, next_scheduled TIMESTAMP WITH TIME ZONE NOT NULL, details JSONB NOT NULL, PRIMARY KEY (task_code, task_type)