Add start of task runner framework (in progress).
This commit is contained in:
parent
8f66177f43
commit
7edf9b7b9d
@ -9,7 +9,7 @@ use tokio_postgres::NoTls;
|
|||||||
use crate::message_handler::ListenerSession;
|
use crate::message_handler::ListenerSession;
|
||||||
use crate::DResult;
|
use crate::DResult;
|
||||||
use crate::message_handler::user_commands::parsing::parse_offset;
|
use crate::message_handler::user_commands::parsing::parse_offset;
|
||||||
use crate::models::{session::Session, user::User, item::Item};
|
use crate::models::{session::Session, user::User, item::Item, task::TaskParse};
|
||||||
use tokio_postgres::types::ToSql;
|
use tokio_postgres::types::ToSql;
|
||||||
use std::collections::BTreeSet;
|
use std::collections::BTreeSet;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
@ -454,6 +454,34 @@ impl DBTrans {
|
|||||||
.map(Arc::new)
|
.map(Arc::new)
|
||||||
.collect()))
|
.collect()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn get_next_scheduled_task(&self) -> DResult<Option<TaskParse>> {
|
||||||
|
match self.pg_trans()?.query_opt(
|
||||||
|
"SELECT details FROM tasks WHERE \
|
||||||
|
CAST(details->>'next_scheduled' AS TIMESTAMPTZ) <= now() \
|
||||||
|
ORDER BY details->>'next_scheduled'", &[]
|
||||||
|
).await? {
|
||||||
|
None => Ok(None),
|
||||||
|
Some(row) => Ok(serde_json::from_value(row.get("details"))?)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn delete_task(&self, task_type: &str, task_code: &str) -> DResult<()> {
|
||||||
|
self.pg_trans()?.execute(
|
||||||
|
"DELETE FROM tasks WHERE details->>'task_type' = $1 AND \
|
||||||
|
details->>'task_code' = $2", &[&task_type, &task_code]
|
||||||
|
).await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn update_task(&self, task_type: &str, task_code: &str, task: &TaskParse) -> DResult<()> {
|
||||||
|
self.pg_trans()?.execute(
|
||||||
|
"UPDATE tasks SET details = $3 WHERE details->>'task_type' = $1 AND \
|
||||||
|
details->>'task_code' = $2",
|
||||||
|
&[&task_type, &task_code, &serde_json::to_value(task)?]
|
||||||
|
).await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn commit(mut self: Self) -> DResult<()> {
|
pub async fn commit(mut self: Self) -> DResult<()> {
|
||||||
let trans_opt = self.with_trans_mut(|t| std::mem::replace(t, None));
|
let trans_opt = self.with_trans_mut(|t| std::mem::replace(t, None));
|
||||||
|
@ -1,3 +1,4 @@
|
|||||||
pub mod session;
|
pub mod session;
|
||||||
pub mod user;
|
pub mod user;
|
||||||
pub mod item;
|
pub mod item;
|
||||||
|
pub mod task;
|
||||||
|
57
blastmud_game/src/models/task.rs
Normal file
57
blastmud_game/src/models/task.rs
Normal file
@ -0,0 +1,57 @@
|
|||||||
|
use serde::{Serialize, Deserialize};
|
||||||
|
use serde_json::Value;
|
||||||
|
use chrono::{DateTime, Utc};
|
||||||
|
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
|
||||||
|
pub enum TaskRecurrence {
|
||||||
|
FixedDuration { seconds: u32 }
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
|
||||||
|
#[serde(tag="task_type", content="task_details")]
|
||||||
|
pub enum TaskDetails {
|
||||||
|
RunQueuedCommand
|
||||||
|
}
|
||||||
|
impl TaskDetails {
|
||||||
|
pub fn name(self: &Self) -> &'static str {
|
||||||
|
use TaskDetails::*;
|
||||||
|
match self {
|
||||||
|
RunQueuedCommand => "RunQueuedCommand"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
|
||||||
|
pub struct TaskMeta {
|
||||||
|
pub task_code: String,
|
||||||
|
pub is_static: bool,
|
||||||
|
pub recurrence: Option<TaskRecurrence>,
|
||||||
|
pub consecutive_failure_count: u32,
|
||||||
|
pub next_scheduled: DateTime<Utc>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
|
||||||
|
pub struct Task {
|
||||||
|
#[serde(flatten)]
|
||||||
|
pub meta: TaskMeta,
|
||||||
|
#[serde(flatten)]
|
||||||
|
pub details: TaskDetails,
|
||||||
|
// Be careful of backwards compatibility if you add anything new
|
||||||
|
// (consider Option).
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq)]
|
||||||
|
pub struct TaskOther {
|
||||||
|
#[serde(flatten)]
|
||||||
|
pub meta: TaskMeta,
|
||||||
|
pub task_type: String,
|
||||||
|
pub task_details: Value
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq)]
|
||||||
|
#[serde(untagged)]
|
||||||
|
pub enum TaskParse {
|
||||||
|
Known(Task),
|
||||||
|
Unknown(TaskOther)
|
||||||
|
}
|
@ -1,9 +1,34 @@
|
|||||||
use tokio::{task, time, sync::oneshot};
|
use tokio::{task, time, sync::oneshot};
|
||||||
use crate::DResult;
|
use async_trait::async_trait;
|
||||||
use crate::db;
|
use crate::{DResult, db, models::task::{Task, TaskParse, TaskRecurrence}};
|
||||||
use crate::listener::{ListenerMap, ListenerSend};
|
use crate::listener::{ListenerMap, ListenerSend};
|
||||||
use blastmud_interfaces::MessageToListener;
|
use blastmud_interfaces::MessageToListener;
|
||||||
use log::warn;
|
use log::warn;
|
||||||
|
use once_cell::sync::OnceCell;
|
||||||
|
use std::ops::AddAssign;
|
||||||
|
use std::collections::BTreeMap;
|
||||||
|
|
||||||
|
mod queued_command;
|
||||||
|
|
||||||
|
pub struct TaskRunContext<'l> {
|
||||||
|
pub trans: &'l db::DBTrans,
|
||||||
|
pub task: &'l mut Task
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
pub trait TaskHandler {
|
||||||
|
async fn do_task(&self, ctx: &mut TaskRunContext) -> DResult<()>;
|
||||||
|
}
|
||||||
|
|
||||||
|
fn task_handler_registry() -> &'static BTreeMap<&'static str, &'static (dyn TaskHandler + Sync + Send)> {
|
||||||
|
static TASK_HANDLER_REGISTRY: OnceCell<BTreeMap<&'static str, &'static (dyn TaskHandler + Sync + Send)>> =
|
||||||
|
OnceCell::new();
|
||||||
|
TASK_HANDLER_REGISTRY.get_or_init(
|
||||||
|
|| vec!(
|
||||||
|
("RunQueuedCommand", queued_command::HANDLER.clone())
|
||||||
|
).into_iter().collect()
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
async fn cleanup_session_once(pool: db::DBPool) -> DResult<()> {
|
async fn cleanup_session_once(pool: db::DBPool) -> DResult<()> {
|
||||||
for listener in pool.get_dead_listeners().await? {
|
for listener in pool.get_dead_listeners().await? {
|
||||||
@ -74,8 +99,111 @@ fn start_send_queue_task(pool: db::DBPool, listener_map: ListenerMap) {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn process_tasks_once(pool: db::DBPool) -> DResult<()> {
|
||||||
|
loop {
|
||||||
|
let tx = pool.start_transaction().await?;
|
||||||
|
match tx.get_next_scheduled_task().await? {
|
||||||
|
None => { break; }
|
||||||
|
Some(task_parse) => {
|
||||||
|
match task_parse {
|
||||||
|
TaskParse::Known(mut task) => {
|
||||||
|
match task_handler_registry().get(task.details.name()) {
|
||||||
|
None => {
|
||||||
|
warn!("Found a known but unregistered task type: {}",
|
||||||
|
task.details.name());
|
||||||
|
// This is always a logic error, so just delete the task
|
||||||
|
// to help with recovery.
|
||||||
|
tx.delete_task(&task.details.name(), &task.meta.task_code).await?;
|
||||||
|
tx.commit().await?;
|
||||||
|
}
|
||||||
|
Some(handler) => {
|
||||||
|
let mut ctx = TaskRunContext { trans: &tx, task: &mut task };
|
||||||
|
match handler.do_task(&mut ctx).await {
|
||||||
|
Err(e) => {
|
||||||
|
task.meta.consecutive_failure_count += 1;
|
||||||
|
warn!("Error handling event of type {} code {} (consecutive count: {}): {:?}",
|
||||||
|
&task.details.name(), &task.meta.task_code,
|
||||||
|
task.meta.consecutive_failure_count, e);
|
||||||
|
if task.meta.consecutive_failure_count > 3 && !task.meta.is_static {
|
||||||
|
tx.delete_task(&task.details.name(), &task.meta.task_code).await?;
|
||||||
|
} else {
|
||||||
|
task.meta.next_scheduled.add_assign(
|
||||||
|
chrono::Duration::seconds(60)
|
||||||
|
);
|
||||||
|
tx.update_task(&task.details.name(), &task.meta.task_code,
|
||||||
|
&TaskParse::Known(task.clone())).await?;
|
||||||
|
}
|
||||||
|
tx.commit().await?;
|
||||||
|
},
|
||||||
|
Ok(()) => {
|
||||||
|
task.meta.consecutive_failure_count = 0;
|
||||||
|
match task.meta.recurrence {
|
||||||
|
None => {
|
||||||
|
tx.delete_task(&task.details.name(),
|
||||||
|
&task.meta.task_code).await?;
|
||||||
|
}
|
||||||
|
Some(TaskRecurrence::FixedDuration { seconds }) => {
|
||||||
|
task.meta.next_scheduled.add_assign(
|
||||||
|
chrono::Duration::seconds(seconds as i64)
|
||||||
|
);
|
||||||
|
tx.update_task(&task.details.name(), &task.meta.task_code,
|
||||||
|
&TaskParse::Known(task.clone())).await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tx.commit().await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
TaskParse::Unknown(mut task) => {
|
||||||
|
warn!("Found unknown task type: {}, code: {}",
|
||||||
|
&task.task_type, &task.meta.task_code);
|
||||||
|
if task.meta.is_static {
|
||||||
|
// Probably a new (or newly removed) static type.
|
||||||
|
// We just skip this tick of it.
|
||||||
|
match task.meta.recurrence {
|
||||||
|
None => {
|
||||||
|
tx.delete_task(&task.task_type, &task.meta.task_code).await?;
|
||||||
|
tx.commit().await?;
|
||||||
|
}
|
||||||
|
Some(TaskRecurrence::FixedDuration { seconds }) => {
|
||||||
|
task.meta.next_scheduled.add_assign(
|
||||||
|
chrono::Duration::seconds(seconds as i64)
|
||||||
|
);
|
||||||
|
tx.update_task(&task.task_type, &task.meta.task_code,
|
||||||
|
&TaskParse::Unknown(task.clone())).await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
tx.delete_task(&task.task_type, &task.meta.task_code).await?;
|
||||||
|
tx.commit().await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn start_task_runner(pool: db::DBPool) {
|
||||||
|
task::spawn(async move {
|
||||||
|
loop {
|
||||||
|
time::sleep(time::Duration::from_secs(1)).await;
|
||||||
|
match process_tasks_once(pool.clone()).await {
|
||||||
|
Ok(()) => {}
|
||||||
|
Err(e) => {
|
||||||
|
warn!("Error processing tasks: {}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
pub fn start_regular_tasks(pool: &db::DBPool, listener_map: ListenerMap) -> DResult<()> {
|
pub fn start_regular_tasks(pool: &db::DBPool, listener_map: ListenerMap) -> DResult<()> {
|
||||||
start_session_cleanup_task(pool.clone());
|
start_session_cleanup_task(pool.clone());
|
||||||
start_send_queue_task(pool.clone(), listener_map);
|
start_send_queue_task(pool.clone(), listener_map);
|
||||||
|
start_task_runner(pool.clone());
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
19
blastmud_game/src/regular_tasks/queued_command.rs
Normal file
19
blastmud_game/src/regular_tasks/queued_command.rs
Normal file
@ -0,0 +1,19 @@
|
|||||||
|
use super::{TaskHandler, TaskRunContext};
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use crate::DResult;
|
||||||
|
|
||||||
|
pub struct RunQueuedCommandTaskHandler;
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl TaskHandler for RunQueuedCommandTaskHandler {
|
||||||
|
async fn do_task(&self, _ctx: &mut TaskRunContext) -> DResult<()> {
|
||||||
|
Ok(())
|
||||||
|
/*
|
||||||
|
match ctx.task {
|
||||||
|
|
||||||
|
_ => Err("Unexpected task type")?
|
||||||
|
}? */
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub static HANDLER: &'static (dyn TaskHandler + Sync + Send) = &RunQueuedCommandTaskHandler;
|
Loading…
Reference in New Issue
Block a user