Add queue system.

This commit is contained in:
Condorra 2023-01-02 13:25:05 +11:00
parent 7edf9b7b9d
commit 6d155c3e68
8 changed files with 246 additions and 50 deletions

View File

@ -9,7 +9,12 @@ use tokio_postgres::NoTls;
use crate::message_handler::ListenerSession; use crate::message_handler::ListenerSession;
use crate::DResult; use crate::DResult;
use crate::message_handler::user_commands::parsing::parse_offset; use crate::message_handler::user_commands::parsing::parse_offset;
use crate::models::{session::Session, user::User, item::Item, task::TaskParse}; use crate::models::{
session::Session,
user::User,
item::Item,
task::{Task, TaskParse}
};
use tokio_postgres::types::ToSql; use tokio_postgres::types::ToSql;
use std::collections::BTreeSet; use std::collections::BTreeSet;
use std::sync::Arc; use std::sync::Arc;
@ -474,6 +479,15 @@ impl DBTrans {
Ok(()) Ok(())
} }
pub async fn upsert_task(&self, task: &Task) -> DResult<()> {
self.pg_trans()?.execute(
"INSERT INTO tasks (details) \
VALUES ($1) \
ON CONFLICT ((details->>'task_code'), (details->>'task_type')) \
DO UPDATE SET details = $1", &[&serde_json::to_value(task)?]).await?;
Ok(())
}
pub async fn update_task(&self, task_type: &str, task_code: &str, task: &TaskParse) -> DResult<()> { pub async fn update_task(&self, task_type: &str, task_code: &str, task: &TaskParse) -> DResult<()> {
self.pg_trans()?.execute( self.pg_trans()?.execute(
"UPDATE tasks SET details = $3 WHERE details->>'task_type' = $1 AND \ "UPDATE tasks SET details = $3 WHERE details->>'task_type' = $1 AND \

View File

@ -16,7 +16,7 @@ mod ignore;
mod less_explicit_mode; mod less_explicit_mode;
mod login; mod login;
mod look; mod look;
mod movement; pub mod movement;
pub mod parsing; pub mod parsing;
mod quit; mod quit;
mod register; mod register;

View File

@ -4,16 +4,31 @@ use super::{
look look
}; };
use async_trait::async_trait; use async_trait::async_trait;
use crate::static_content::room::{self, Direction, ExitType}; use crate::{
regular_tasks::queued_command::{
QueueCommandHandler,
QueueCommand,
queue_command
},
static_content::room::{self, Direction, ExitType}
};
use std::time;
pub struct Verb; pub struct QueueHandler;
#[async_trait] #[async_trait]
impl UserVerb for Verb { impl QueueCommandHandler for QueueHandler {
async fn handle(self: &Self, ctx: &mut VerbContext, verb: &str, remaining: &str) -> UResult<()> { async fn start_command(&self, _ctx: &mut VerbContext<'_>, _command: &QueueCommand)
let dir = Direction::parse(verb).ok_or_else(|| UserError("Unknown direction".to_owned()))?; -> UResult<time::Duration> {
if remaining.trim() != "" { Ok(time::Duration::from_secs(1))
user_error("Movement commands don't take extra data at the end.".to_owned())?; }
}
#[allow(unreachable_patterns)]
async fn finish_command(&self, ctx: &mut VerbContext<'_>, command: &QueueCommand)
-> UResult<()> {
let direction = match command {
QueueCommand::Movement { direction } => direction,
_ => user_error("Unexpected command".to_owned())?
};
let player_item = get_player_item_or_fail(ctx).await?; let player_item = get_player_item_or_fail(ctx).await?;
let (heretype, herecode) = player_item.location.split_once("/").unwrap_or(("room", "repro_xv_chargen")); let (heretype, herecode) = player_item.location.split_once("/").unwrap_or(("room", "repro_xv_chargen"));
if heretype != "room" { if heretype != "room" {
@ -22,7 +37,7 @@ impl UserVerb for Verb {
} }
let room = room::room_map_by_code().get(herecode) let room = room::room_map_by_code().get(herecode)
.ok_or_else(|| UserError("Can't find your current location".to_owned()))?; .ok_or_else(|| UserError("Can't find your current location".to_owned()))?;
let exit = room.exits.iter().find(|ex| ex.direction == *dir) let exit = room.exits.iter().find(|ex| ex.direction == *direction)
.ok_or_else(|| UserError("There is nothing in that direction".to_owned()))?; .ok_or_else(|| UserError("There is nothing in that direction".to_owned()))?;
// Ideally we would queue if we were already moving rather than insta-move. // Ideally we would queue if we were already moving rather than insta-move.
@ -30,7 +45,7 @@ impl UserVerb for Verb {
ExitType::Free => {} ExitType::Free => {}
ExitType::Blocked(blocker) => { ExitType::Blocked(blocker) => {
if !blocker.attempt_exit(ctx, &player_item, exit).await? { if !blocker.attempt_exit(ctx, &player_item, exit).await? {
return Ok(()); user_error("Stopping movement".to_owned())?;
} }
} }
} }
@ -40,9 +55,22 @@ impl UserVerb for Verb {
let mut new_player_item = (*player_item).clone(); let mut new_player_item = (*player_item).clone();
new_player_item.location = format!("{}/{}", "room", new_room.code); new_player_item.location = format!("{}/{}", "room", new_room.code);
ctx.trans.save_item_model(&new_player_item).await?; ctx.trans.save_item_model(&new_player_item).await?;
look::VERB.handle(ctx, verb, remaining).await?; look::VERB.handle(ctx, "look", "").await?;
Ok(()) Ok(())
} }
} }
pub struct Verb;
#[async_trait]
impl UserVerb for Verb {
async fn handle(self: &Self, ctx: &mut VerbContext, verb: &str, remaining: &str) -> UResult<()> {
let dir = Direction::parse(verb).ok_or_else(|| UserError("Unknown direction".to_owned()))?;
if remaining.trim() != "" {
user_error("Movement commands don't take extra data at the end.".to_owned())?;
}
queue_command(ctx, &QueueCommand::Movement { direction: dir.clone() }).await
}
}
static VERB_INT: Verb = Verb; static VERB_INT: Verb = Verb;
pub static VERB: UserVerbRef = &VERB_INT as UserVerbRef; pub static VERB: UserVerbRef = &VERB_INT as UserVerbRef;

View File

@ -1,11 +1,16 @@
use serde::{Serialize, Deserialize}; use serde::{Serialize, Deserialize};
use std::collections::VecDeque;
use crate::regular_tasks::queued_command::QueueCommand;
#[derive(Serialize, Deserialize, Clone, Debug)] #[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(default)]
pub struct Session { pub struct Session {
pub source: String, pub source: String,
pub less_explicit_mode: bool, pub less_explicit_mode: bool,
pub queue: VecDeque<QueueCommand>,
// Reminder: Consider backwards compatibility when updating this. New fields should generally // Reminder: Consider backwards compatibility when updating this. New fields should generally
// be an Option, or things will crash out for existing sessions. // be an Option, or you should ensure the default value is sensible, or things will
// crash out for existing sessions.
} }
impl Session { impl Session {
@ -20,6 +25,7 @@ impl Session {
impl Default for Session { impl Default for Session {
fn default() -> Self { fn default() -> Self {
Session { source: "unknown".to_owned(), less_explicit_mode: false } Session { source: "unknown".to_owned(), less_explicit_mode: false,
queue: VecDeque::new() }
} }
} }

View File

@ -31,6 +31,18 @@ pub struct TaskMeta {
pub next_scheduled: DateTime<Utc>, pub next_scheduled: DateTime<Utc>,
} }
impl Default for TaskMeta {
fn default() -> Self {
Self {
task_code: "unspecified".to_string(),
is_static: false,
recurrence: None,
consecutive_failure_count: 0,
next_scheduled: Utc::now() + chrono::Duration::seconds(3600)
}
}
}
#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)] #[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub struct Task { pub struct Task {
#[serde(flatten)] #[serde(flatten)]

View File

@ -7,8 +7,9 @@ use log::warn;
use once_cell::sync::OnceCell; use once_cell::sync::OnceCell;
use std::ops::AddAssign; use std::ops::AddAssign;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use chrono::Utc;
mod queued_command; pub mod queued_command;
pub struct TaskRunContext<'l> { pub struct TaskRunContext<'l> {
pub trans: &'l db::DBTrans, pub trans: &'l db::DBTrans,
@ -17,7 +18,7 @@ pub struct TaskRunContext<'l> {
#[async_trait] #[async_trait]
pub trait TaskHandler { pub trait TaskHandler {
async fn do_task(&self, ctx: &mut TaskRunContext) -> DResult<()>; async fn do_task(&self, ctx: &mut TaskRunContext) -> DResult<Option<time::Duration>>;
} }
fn task_handler_registry() -> &'static BTreeMap<&'static str, &'static (dyn TaskHandler + Sync + Send)> { fn task_handler_registry() -> &'static BTreeMap<&'static str, &'static (dyn TaskHandler + Sync + Send)> {
@ -88,7 +89,7 @@ async fn process_sendqueue_once(pool: db::DBPool, listener_map: ListenerMap) ->
fn start_send_queue_task(pool: db::DBPool, listener_map: ListenerMap) { fn start_send_queue_task(pool: db::DBPool, listener_map: ListenerMap) {
task::spawn(async move { task::spawn(async move {
loop { loop {
time::sleep(time::Duration::from_secs(1)).await; time::sleep(time::Duration::from_millis(500)).await;
match process_sendqueue_once(pool.clone(), listener_map.clone()).await { match process_sendqueue_once(pool.clone(), listener_map.clone()).await {
Ok(()) => {} Ok(()) => {}
Err(e) => { Err(e) => {
@ -127,25 +128,23 @@ async fn process_tasks_once(pool: db::DBPool) -> DResult<()> {
if task.meta.consecutive_failure_count > 3 && !task.meta.is_static { if task.meta.consecutive_failure_count > 3 && !task.meta.is_static {
tx.delete_task(&task.details.name(), &task.meta.task_code).await?; tx.delete_task(&task.details.name(), &task.meta.task_code).await?;
} else { } else {
task.meta.next_scheduled.add_assign( task.meta.next_scheduled = Utc::now() + chrono::Duration::seconds(60);
chrono::Duration::seconds(60)
);
tx.update_task(&task.details.name(), &task.meta.task_code, tx.update_task(&task.details.name(), &task.meta.task_code,
&TaskParse::Known(task.clone())).await?; &TaskParse::Known(task.clone())).await?;
} }
tx.commit().await?; tx.commit().await?;
}, },
Ok(()) => { Ok(resched) => {
task.meta.consecutive_failure_count = 0; task.meta.consecutive_failure_count = 0;
match task.meta.recurrence { match task.meta.recurrence.clone().or(
resched.map(|r| TaskRecurrence::FixedDuration { seconds: r.as_secs() as u32 })) {
None => { None => {
tx.delete_task(&task.details.name(), tx.delete_task(&task.details.name(),
&task.meta.task_code).await?; &task.meta.task_code).await?;
} }
Some(TaskRecurrence::FixedDuration { seconds }) => { Some(TaskRecurrence::FixedDuration { seconds }) => {
task.meta.next_scheduled.add_assign( task.meta.next_scheduled = Utc::now() +
chrono::Duration::seconds(seconds as i64) chrono::Duration::seconds(seconds as i64);
);
tx.update_task(&task.details.name(), &task.meta.task_code, tx.update_task(&task.details.name(), &task.meta.task_code,
&TaskParse::Known(task.clone())).await?; &TaskParse::Known(task.clone())).await?;
} }
@ -190,7 +189,7 @@ async fn process_tasks_once(pool: db::DBPool) -> DResult<()> {
fn start_task_runner(pool: db::DBPool) { fn start_task_runner(pool: db::DBPool) {
task::spawn(async move { task::spawn(async move {
loop { loop {
time::sleep(time::Duration::from_secs(1)).await; time::sleep(time::Duration::from_millis(500)).await;
match process_tasks_once(pool.clone()).await { match process_tasks_once(pool.clone()).await {
Ok(()) => {} Ok(()) => {}
Err(e) => { Err(e) => {

View File

@ -1,18 +1,154 @@
use super::{TaskHandler, TaskRunContext}; use super::{TaskHandler, TaskRunContext};
use async_trait::async_trait; use async_trait::async_trait;
use std::time;
use chrono::Utc;
use crate::DResult; use crate::DResult;
use serde::{Serialize, Deserialize};
use std::collections::BTreeMap;
use crate::models::task::{
Task,
TaskMeta,
TaskDetails,
};
use crate::message_handler::user_commands::{
VerbContext,
CommandHandlingError,
UResult,
movement,
user_error,
get_user_or_fail
};
use crate::static_content::room::Direction;
use once_cell::sync::OnceCell;
pub struct RunQueuedCommandTaskHandler; #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum QueueCommand {
Movement { direction: Direction },
}
impl QueueCommand {
pub fn name(&self) -> &'static str {
use QueueCommand::*;
match self {
Movement {..} => "Movement"
}
}
}
#[async_trait] #[async_trait]
impl TaskHandler for RunQueuedCommandTaskHandler { pub trait QueueCommandHandler {
async fn do_task(&self, _ctx: &mut TaskRunContext) -> DResult<()> { async fn start_command(&self, ctx: &mut VerbContext<'_>, command: &QueueCommand) -> UResult<time::Duration>;
Ok(()) async fn finish_command(&self, ctx: &mut VerbContext<'_>, command: &QueueCommand) -> UResult<()>;
/* }
match ctx.task {
_ => Err("Unexpected task type")? fn queue_command_registry() -> &'static BTreeMap<&'static str, &'static (dyn QueueCommandHandler + Sync + Send)> {
}? */ static REGISTRY: OnceCell<BTreeMap<&'static str, &'static (dyn QueueCommandHandler + Sync + Send)>> =
OnceCell::new();
REGISTRY.get_or_init(|| vec!(
("Movement", &movement::QueueHandler as &(dyn QueueCommandHandler + Sync + Send))
).into_iter().collect())
}
pub async fn queue_command(ctx: &mut VerbContext<'_>, command: &QueueCommand) -> UResult<()> {
let was_empty = ctx.session_dat.queue.is_empty();
let username = get_user_or_fail(ctx)?.username.to_lowercase();
if ctx.session_dat.queue.len() >= 20 {
user_error("Can't queue more than 20 actions\n".to_owned())?;
}
ctx.session_dat.queue.push_back(command.clone());
if was_empty {
match queue_command_registry()
.get(&command.name())
.expect("QueueCommand to have been registered")
.start_command(ctx, &command).await {
Err(CommandHandlingError::UserError(err_msg)) => {
ctx.session_dat.queue.truncate(0);
ctx.trans.save_session_model(ctx.session, ctx.session_dat).await?;
user_error(err_msg)?;
}
Err(CommandHandlingError::SystemError(e)) => Err(e)?,
Ok(dur) => {
ctx.trans.save_session_model(ctx.session, ctx.session_dat).await?;
ctx.trans.upsert_task(&Task {
meta: TaskMeta {
task_code: username,
next_scheduled: Utc::now() + chrono::Duration::from_std(dur)?,
..Default::default()
},
details: TaskDetails::RunQueuedCommand
}).await?;
}
}
} else {
ctx.trans.queue_for_session(ctx.session, Some("[queued]\n")).await?;
ctx.trans.save_session_model(ctx.session, ctx.session_dat).await?;
}
Ok(())
}
pub struct RunQueuedCommandTaskHandler;
#[async_trait]
impl TaskHandler for RunQueuedCommandTaskHandler {
async fn do_task(&self, ctx: &mut TaskRunContext) -> DResult<Option<time::Duration>> {
let username: &str = ctx.task.meta.task_code.as_str();
let (listener_sess, mut sess_dets) =
match ctx.trans.find_session_for_player(username).await? {
None => {
// Queue is gone if session is gone, and don't schedule another
// job, but otherwise this is a successful run.
return Ok(None);
},
Some(x) => x
};
let queue_command = match sess_dets.queue.pop_front() {
None => { return Ok(None); }
Some(x) => x
};
let mut user = ctx.trans.find_by_username(username).await?;
let mut verbcontext = VerbContext {
session: &listener_sess,
session_dat: &mut sess_dets,
user_dat: &mut user,
trans: ctx.trans
};
let uresult_finish =
queue_command_registry()
.get(&queue_command.name())
.expect("QueueCommand to have been registered")
.finish_command(&mut verbcontext, &queue_command).await;
match uresult_finish {
Ok(()) => {}
Err(CommandHandlingError::UserError(err_msg)) => {
ctx.trans.queue_for_session(&listener_sess, Some(&(err_msg + "\r\n"))).await?;
sess_dets.queue.truncate(0);
ctx.trans.save_session_model(&listener_sess, &sess_dets).await?;
return Ok(None);
}
Err(CommandHandlingError::SystemError(e)) => Err(e)?
};
let next_command_opt = verbcontext.session_dat.queue.front().cloned();
let result = match next_command_opt {
None => None,
Some(next_command) => {
match queue_command_registry()
.get(&next_command.name())
.expect("QueueCommand to have been registered")
.start_command(&mut verbcontext, &next_command).await {
Err(CommandHandlingError::UserError(err_msg)) => {
ctx.trans.queue_for_session(&listener_sess, Some(&(err_msg + "\r\n"))).await?;
sess_dets.queue.truncate(0);
ctx.trans.save_session_model(&listener_sess, &sess_dets).await?;
None
}
Err(CommandHandlingError::SystemError(e)) => Err(e)?,
Ok(dur) => Some(dur)
}
}
};
ctx.trans.save_session_model(&listener_sess, &sess_dets).await?;
Ok(result)
} }
} }

View File

@ -3,6 +3,7 @@ use once_cell::sync::OnceCell;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use ansi::ansi; use ansi::ansi;
use async_trait::async_trait; use async_trait::async_trait;
use serde::{Serialize, Deserialize};
use crate::message_handler::user_commands::{ use crate::message_handler::user_commands::{
UResult, VerbContext UResult, VerbContext
}; };
@ -47,7 +48,7 @@ impl GridCoords {
Direction::SOUTHWEST => GridCoords {x: self.x - 1, y: self.y + 1, ..*self}, Direction::SOUTHWEST => GridCoords {x: self.x - 1, y: self.y + 1, ..*self},
Direction::UP => GridCoords {z: self.z + 1, ..*self}, Direction::UP => GridCoords {z: self.z + 1, ..*self},
Direction::DOWN => GridCoords {z: self.z - 1, ..*self}, Direction::DOWN => GridCoords {z: self.z - 1, ..*self},
Direction::IN(_) => self.clone() Direction::IN { .. } => self.clone()
} }
} }
} }
@ -70,7 +71,7 @@ pub enum ExitType {
} }
#[allow(dead_code)] #[allow(dead_code)]
#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Debug)] #[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Debug, Serialize, Deserialize)]
pub enum Direction { pub enum Direction {
NORTH, NORTH,
SOUTH, SOUTH,
@ -82,23 +83,23 @@ pub enum Direction {
SOUTHWEST, SOUTHWEST,
UP, UP,
DOWN, DOWN,
IN(&'static str) IN { item: String }
} }
impl Direction { impl Direction {
pub fn describe(self: &Self) -> &'static str { pub fn describe(self: &Self) -> String {
match self { match self {
Direction::NORTH => "north", Direction::NORTH => "north".to_owned(),
Direction::SOUTH => "south", Direction::SOUTH => "south".to_owned(),
Direction::EAST => "east", Direction::EAST => "east".to_owned(),
Direction::WEST => "west", Direction::WEST => "west".to_owned(),
Direction::NORTHEAST => "northeast", Direction::NORTHEAST => "northeast".to_owned(),
Direction::SOUTHEAST => "southeast", Direction::SOUTHEAST => "southeast".to_owned(),
Direction::NORTHWEST => "northwest", Direction::NORTHWEST => "northwest".to_owned(),
Direction::SOUTHWEST => "southwest", Direction::SOUTHWEST => "southwest".to_owned(),
Direction::UP => "up", Direction::UP => "up".to_owned(),
Direction::DOWN => "down", Direction::DOWN => "down".to_owned(),
Direction::IN(s) => s Direction::IN { item } => item.to_owned()
} }
} }