blastmud/blastmud_game/src/regular_tasks.rs

326 lines
13 KiB
Rust
Raw Normal View History

#[double]
use crate::db::DBTrans;
#[cfg(not(test))]
use crate::models::task::{TaskParse, TaskRecurrence};
2023-01-22 01:16:00 +11:00
use crate::{
db::{self, is_concurrency_error},
2023-01-22 01:16:00 +11:00
listener::{ListenerMap, ListenerSend},
message_handler::user_commands::{delete, drop, hire, open, rent},
models::task::Task,
services::{combat, effect, spawn, urges},
static_content::npc,
DResult,
2023-01-22 01:16:00 +11:00
};
use async_trait::async_trait;
2022-12-23 23:31:49 +11:00
use blastmud_interfaces::MessageToListener;
#[cfg(not(test))]
use chrono::Utc;
use log::warn;
use mockall_double::double;
use once_cell::sync::OnceCell;
use std::collections::BTreeMap;
#[cfg(not(test))]
use std::ops::AddAssign;
use tokio::{sync::oneshot, task, time};
2023-01-02 13:25:05 +11:00
pub mod queued_command;
pub struct TaskRunContext<'l> {
pub trans: &'l DBTrans,
pub task: &'l mut Task,
}
#[async_trait]
pub trait TaskHandler {
2023-01-02 13:25:05 +11:00
async fn do_task(&self, ctx: &mut TaskRunContext) -> DResult<Option<time::Duration>>;
}
fn task_handler_registry(
) -> &'static BTreeMap<&'static str, &'static (dyn TaskHandler + Sync + Send)> {
static TASK_HANDLER_REGISTRY: OnceCell<
BTreeMap<&'static str, &'static (dyn TaskHandler + Sync + Send)>,
> = OnceCell::new();
TASK_HANDLER_REGISTRY.get_or_init(|| {
vec![
("RunQueuedCommand", queued_command::HANDLER.clone()),
("NPCSay", npc::SAY_HANDLER.clone()),
("NPCWander", npc::WANDER_HANDLER.clone()),
("NPCAggro", npc::AGGRO_HANDLER.clone()),
2023-01-23 22:52:01 +11:00
("AttackTick", combat::TASK_HANDLER.clone()),
("RecloneNPC", npc::RECLONE_HANDLER.clone()),
("RotCorpse", combat::ROT_CORPSE_HANDLER.clone()),
2023-02-25 23:49:46 +11:00
("DelayedHealth", effect::DELAYED_HEALTH_HANDLER.clone()),
("ExpireItem", drop::EXPIRE_ITEM_HANDLER.clone()),
2023-04-16 01:54:03 +10:00
("ChargeRoom", rent::CHARGE_ROOM_HANDLER.clone()),
("SwingShut", open::SWING_SHUT_HANDLER.clone()),
("DestroyUser", delete::DESTROY_USER_HANDLER.clone()),
("ChargeWages", hire::CHARGE_WAGES_HANDLER.clone()),
("TickUrges", urges::TICK_URGES_HANDLER.clone()),
("ResetSpawns", spawn::RESET_SPAWNS_HANDLER.clone()),
]
.into_iter()
.collect()
})
}
async fn cleanup_session_once(pool: db::DBPool) -> DResult<()> {
2022-12-24 21:16:23 +11:00
for listener in pool.get_dead_listeners().await? {
pool.cleanup_listener(listener).await?;
}
Ok(())
}
2022-12-23 23:31:49 +11:00
fn start_session_cleanup_task(pool: db::DBPool) {
task::spawn(async move {
loop {
2022-12-23 23:31:49 +11:00
time::sleep(time::Duration::from_secs(60)).await;
match cleanup_session_once(pool.clone()).await {
Ok(()) => {}
Err(e) => {
warn!("Error cleaning up sessions: {}", e);
}
}
}
});
2022-12-23 23:31:49 +11:00
}
async fn process_sendqueue_once(pool: db::DBPool, listener_map: ListenerMap) -> DResult<()> {
2022-12-29 22:17:55 +11:00
loop {
let q = pool.get_from_sendqueue().await?;
for item in &q {
match listener_map
.lock()
.await
.get(&item.session.listener)
.map(|l| l.clone())
{
2022-12-29 22:17:55 +11:00
None => {}
Some(listener_sender) => {
let (tx, rx) = oneshot::channel();
listener_sender
.send(ListenerSend {
2022-12-29 22:17:55 +11:00
message: match item.message.clone() {
None => MessageToListener::DisconnectSession {
session: item.session.session.clone(),
2022-12-29 22:17:55 +11:00
},
Some(msg) => MessageToListener::SendToSession {
session: item.session.session.clone(),
msg: msg,
},
},
ack_notify: tx,
})
.await
.unwrap_or(());
2022-12-29 22:17:55 +11:00
rx.await.unwrap_or(());
pool.delete_from_sendqueue(&item).await?;
}
2022-12-23 23:31:49 +11:00
}
}
2022-12-29 22:17:55 +11:00
if q.len() <= 9 {
break;
}
2022-12-23 23:31:49 +11:00
}
Ok(())
}
2022-12-23 23:31:49 +11:00
fn start_send_queue_task(pool: db::DBPool, listener_map: ListenerMap) {
task::spawn(async move {
loop {
2023-01-02 13:25:05 +11:00
time::sleep(time::Duration::from_millis(500)).await;
2022-12-23 23:31:49 +11:00
match process_sendqueue_once(pool.clone(), listener_map.clone()).await {
Ok(()) => {}
Err(e) => {
warn!("Error processing sendqueue: {}", e);
}
}
}
});
}
#[cfg(not(test))]
async fn process_tasks_once(pool: db::DBPool) -> DResult<()> {
loop {
let tx = pool.start_transaction().await?;
match tx.get_next_scheduled_task().await? {
None => {
break;
}
Some(task_parse) => {
match task_parse {
TaskParse::Known(mut task) => {
match task_handler_registry().get(task.details.name()) {
None => {
warn!(
"Found a known but unregistered task type: {}",
task.details.name()
);
// This is always a logic error, so just delete the task
// to help with recovery.
tx.delete_task(&task.details.name(), &task.meta.task_code)
.await?;
tx.commit().await?;
}
Some(handler) => {
let mut ctx = TaskRunContext {
trans: &tx,
task: &mut task,
};
match handler.do_task(&mut ctx).await {
Err(e) => {
if is_concurrency_error(e.as_ref()) {
continue;
}
task.meta.consecutive_failure_count += 1;
warn!("Error handling event of type {} code {} (consecutive count: {}): {:?}",
&task.details.name(), &task.meta.task_code,
task.meta.consecutive_failure_count, e);
if task.meta.consecutive_failure_count > 3
&& !task.meta.is_static
{
tx.delete_task(
&task.details.name(),
&task.meta.task_code,
)
.await?;
} else {
task.meta.next_scheduled =
Utc::now() + chrono::Duration::seconds(60);
tx.update_task(
&task.details.name(),
&task.meta.task_code,
&TaskParse::Known(task.clone()),
)
.await?;
}
tx.commit().await?;
}
2023-01-02 13:25:05 +11:00
Ok(resched) => {
task.meta.consecutive_failure_count = 0;
match task.meta.recurrence.clone().or(resched.map(|r| {
TaskRecurrence::FixedDuration {
seconds: r.as_secs() as u32,
}
})) {
None => {
tx.delete_task(
&task.details.name(),
&task.meta.task_code,
)
.await?;
}
Some(TaskRecurrence::FixedDuration { seconds }) => {
task.meta.next_scheduled = Utc::now()
+ chrono::Duration::seconds(seconds as i64);
tx.update_task(
&task.details.name(),
&task.meta.task_code,
&TaskParse::Known(task.clone()),
)
.await?;
}
}
tx.commit().await?;
}
}
}
}
}
TaskParse::Unknown(mut task) => {
warn!(
"Found unknown task type: {}, code: {}",
&task.task_type, &task.meta.task_code
);
if task.meta.is_static {
// Probably a new (or newly removed) static type.
// We just skip this tick of it.
match task.meta.recurrence {
None => {
tx.delete_task(&task.task_type, &task.meta.task_code)
.await?;
tx.commit().await?;
}
Some(TaskRecurrence::FixedDuration { seconds }) => {
task.meta
.next_scheduled
.add_assign(chrono::Duration::seconds(seconds as i64));
tx.update_task(
&task.task_type,
&task.meta.task_code,
&TaskParse::Unknown(task.clone()),
)
.await?;
}
}
} else {
tx.delete_task(&task.task_type, &task.meta.task_code)
.await?;
tx.commit().await?;
}
}
}
}
}
}
Ok(())
}
#[cfg(test)]
async fn process_tasks_once(_pool: db::DBPool) -> DResult<()> {
task_handler_registry();
Ok(())
}
fn start_task_runner(pool: db::DBPool) {
task::spawn(async move {
loop {
2023-01-02 13:25:05 +11:00
time::sleep(time::Duration::from_millis(500)).await;
match process_tasks_once(pool.clone()).await {
Ok(()) => {}
Err(e) => {
warn!("Error processing tasks: {}", e);
}
}
}
});
}
async fn send_version_once(listener_map: ListenerMap) -> DResult<()> {
for listener_sender in listener_map.lock().await.values().cloned() {
let (tx, rx) = oneshot::channel();
listener_sender
.send(ListenerSend {
message: MessageToListener::GameserverVersion {
version: env!("GIT_VERSION").to_owned(),
},
ack_notify: tx,
})
.await
.unwrap_or(());
rx.await.unwrap_or(());
}
Ok(())
}
fn start_version_send_task(listener_map: ListenerMap) {
task::spawn(async move {
loop {
time::sleep(time::Duration::from_millis(30000)).await;
match send_version_once(listener_map.clone()).await {
Ok(()) => {}
Err(e) => {
warn!("Error processing tasks: {}", e);
}
}
}
});
}
2022-12-24 21:16:23 +11:00
pub fn start_regular_tasks(pool: &db::DBPool, listener_map: ListenerMap) -> DResult<()> {
2022-12-23 23:31:49 +11:00
start_session_cleanup_task(pool.clone());
start_send_queue_task(pool.clone(), listener_map.clone());
start_version_send_task(listener_map);
start_task_runner(pool.clone());
2022-12-23 23:31:49 +11:00
Ok(())
}