Start work on command parsing.

This commit is contained in:
Condorra 2022-12-24 21:16:23 +11:00
parent ac10d7db84
commit 4ce39f9421
13 changed files with 194 additions and 28 deletions

34
Cargo.lock generated
View File

@ -13,9 +13,9 @@ dependencies = [
[[package]] [[package]]
name = "async-trait" name = "async-trait"
version = "0.1.59" version = "0.1.60"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "31e6e93155431f3931513b243d371981bb2770112b370c82745a1d19d2f99364" checksum = "677d1d8ab452a3936018a687b20e6f7cf5363d713b732b8884001317b0e48aa3"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -62,6 +62,7 @@ name = "blastmud_game"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"ansi_macro", "ansi_macro",
"async-trait",
"base64 0.20.0", "base64 0.20.0",
"blastmud_interfaces", "blastmud_interfaces",
"deadpool", "deadpool",
@ -69,8 +70,11 @@ dependencies = [
"futures", "futures",
"log", "log",
"nix", "nix",
"nom",
"phf",
"ring", "ring",
"serde", "serde",
"serde_json",
"serde_yaml", "serde_yaml",
"simple_logger", "simple_logger",
"tokio", "tokio",
@ -797,9 +801,33 @@ version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "928c6535de93548188ef63bb7c4036bd415cd8f36ad25af44b9789b2ee72a48c" checksum = "928c6535de93548188ef63bb7c4036bd415cd8f36ad25af44b9789b2ee72a48c"
dependencies = [ dependencies = [
"phf_macros",
"phf_shared", "phf_shared",
] ]
[[package]]
name = "phf_generator"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1181c94580fa345f50f19d738aaa39c0ed30a600d95cb2d3e23f94266f14fbf"
dependencies = [
"phf_shared",
"rand",
]
[[package]]
name = "phf_macros"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "92aacdc5f16768709a569e913f7451034034178b05bdc8acda226659a3dccc66"
dependencies = [
"phf_generator",
"phf_shared",
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "phf_shared" name = "phf_shared"
version = "0.11.1" version = "0.11.1"
@ -868,6 +896,8 @@ dependencies = [
"bytes", "bytes",
"fallible-iterator", "fallible-iterator",
"postgres-protocol", "postgres-protocol",
"serde",
"serde_json",
"uuid", "uuid",
] ]

View File

@ -19,8 +19,12 @@ serde = { version = "1.0.150", features = ["derive", "serde_derive"] }
serde_yaml = "0.9.14" serde_yaml = "0.9.14"
simple_logger = "4.0.0" simple_logger = "4.0.0"
tokio = { version = "1.23.0", features = ["signal", "net", "macros", "rt-multi-thread", "rt", "tokio-macros", "time", "sync", "io-util"] } tokio = { version = "1.23.0", features = ["signal", "net", "macros", "rt-multi-thread", "rt", "tokio-macros", "time", "sync", "io-util"] }
tokio-postgres = { version = "0.7.7", features = ["with-uuid-1"] } tokio-postgres = { version = "0.7.7", features = ["with-uuid-1", "with-serde_json-1"] }
tokio-serde = { version = "0.8.0", features = ["serde", "serde_cbor", "cbor"] } tokio-serde = { version = "0.8.0", features = ["serde", "serde_cbor", "cbor"] }
tokio-stream = "0.1.11" tokio-stream = "0.1.11"
tokio-util = { version = "0.7.4", features = ["codec"] } tokio-util = { version = "0.7.4", features = ["codec"] }
uuid = { version = "1.2.2", features = ["v4", "serde", "rng"] } uuid = { version = "1.2.2", features = ["v4", "serde", "rng"] }
serde_json = "1.0.91"
phf = { version = "0.11.1", features = ["macros"] }
async-trait = "0.1.60"
nom = "7.1.1"

View File

@ -7,6 +7,8 @@ use uuid::Uuid;
use tokio_postgres::NoTls; use tokio_postgres::NoTls;
use crate::message_handler::ListenerSession; use crate::message_handler::ListenerSession;
use crate::DResult; use crate::DResult;
use crate::models::session::Session;
use serde_json;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct DBPool { pub struct DBPool {
@ -33,7 +35,7 @@ impl From<Row> for SendqueueItem {
} }
impl DBPool { impl DBPool {
pub async fn record_listener_ping(self: DBPool, listener: Uuid) -> DResult<()> { pub async fn record_listener_ping(self: &DBPool, listener: Uuid) -> DResult<()> {
self.get_conn().await?.execute( self.get_conn().await?.execute(
"INSERT INTO listeners (listener, last_seen) \ "INSERT INTO listeners (listener, last_seen) \
VALUES ($1, NOW()) \ VALUES ($1, NOW()) \
@ -42,14 +44,14 @@ impl DBPool {
Ok(()) Ok(())
} }
pub async fn get_dead_listeners(self: Self) -> DResult<Vec<Uuid>> { pub async fn get_dead_listeners(self: &Self) -> DResult<Vec<Uuid>> {
Ok(self.get_conn().await? Ok(self.get_conn().await?
.query("SELECT listener FROM listeners WHERE last_seen < NOW() - \ .query("SELECT listener FROM listeners WHERE last_seen < NOW() - \
INTERVAL '2 minutes'", &[]) INTERVAL '2 minutes'", &[])
.await?.into_iter().map(|r| r.get(0)).collect()) .await?.into_iter().map(|r| r.get(0)).collect())
} }
pub async fn cleanup_listener(self: Self, listener: Uuid) -> DResult<()> { pub async fn cleanup_listener(self: &Self, listener: Uuid) -> DResult<()> {
let mut conn = self.get_conn().await?; let mut conn = self.get_conn().await?;
let tx = conn.transaction().await?; let tx = conn.transaction().await?;
tx.execute("UPDATE users SET current_session = NULL, \ tx.execute("UPDATE users SET current_session = NULL, \
@ -65,16 +67,16 @@ impl DBPool {
Ok(()) Ok(())
} }
pub async fn start_session(self: Self, session: &ListenerSession) -> DResult<()> { pub async fn start_session(self: &Self, session: &ListenerSession, details: &Session) -> DResult<()> {
self.get_conn().await?.execute( self.get_conn().await?.execute(
"INSERT INTO sessions (session, listener, details) \ "INSERT INTO sessions (session, listener, details) \
VALUES ($1, $2, '{}') ON CONFLICT (session) DO NOTHING", VALUES ($1, $2, $3) ON CONFLICT (session) DO NOTHING",
&[&session.session, &session.listener] &[&session.session, &session.listener, &serde_json::to_value(details)?]
).await?; ).await?;
Ok(()) Ok(())
} }
pub async fn end_session(self: Self, session: ListenerSession) -> DResult<()> { pub async fn end_session(self: &Self, session: ListenerSession) -> DResult<()> {
let mut conn = self.get_conn().await?; let mut conn = self.get_conn().await?;
let tx = conn.transaction().await?; let tx = conn.transaction().await?;
tx.execute("UPDATE users SET current_session = NULL, \ tx.execute("UPDATE users SET current_session = NULL, \
@ -88,7 +90,7 @@ impl DBPool {
Ok(()) Ok(())
} }
pub async fn queue_for_session(self: Self, pub async fn queue_for_session(self: &Self,
session: &ListenerSession, session: &ListenerSession,
message: &str) -> DResult<()> { message: &str) -> DResult<()> {
let conn = self.get_conn().await?; let conn = self.get_conn().await?;
@ -97,7 +99,7 @@ impl DBPool {
Ok(()) Ok(())
} }
pub async fn get_from_sendqueue(self: Self) -> DResult<Vec<SendqueueItem>> { pub async fn get_from_sendqueue(self: &Self) -> DResult<Vec<SendqueueItem>> {
let conn = self.get_conn().await?; let conn = self.get_conn().await?;
Ok(conn.query("SELECT item, session, listener, message FROM sendqueue ORDER BY item ASC LIMIT 10", Ok(conn.query("SELECT item, session, listener, message FROM sendqueue ORDER BY item ASC LIMIT 10",
&[]) &[])
@ -105,13 +107,13 @@ impl DBPool {
} }
pub async fn delete_from_sendqueue(self: DBPool, item: &SendqueueItem) -> DResult<()> { pub async fn delete_from_sendqueue(self: &DBPool, item: &SendqueueItem) -> DResult<()> {
let conn = self.get_conn().await?; let conn = self.get_conn().await?;
conn.execute("DELETE FROM sendqueue WHERE item=$1", &[&item.item]).await?; conn.execute("DELETE FROM sendqueue WHERE item=$1", &[&item.item]).await?;
Ok(()) Ok(())
} }
pub async fn get_conn(self: DBPool) -> pub async fn get_conn(self: &DBPool) ->
DResult<Object> { DResult<Object> {
let conn = self.pool.get().await?; let conn = self.pool.get().await?;
conn.execute("SET synchronous_commit=off", &[]).await?; conn.execute("SET synchronous_commit=off", &[]).await?;

View File

@ -12,6 +12,7 @@ mod message_handler;
mod version_cutover; mod version_cutover;
mod av; mod av;
mod regular_tasks; mod regular_tasks;
mod models;
pub type DResult<T> = Result<T, Box<dyn Error + Send + Sync>>; pub type DResult<T> = Result<T, Box<dyn Error + Send + Sync>>;
@ -39,7 +40,7 @@ async fn main() -> DResult<()> {
let pool = DBPool::start(&config.database_conn_string)?; let pool = DBPool::start(&config.database_conn_string)?;
// Test the database connection string works so we quit early if not... // Test the database connection string works so we quit early if not...
let _ = pool.clone().get_conn().await?.query("SELECT 1", &[]).await?; let _ = pool.get_conn().await?.query("SELECT 1", &[]).await?;
info!("Database pool initialised"); info!("Database pool initialised");
@ -53,7 +54,7 @@ async fn main() -> DResult<()> {
).await?; ).await?;
version_cutover::replace_old_gameserver(&config.pidfile)?; version_cutover::replace_old_gameserver(&config.pidfile)?;
regular_tasks::start_regular_tasks(pool.clone(), listener_map)?; regular_tasks::start_regular_tasks(&pool, listener_map)?;
let mut sigusr1 = signal(SignalKind::user_defined1())?; let mut sigusr1 = signal(SignalKind::user_defined1())?;
sigusr1.recv().await; sigusr1.recv().await;

View File

@ -18,13 +18,14 @@ pub async fn handle(listener: Uuid, msg: MessageFromListener, pool: db::DBPool)
match msg { match msg {
ListenerPing { .. } => { pool.record_listener_ping(listener).await?; } ListenerPing { .. } => { pool.record_listener_ping(listener).await?; }
SessionConnected { session, source } => { SessionConnected { session, source } => {
new_session::handle(&ListenerSession { listener, session }, &source, pool).await?; new_session::handle(
&ListenerSession { listener, session }, source, &pool).await?;
} }
SessionDisconnected { session } => { SessionDisconnected { session } => {
pool.end_session(ListenerSession { listener, session }).await?; pool.end_session(ListenerSession { listener, session }).await?;
} }
SessionSentLine { session, msg } => { SessionSentLine { session, msg } => {
user_commands::handle(&ListenerSession { listener, session }, &msg, pool).await?; user_commands::handle(&ListenerSession { listener, session }, &msg, &pool).await?;
} }
AcknowledgeMessage => {} AcknowledgeMessage => {}
} }

View File

@ -2,9 +2,11 @@ use crate::message_handler::ListenerSession;
use crate::DResult; use crate::DResult;
use crate::db::DBPool; use crate::db::DBPool;
use ansi_macro::ansi; use ansi_macro::ansi;
use std::default::Default;
use crate::models::session::Session;
pub async fn handle(session: &ListenerSession, _source: &str, pool: DBPool) -> DResult<()> { pub async fn handle(session: &ListenerSession, source: String, pool: &DBPool) -> DResult<()> {
pool.clone().start_session(session).await?; pool.start_session(session, &Session { source, ..Default::default() }).await?;
pool.queue_for_session(&session, &ansi!("\ pool.queue_for_session(&session, &ansi!("\
Welcome to <red>BlastMud<reset> - a text-based post-apocalyptic \ Welcome to <red>BlastMud<reset> - a text-based post-apocalyptic \
game <bold>restricted to adults (18+)<reset>\r\n\ game <bold>restricted to adults (18+)<reset>\r\n\

View File

@ -1,12 +1,78 @@
use crate::message_handler::ListenerSession; use super::ListenerSession;
use crate::DResult; use crate::DResult;
use crate::db::DBPool; use crate::db::DBPool;
use ansi_macro::ansi; use ansi_macro::ansi;
use phf::phf_map;
use async_trait::async_trait;
pub async fn handle(session: &ListenerSession, msg: &str, pool: DBPool) -> DResult<()> { mod parsing;
mod ignore;
mod help;
#[derive(Debug)]
pub struct VerbContext<'l> {
session: &'l ListenerSession,
pool: &'l DBPool
}
pub enum CommandHandlingError {
UserError(String),
SystemError(Box<dyn std::error::Error + Send + Sync>)
}
use CommandHandlingError::*;
#[async_trait]
pub trait UserVerb {
async fn handle(self: &Self, ctx: &VerbContext, verb: &str, remaining: &str) -> UResult<()>;
}
pub type UserVerbRef = &'static (dyn UserVerb + Sync + Send);
pub type UResult<A> = Result<A, CommandHandlingError>;
impl From<Box<dyn std::error::Error + Send + Sync>> for CommandHandlingError {
fn from(input: Box<dyn std::error::Error + Send + Sync>) -> CommandHandlingError {
SystemError(input)
}
}
pub fn user_error<A>(msg: String) -> UResult<A> {
Err(UserError(msg))
}
type UserVerbRegistry = phf::Map<&'static str, UserVerbRef>;
static ALWAYS_AVAILABLE_COMMANDS: UserVerbRegistry = phf_map! {
"" => ignore::VERB,
"help" => help::VERB
};
pub async fn handle(session: &ListenerSession, msg: &str, pool: &DBPool) -> DResult<()> {
let (cmd, params) = parsing::parse_command_name(msg);
let handler_opt = ALWAYS_AVAILABLE_COMMANDS.get(cmd);
match handler_opt {
None => {
pool.queue_for_session(session,
ansi!(
"That's not a command I know. Try <bold>help<reset>\r\n"
)
).await?;
}
Some(handler) => {
match handler.handle(&VerbContext { session, pool }, cmd, params).await {
Ok(()) => {}
Err(UserError(err_msg)) => {
pool.queue_for_session(session, &err_msg).await?;
}
Err(SystemError(e)) => Err(e)?
}
}
}
/*
pool.queue_for_session(session, pool.queue_for_session(session,
&format!(ansi!( &format!(ansi!(
"You hear an echo saying: <bggreen><red>{}<reset>\r\n" "You hear an echo saying: <bggreen><red>{}<reset>\r\n"
), msg)).await?; ), msg)).await?;
*/
Ok(()) Ok(())
} }

View File

@ -0,0 +1,13 @@
use super::{VerbContext, UserVerb, UserVerbRef, UResult, user_error};
use async_trait::async_trait;
pub struct Verb;
#[async_trait]
impl UserVerb for Verb {
async fn handle(self: &Self, ctx: &VerbContext, _verb: &str, remaining: &str) -> UResult<()> {
user_error("Not implemented yet\r\n".to_string())?;
Ok(())
}
}
static VERB_INT: Verb = Verb;
pub static VERB: UserVerbRef = &VERB_INT as UserVerbRef;

View File

@ -0,0 +1,12 @@
use super::{VerbContext, UserVerb, UserVerbRef, UResult};
use async_trait::async_trait;
pub struct Verb;
#[async_trait]
impl UserVerb for Verb {
async fn handle(self: &Self, _ctx: &VerbContext, _verb: &str, _remaining: &str) -> UResult<()> {
Ok(())
}
}
static VERB_INT: Verb = Verb;
pub static VERB: UserVerbRef = &VERB_INT as UserVerbRef;

View File

@ -0,0 +1,19 @@
use nom::{
bytes::complete::{take_till1},
character::complete::space0,
IResult,
};
pub fn parse_command_name(input: &str) -> (&str, &str) {
fn parse(input: &str) -> IResult<&str, &str> {
let (input, _) = space0(input)?;
let (input, cmd) = take_till1(|c| c == ' ' || c == '\n')(input)?;
let (input, _) = space0(input)?;
Ok((input, cmd))
}
match parse(input) {
/* This parser only fails on empty / whitespace only strings. */
Err(_) => ("", ""),
Ok((rest, command)) => (command, rest)
}
}

View File

@ -0,0 +1 @@
pub mod session;

View File

@ -0,0 +1,15 @@
use serde::{Serialize, Deserialize};
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Session {
pub source: String,
pub less_explicit_mode: bool,
// Reminder: Consider backwards compatibility when updating this. New fields should generally
// be an Option, or things will crash out for existing sessions.
}
impl Default for Session {
fn default() -> Self {
Session { source: "unknown".to_owned(), less_explicit_mode: false }
}
}

View File

@ -6,8 +6,8 @@ use blastmud_interfaces::MessageToListener;
use log::warn; use log::warn;
async fn cleanup_session_once(pool: db::DBPool) -> DResult<()> { async fn cleanup_session_once(pool: db::DBPool) -> DResult<()> {
for listener in pool.clone().get_dead_listeners().await? { for listener in pool.get_dead_listeners().await? {
pool.clone().cleanup_listener(listener).await?; pool.cleanup_listener(listener).await?;
} }
Ok(()) Ok(())
} }
@ -27,7 +27,7 @@ fn start_session_cleanup_task(pool: db::DBPool) {
} }
async fn process_sendqueue_once(pool: db::DBPool, listener_map: ListenerMap) -> DResult<()> { async fn process_sendqueue_once(pool: db::DBPool, listener_map: ListenerMap) -> DResult<()> {
for item in pool.clone().get_from_sendqueue().await? { for item in pool.get_from_sendqueue().await? {
match listener_map.lock().await.get(&item.session.listener).map(|l| l.clone()) { match listener_map.lock().await.get(&item.session.listener).map(|l| l.clone()) {
None => {} None => {}
Some(listener_sender) => { Some(listener_sender) => {
@ -42,7 +42,7 @@ async fn process_sendqueue_once(pool: db::DBPool, listener_map: ListenerMap) ->
} }
).await.unwrap_or(()); ).await.unwrap_or(());
rx.await.unwrap_or(()); rx.await.unwrap_or(());
pool.clone().delete_from_sendqueue(&item).await?; pool.delete_from_sendqueue(&item).await?;
} }
} }
} }
@ -63,8 +63,8 @@ fn start_send_queue_task(pool: db::DBPool, listener_map: ListenerMap) {
}); });
} }
pub fn start_regular_tasks(pool: db::DBPool, listener_map: ListenerMap) -> DResult<()> { pub fn start_regular_tasks(pool: &db::DBPool, listener_map: ListenerMap) -> DResult<()> {
start_session_cleanup_task(pool.clone()); start_session_cleanup_task(pool.clone());
start_send_queue_task(pool, listener_map); start_send_queue_task(pool.clone(), listener_map);
Ok(()) Ok(())
} }