From 6d155c3e68388077f40dafe25bc18f0691cb140d Mon Sep 17 00:00:00 2001 From: Shagnor Date: Mon, 2 Jan 2023 13:25:05 +1100 Subject: [PATCH] Add queue system. --- blastmud_game/src/db.rs | 16 +- .../src/message_handler/user_commands.rs | 2 +- .../message_handler/user_commands/movement.rs | 50 ++++-- blastmud_game/src/models/session.rs | 10 +- blastmud_game/src/models/task.rs | 12 ++ blastmud_game/src/regular_tasks.rs | 23 ++- .../src/regular_tasks/queued_command.rs | 152 +++++++++++++++++- blastmud_game/src/static_content/room.rs | 31 ++-- 8 files changed, 246 insertions(+), 50 deletions(-) diff --git a/blastmud_game/src/db.rs b/blastmud_game/src/db.rs index a2741f2e..d8887e66 100644 --- a/blastmud_game/src/db.rs +++ b/blastmud_game/src/db.rs @@ -9,7 +9,12 @@ use tokio_postgres::NoTls; use crate::message_handler::ListenerSession; use crate::DResult; use crate::message_handler::user_commands::parsing::parse_offset; -use crate::models::{session::Session, user::User, item::Item, task::TaskParse}; +use crate::models::{ + session::Session, + user::User, + item::Item, + task::{Task, TaskParse} +}; use tokio_postgres::types::ToSql; use std::collections::BTreeSet; use std::sync::Arc; @@ -474,6 +479,15 @@ impl DBTrans { Ok(()) } + pub async fn upsert_task(&self, task: &Task) -> DResult<()> { + self.pg_trans()?.execute( + "INSERT INTO tasks (details) \ + VALUES ($1) \ + ON CONFLICT ((details->>'task_code'), (details->>'task_type')) \ + DO UPDATE SET details = $1", &[&serde_json::to_value(task)?]).await?; + Ok(()) + } + pub async fn update_task(&self, task_type: &str, task_code: &str, task: &TaskParse) -> DResult<()> { self.pg_trans()?.execute( "UPDATE tasks SET details = $3 WHERE details->>'task_type' = $1 AND \ diff --git a/blastmud_game/src/message_handler/user_commands.rs b/blastmud_game/src/message_handler/user_commands.rs index 5e7a0e8e..11f3d37a 100644 --- a/blastmud_game/src/message_handler/user_commands.rs +++ b/blastmud_game/src/message_handler/user_commands.rs @@ -16,7 +16,7 @@ mod ignore; mod less_explicit_mode; mod login; mod look; -mod movement; +pub mod movement; pub mod parsing; mod quit; mod register; diff --git a/blastmud_game/src/message_handler/user_commands/movement.rs b/blastmud_game/src/message_handler/user_commands/movement.rs index 1a05f811..5d143741 100644 --- a/blastmud_game/src/message_handler/user_commands/movement.rs +++ b/blastmud_game/src/message_handler/user_commands/movement.rs @@ -4,16 +4,31 @@ use super::{ look }; use async_trait::async_trait; -use crate::static_content::room::{self, Direction, ExitType}; +use crate::{ + regular_tasks::queued_command::{ + QueueCommandHandler, + QueueCommand, + queue_command + }, + static_content::room::{self, Direction, ExitType} +}; +use std::time; -pub struct Verb; +pub struct QueueHandler; #[async_trait] -impl UserVerb for Verb { - async fn handle(self: &Self, ctx: &mut VerbContext, verb: &str, remaining: &str) -> UResult<()> { - let dir = Direction::parse(verb).ok_or_else(|| UserError("Unknown direction".to_owned()))?; - if remaining.trim() != "" { - user_error("Movement commands don't take extra data at the end.".to_owned())?; - } +impl QueueCommandHandler for QueueHandler { + async fn start_command(&self, _ctx: &mut VerbContext<'_>, _command: &QueueCommand) + -> UResult { + Ok(time::Duration::from_secs(1)) + } + + #[allow(unreachable_patterns)] + async fn finish_command(&self, ctx: &mut VerbContext<'_>, command: &QueueCommand) + -> UResult<()> { + let direction = match command { + QueueCommand::Movement { direction } => direction, + _ => user_error("Unexpected command".to_owned())? + }; let player_item = get_player_item_or_fail(ctx).await?; let (heretype, herecode) = player_item.location.split_once("/").unwrap_or(("room", "repro_xv_chargen")); if heretype != "room" { @@ -22,7 +37,7 @@ impl UserVerb for Verb { } let room = room::room_map_by_code().get(herecode) .ok_or_else(|| UserError("Can't find your current location".to_owned()))?; - let exit = room.exits.iter().find(|ex| ex.direction == *dir) + let exit = room.exits.iter().find(|ex| ex.direction == *direction) .ok_or_else(|| UserError("There is nothing in that direction".to_owned()))?; // Ideally we would queue if we were already moving rather than insta-move. @@ -30,7 +45,7 @@ impl UserVerb for Verb { ExitType::Free => {} ExitType::Blocked(blocker) => { if !blocker.attempt_exit(ctx, &player_item, exit).await? { - return Ok(()); + user_error("Stopping movement".to_owned())?; } } } @@ -40,9 +55,22 @@ impl UserVerb for Verb { let mut new_player_item = (*player_item).clone(); new_player_item.location = format!("{}/{}", "room", new_room.code); ctx.trans.save_item_model(&new_player_item).await?; - look::VERB.handle(ctx, verb, remaining).await?; + look::VERB.handle(ctx, "look", "").await?; Ok(()) } } + +pub struct Verb; + +#[async_trait] +impl UserVerb for Verb { + async fn handle(self: &Self, ctx: &mut VerbContext, verb: &str, remaining: &str) -> UResult<()> { + let dir = Direction::parse(verb).ok_or_else(|| UserError("Unknown direction".to_owned()))?; + if remaining.trim() != "" { + user_error("Movement commands don't take extra data at the end.".to_owned())?; + } + queue_command(ctx, &QueueCommand::Movement { direction: dir.clone() }).await + } +} static VERB_INT: Verb = Verb; pub static VERB: UserVerbRef = &VERB_INT as UserVerbRef; diff --git a/blastmud_game/src/models/session.rs b/blastmud_game/src/models/session.rs index 3f0a1a38..fc344884 100644 --- a/blastmud_game/src/models/session.rs +++ b/blastmud_game/src/models/session.rs @@ -1,11 +1,16 @@ use serde::{Serialize, Deserialize}; +use std::collections::VecDeque; +use crate::regular_tasks::queued_command::QueueCommand; #[derive(Serialize, Deserialize, Clone, Debug)] +#[serde(default)] pub struct Session { pub source: String, pub less_explicit_mode: bool, + pub queue: VecDeque, // Reminder: Consider backwards compatibility when updating this. New fields should generally - // be an Option, or things will crash out for existing sessions. + // be an Option, or you should ensure the default value is sensible, or things will + // crash out for existing sessions. } impl Session { @@ -20,6 +25,7 @@ impl Session { impl Default for Session { fn default() -> Self { - Session { source: "unknown".to_owned(), less_explicit_mode: false } + Session { source: "unknown".to_owned(), less_explicit_mode: false, + queue: VecDeque::new() } } } diff --git a/blastmud_game/src/models/task.rs b/blastmud_game/src/models/task.rs index 8c48e259..85d694a0 100644 --- a/blastmud_game/src/models/task.rs +++ b/blastmud_game/src/models/task.rs @@ -31,6 +31,18 @@ pub struct TaskMeta { pub next_scheduled: DateTime, } +impl Default for TaskMeta { + fn default() -> Self { + Self { + task_code: "unspecified".to_string(), + is_static: false, + recurrence: None, + consecutive_failure_count: 0, + next_scheduled: Utc::now() + chrono::Duration::seconds(3600) + } + } +} + #[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)] pub struct Task { #[serde(flatten)] diff --git a/blastmud_game/src/regular_tasks.rs b/blastmud_game/src/regular_tasks.rs index 448b5b39..d4b956a3 100644 --- a/blastmud_game/src/regular_tasks.rs +++ b/blastmud_game/src/regular_tasks.rs @@ -7,8 +7,9 @@ use log::warn; use once_cell::sync::OnceCell; use std::ops::AddAssign; use std::collections::BTreeMap; +use chrono::Utc; -mod queued_command; +pub mod queued_command; pub struct TaskRunContext<'l> { pub trans: &'l db::DBTrans, @@ -17,7 +18,7 @@ pub struct TaskRunContext<'l> { #[async_trait] pub trait TaskHandler { - async fn do_task(&self, ctx: &mut TaskRunContext) -> DResult<()>; + async fn do_task(&self, ctx: &mut TaskRunContext) -> DResult>; } fn task_handler_registry() -> &'static BTreeMap<&'static str, &'static (dyn TaskHandler + Sync + Send)> { @@ -88,7 +89,7 @@ async fn process_sendqueue_once(pool: db::DBPool, listener_map: ListenerMap) -> fn start_send_queue_task(pool: db::DBPool, listener_map: ListenerMap) { task::spawn(async move { loop { - time::sleep(time::Duration::from_secs(1)).await; + time::sleep(time::Duration::from_millis(500)).await; match process_sendqueue_once(pool.clone(), listener_map.clone()).await { Ok(()) => {} Err(e) => { @@ -127,25 +128,23 @@ async fn process_tasks_once(pool: db::DBPool) -> DResult<()> { if task.meta.consecutive_failure_count > 3 && !task.meta.is_static { tx.delete_task(&task.details.name(), &task.meta.task_code).await?; } else { - task.meta.next_scheduled.add_assign( - chrono::Duration::seconds(60) - ); + task.meta.next_scheduled = Utc::now() + chrono::Duration::seconds(60); tx.update_task(&task.details.name(), &task.meta.task_code, &TaskParse::Known(task.clone())).await?; } tx.commit().await?; }, - Ok(()) => { + Ok(resched) => { task.meta.consecutive_failure_count = 0; - match task.meta.recurrence { + match task.meta.recurrence.clone().or( + resched.map(|r| TaskRecurrence::FixedDuration { seconds: r.as_secs() as u32 })) { None => { tx.delete_task(&task.details.name(), &task.meta.task_code).await?; } Some(TaskRecurrence::FixedDuration { seconds }) => { - task.meta.next_scheduled.add_assign( - chrono::Duration::seconds(seconds as i64) - ); + task.meta.next_scheduled = Utc::now() + + chrono::Duration::seconds(seconds as i64); tx.update_task(&task.details.name(), &task.meta.task_code, &TaskParse::Known(task.clone())).await?; } @@ -190,7 +189,7 @@ async fn process_tasks_once(pool: db::DBPool) -> DResult<()> { fn start_task_runner(pool: db::DBPool) { task::spawn(async move { loop { - time::sleep(time::Duration::from_secs(1)).await; + time::sleep(time::Duration::from_millis(500)).await; match process_tasks_once(pool.clone()).await { Ok(()) => {} Err(e) => { diff --git a/blastmud_game/src/regular_tasks/queued_command.rs b/blastmud_game/src/regular_tasks/queued_command.rs index 7d29d1bb..4324b6f0 100644 --- a/blastmud_game/src/regular_tasks/queued_command.rs +++ b/blastmud_game/src/regular_tasks/queued_command.rs @@ -1,18 +1,154 @@ use super::{TaskHandler, TaskRunContext}; use async_trait::async_trait; +use std::time; +use chrono::Utc; use crate::DResult; +use serde::{Serialize, Deserialize}; +use std::collections::BTreeMap; +use crate::models::task::{ + Task, + TaskMeta, + TaskDetails, +}; +use crate::message_handler::user_commands::{ + VerbContext, + CommandHandlingError, + UResult, + movement, + user_error, + get_user_or_fail +}; +use crate::static_content::room::Direction; +use once_cell::sync::OnceCell; -pub struct RunQueuedCommandTaskHandler; +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub enum QueueCommand { + Movement { direction: Direction }, +} +impl QueueCommand { + pub fn name(&self) -> &'static str { + use QueueCommand::*; + match self { + Movement {..} => "Movement" + } + } +} +#[async_trait] +pub trait QueueCommandHandler { + async fn start_command(&self, ctx: &mut VerbContext<'_>, command: &QueueCommand) -> UResult; + async fn finish_command(&self, ctx: &mut VerbContext<'_>, command: &QueueCommand) -> UResult<()>; +} + +fn queue_command_registry() -> &'static BTreeMap<&'static str, &'static (dyn QueueCommandHandler + Sync + Send)> { + static REGISTRY: OnceCell> = + OnceCell::new(); + REGISTRY.get_or_init(|| vec!( + ("Movement", &movement::QueueHandler as &(dyn QueueCommandHandler + Sync + Send)) + ).into_iter().collect()) +} + +pub async fn queue_command(ctx: &mut VerbContext<'_>, command: &QueueCommand) -> UResult<()> { + let was_empty = ctx.session_dat.queue.is_empty(); + let username = get_user_or_fail(ctx)?.username.to_lowercase(); + if ctx.session_dat.queue.len() >= 20 { + user_error("Can't queue more than 20 actions\n".to_owned())?; + } + ctx.session_dat.queue.push_back(command.clone()); + if was_empty { + match queue_command_registry() + .get(&command.name()) + .expect("QueueCommand to have been registered") + .start_command(ctx, &command).await { + Err(CommandHandlingError::UserError(err_msg)) => { + ctx.session_dat.queue.truncate(0); + ctx.trans.save_session_model(ctx.session, ctx.session_dat).await?; + user_error(err_msg)?; + } + Err(CommandHandlingError::SystemError(e)) => Err(e)?, + Ok(dur) => { + ctx.trans.save_session_model(ctx.session, ctx.session_dat).await?; + ctx.trans.upsert_task(&Task { + meta: TaskMeta { + task_code: username, + next_scheduled: Utc::now() + chrono::Duration::from_std(dur)?, + ..Default::default() + }, + details: TaskDetails::RunQueuedCommand + }).await?; + } + } + + } else { + ctx.trans.queue_for_session(ctx.session, Some("[queued]\n")).await?; + ctx.trans.save_session_model(ctx.session, ctx.session_dat).await?; + } + Ok(()) +} + +pub struct RunQueuedCommandTaskHandler; #[async_trait] impl TaskHandler for RunQueuedCommandTaskHandler { - async fn do_task(&self, _ctx: &mut TaskRunContext) -> DResult<()> { - Ok(()) - /* - match ctx.task { - - _ => Err("Unexpected task type")? - }? */ + async fn do_task(&self, ctx: &mut TaskRunContext) -> DResult> { + let username: &str = ctx.task.meta.task_code.as_str(); + let (listener_sess, mut sess_dets) = + match ctx.trans.find_session_for_player(username).await? { + None => { + // Queue is gone if session is gone, and don't schedule another + // job, but otherwise this is a successful run. + return Ok(None); + }, + Some(x) => x + }; + let queue_command = match sess_dets.queue.pop_front() { + None => { return Ok(None); } + Some(x) => x + }; + let mut user = ctx.trans.find_by_username(username).await?; + let mut verbcontext = VerbContext { + session: &listener_sess, + session_dat: &mut sess_dets, + user_dat: &mut user, + trans: ctx.trans + }; + let uresult_finish = + queue_command_registry() + .get(&queue_command.name()) + .expect("QueueCommand to have been registered") + .finish_command(&mut verbcontext, &queue_command).await; + match uresult_finish { + Ok(()) => {} + Err(CommandHandlingError::UserError(err_msg)) => { + ctx.trans.queue_for_session(&listener_sess, Some(&(err_msg + "\r\n"))).await?; + sess_dets.queue.truncate(0); + ctx.trans.save_session_model(&listener_sess, &sess_dets).await?; + return Ok(None); + } + Err(CommandHandlingError::SystemError(e)) => Err(e)? + }; + + let next_command_opt = verbcontext.session_dat.queue.front().cloned(); + let result = match next_command_opt { + None => None, + Some(next_command) => { + match queue_command_registry() + .get(&next_command.name()) + .expect("QueueCommand to have been registered") + .start_command(&mut verbcontext, &next_command).await { + Err(CommandHandlingError::UserError(err_msg)) => { + ctx.trans.queue_for_session(&listener_sess, Some(&(err_msg + "\r\n"))).await?; + sess_dets.queue.truncate(0); + ctx.trans.save_session_model(&listener_sess, &sess_dets).await?; + None + } + Err(CommandHandlingError::SystemError(e)) => Err(e)?, + Ok(dur) => Some(dur) + } + } + }; + ctx.trans.save_session_model(&listener_sess, &sess_dets).await?; + + Ok(result) } } diff --git a/blastmud_game/src/static_content/room.rs b/blastmud_game/src/static_content/room.rs index 025f773e..722a2ad3 100644 --- a/blastmud_game/src/static_content/room.rs +++ b/blastmud_game/src/static_content/room.rs @@ -3,6 +3,7 @@ use once_cell::sync::OnceCell; use std::collections::BTreeMap; use ansi::ansi; use async_trait::async_trait; +use serde::{Serialize, Deserialize}; use crate::message_handler::user_commands::{ UResult, VerbContext }; @@ -47,7 +48,7 @@ impl GridCoords { Direction::SOUTHWEST => GridCoords {x: self.x - 1, y: self.y + 1, ..*self}, Direction::UP => GridCoords {z: self.z + 1, ..*self}, Direction::DOWN => GridCoords {z: self.z - 1, ..*self}, - Direction::IN(_) => self.clone() + Direction::IN { .. } => self.clone() } } } @@ -70,7 +71,7 @@ pub enum ExitType { } #[allow(dead_code)] -#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Debug)] +#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Debug, Serialize, Deserialize)] pub enum Direction { NORTH, SOUTH, @@ -82,23 +83,23 @@ pub enum Direction { SOUTHWEST, UP, DOWN, - IN(&'static str) + IN { item: String } } impl Direction { - pub fn describe(self: &Self) -> &'static str { + pub fn describe(self: &Self) -> String { match self { - Direction::NORTH => "north", - Direction::SOUTH => "south", - Direction::EAST => "east", - Direction::WEST => "west", - Direction::NORTHEAST => "northeast", - Direction::SOUTHEAST => "southeast", - Direction::NORTHWEST => "northwest", - Direction::SOUTHWEST => "southwest", - Direction::UP => "up", - Direction::DOWN => "down", - Direction::IN(s) => s + Direction::NORTH => "north".to_owned(), + Direction::SOUTH => "south".to_owned(), + Direction::EAST => "east".to_owned(), + Direction::WEST => "west".to_owned(), + Direction::NORTHEAST => "northeast".to_owned(), + Direction::SOUTHEAST => "southeast".to_owned(), + Direction::NORTHWEST => "northwest".to_owned(), + Direction::SOUTHWEST => "southwest".to_owned(), + Direction::UP => "up".to_owned(), + Direction::DOWN => "down".to_owned(), + Direction::IN { item } => item.to_owned() } }