Allow disconnections and implement quit.

This commit is contained in:
Condorra 2022-12-25 12:42:03 +11:00
parent 55d3087d21
commit 218ca0b953
8 changed files with 60 additions and 20 deletions

View File

@ -29,7 +29,7 @@ pub struct DBTrans {
pub struct SendqueueItem { pub struct SendqueueItem {
pub item: i64, pub item: i64,
pub session: ListenerSession, pub session: ListenerSession,
pub message: String pub message: Option<String>
} }
impl From<Row> for SendqueueItem { impl From<Row> for SendqueueItem {
fn from(row: Row) -> Self { fn from(row: Row) -> Self {
@ -110,7 +110,7 @@ impl DBPool {
pub async fn queue_for_session(self: &Self, pub async fn queue_for_session(self: &Self,
session: &ListenerSession, session: &ListenerSession,
message: &str) -> DResult<()> { message: Option<&str>) -> DResult<()> {
let conn = self.get_conn().await?; let conn = self.get_conn().await?;
conn.execute("INSERT INTO sendqueue (session, listener, message) VALUES ($1, $2, $3)", conn.execute("INSERT INTO sendqueue (session, listener, message) VALUES ($1, $2, $3)",
&[&session.session, &session.listener, &message]).await?; &[&session.session, &session.listener, &message]).await?;
@ -157,7 +157,7 @@ impl DBPool {
impl DBTrans { impl DBTrans {
pub async fn queue_for_session(self: &Self, pub async fn queue_for_session(self: &Self,
session: &ListenerSession, session: &ListenerSession,
message: &str) -> DResult<()> { message: Option<&str>) -> DResult<()> {
self.pg_trans()? self.pg_trans()?
.execute("INSERT INTO sendqueue (session, listener, message) VALUES ($1, $2, $3)", .execute("INSERT INTO sendqueue (session, listener, message) VALUES ($1, $2, $3)",
&[&session.session, &session.listener, &message]).await?; &[&session.session, &session.listener, &message]).await?;

View File

@ -82,11 +82,17 @@ where
match handle_fut.await { match handle_fut.await {
Ok(_) => {} Ok(_) => {}
Err(e) => { Err(e) => {
if connected_at.elapsed() > std::time::Duration::from_secs(60) {
// On the assumption errors that get here are bad enough that they are a // On the assumption errors that get here are bad enough that they are a
// problem with the system rather than the message, so we want to log and // problem with the system rather than the message, so we want to log and
// retry later. // retry later.
warn!("Error from message handler - closing listener connection: {}", e); warn!("Error from message handler - closing listener connection: {}", e);
break 'listener_loop; break 'listener_loop;
} else {
warn!("Error from message handler, but we only just connected, so \
acknowledging it anyway as a safety measure against reconnect \
loops: {}", e);
}
} }
} }
match conn_framed.send( match conn_framed.send(

View File

@ -7,12 +7,12 @@ use crate::models::session::Session;
pub async fn handle(session: &ListenerSession, source: String, pool: &DBPool) -> DResult<()> { pub async fn handle(session: &ListenerSession, source: String, pool: &DBPool) -> DResult<()> {
pool.start_session(session, &Session { source, ..Default::default() }).await?; pool.start_session(session, &Session { source, ..Default::default() }).await?;
pool.queue_for_session(&session, &ansi!("\ pool.queue_for_session(&session, Some(&ansi!("\
Welcome to <red>BlastMud<reset> - a text-based post-apocalyptic \ Welcome to <red>BlastMud<reset> - a text-based post-apocalyptic \
game <bold>restricted to adults (18+)<reset>\r\n\ game <bold>restricted to adults (18+)<reset>\r\n\
Some commands to get you started:\r\n\ Some commands to get you started:\r\n\
\t<bold>register <lt>username> <lt>password> <lt>email><reset> to register as a new user.\r\n\ \t<bold>register <lt>username> <lt>password> <lt>email><reset> to register as a new user.\r\n\
\t<bold>connect <lt>username> <lt>password><reset> to log in as an existing user.\r\n\ \t<bold>connect <lt>username> <lt>password><reset> to log in as an existing user.\r\n\
\t<bold>help<reset> to learn more.\r\n")).await?; \t<bold>help<reset> to learn more.\r\n"))).await?;
Ok(()) Ok(())
} }

View File

@ -5,10 +5,12 @@ use ansi_macro::ansi;
use phf::phf_map; use phf::phf_map;
use async_trait::async_trait; use async_trait::async_trait;
use crate::models::session::Session; use crate::models::session::Session;
use log::warn;
mod parsing; mod parsing;
mod ignore; mod ignore;
mod help; mod help;
mod quit;
pub struct VerbContext<'l> { pub struct VerbContext<'l> {
session: &'l ListenerSession, session: &'l ListenerSession,
@ -44,14 +46,20 @@ type UserVerbRegistry = phf::Map<&'static str, UserVerbRef>;
static ALWAYS_AVAILABLE_COMMANDS: UserVerbRegistry = phf_map! { static ALWAYS_AVAILABLE_COMMANDS: UserVerbRegistry = phf_map! {
"" => ignore::VERB, "" => ignore::VERB,
"help" => help::VERB "help" => help::VERB,
"quit" => quit::VERB,
}; };
pub async fn handle(session: &ListenerSession, msg: &str, pool: &DBPool) -> DResult<()> { pub async fn handle(session: &ListenerSession, msg: &str, pool: &DBPool) -> DResult<()> {
let (cmd, params) = parsing::parse_command_name(msg); let (cmd, params) = parsing::parse_command_name(msg);
let trans = pool.start_transaction().await?; let trans = pool.start_transaction().await?;
let mut session_dat = match trans.get_session_model(session).await? { let mut session_dat = match trans.get_session_model(session).await? {
None => { return Ok(()) } None => {
// If the session has been cleaned up from the database, there is
// nowhere to go from here, so just ignore it.
warn!("Got command from session not in database: {}", session.session);
return Ok(());
}
Some(v) => v Some(v) => v
}; };
let handler_opt = ALWAYS_AVAILABLE_COMMANDS.get(cmd); let handler_opt = ALWAYS_AVAILABLE_COMMANDS.get(cmd);
@ -60,16 +68,16 @@ pub async fn handle(session: &ListenerSession, msg: &str, pool: &DBPool) -> DRes
match handler_opt { match handler_opt {
None => { None => {
trans.queue_for_session(session, trans.queue_for_session(session,
ansi!( Some(ansi!(
"That's not a command I know. Try <bold>help<reset>\r\n" "That's not a command I know. Try <bold>help<reset>\r\n"
) ))
).await?; ).await?;
} }
Some(handler) => { Some(handler) => {
match handler.handle(&ctx, cmd, params).await { match handler.handle(&ctx, cmd, params).await {
Ok(()) => {} Ok(()) => {}
Err(UserError(err_msg)) => { Err(UserError(err_msg)) => {
trans.queue_for_session(session, &(err_msg + "\r\n")).await?; trans.queue_for_session(session, Some(&(err_msg + "\r\n"))).await?;
} }
Err(SystemError(e)) => Err(e)? Err(SystemError(e)) => Err(e)?
} }

View File

@ -36,7 +36,9 @@ impl UserVerb for Verb {
help = help.or_else(|| HELP_PAGES.get(remaining)); help = help.or_else(|| HELP_PAGES.get(remaining));
let help_final = help.ok_or( let help_final = help.ok_or(
UserError("No help available on that".to_string()))?; UserError("No help available on that".to_string()))?;
ctx.trans.queue_for_session(ctx.session, &(help_final.to_string() + "\r\n")).await?; ctx.trans.queue_for_session(ctx.session,
Some(&(help_final.to_string() + "\r\n"))
).await?;
Ok(()) Ok(())
} }
} }

View File

@ -0,0 +1,18 @@
use super::{
VerbContext, UserVerb, UserVerbRef, UResult
};
use async_trait::async_trait;
use ansi_macro::ansi;
pub struct Verb;
#[async_trait]
impl UserVerb for Verb {
async fn handle(self: &Self, ctx: &VerbContext, _verb: &str, _remaining: &str) -> UResult<()> {
ctx.trans.queue_for_session(ctx.session,
Some(ansi!("<red>Bye!<reset>\r\n"))).await?;
ctx.trans.queue_for_session(ctx.session, None).await?;
Ok(())
}
}
static VERB_INT: Verb = Verb;
pub static VERB: UserVerbRef = &VERB_INT as UserVerbRef;

View File

@ -34,9 +34,14 @@ async fn process_sendqueue_once(pool: db::DBPool, listener_map: ListenerMap) ->
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
listener_sender.send( listener_sender.send(
ListenerSend { ListenerSend {
message: MessageToListener::SendToSession { message: match item.message.clone() {
None => MessageToListener::DisconnectSession {
session: item.session.session.clone()
},
Some(msg) => MessageToListener::SendToSession {
session: item.session.session.clone(), session: item.session.session.clone(),
msg: item.message.clone() msg: msg
}
}, },
ack_notify: tx ack_notify: tx
} }

View File

@ -40,12 +40,13 @@ CREATE UNLOGGED TABLE sendqueue (
item BIGSERIAL NOT NULL PRIMARY KEY, item BIGSERIAL NOT NULL PRIMARY KEY,
session UUID NOT NULL REFERENCES sessions(session), session UUID NOT NULL REFERENCES sessions(session),
listener UUID REFERENCES listeners(listener), listener UUID REFERENCES listeners(listener),
message TEXT NOT NULL message TEXT /* Nullable, null means disconnect */
); );
CREATE TABLE tasks ( CREATE TABLE tasks (
task_code TEXT NOT NULL, task_code TEXT NOT NULL,
task_type TEXT NOT NULL, task_type TEXT NOT NULL,
is_static BOOL NOT NULL,
next_scheduled TIMESTAMP WITH TIME ZONE NOT NULL, next_scheduled TIMESTAMP WITH TIME ZONE NOT NULL,
details JSONB NOT NULL, details JSONB NOT NULL,
PRIMARY KEY (task_code, task_type) PRIMARY KEY (task_code, task_type)