#[cfg(not(test))]
use crate::db::is_concurrency_error;
#[double]
use crate::db::DBTrans;
#[cfg(not(test))]
use crate::models::task::{TaskParse, TaskRecurrence};
use crate::{
    db,
    listener::{ListenerMap, ListenerSend},
    message_handler::user_commands::{delete, drop, hire, open, rent},
    models::task::Task,
    services::{combat, effect, spawn, urges},
    static_content::npc::{self, computer_museum_npcs},
    DResult,
};
use async_trait::async_trait;
use blastmud_interfaces::MessageToListener;
#[cfg(not(test))]
use chrono::Utc;
use log::warn;
use mockall_double::double;
use once_cell::sync::OnceCell;
use std::collections::BTreeMap;
#[cfg(not(test))]
use std::ops::AddAssign;
use tokio::{sync::oneshot, task, time};

pub mod queued_command;

pub struct TaskRunContext<'l> {
    pub trans: &'l DBTrans,
    pub task: &'l mut Task,
}

#[async_trait]
pub trait TaskHandler {
    /// Runs a single tick of the task. The returned duration is used to
    /// reschedule the task when it has no configured recurrence; returning
    /// Ok(None) for a non-recurring task causes it to be deleted.
    async fn do_task(&self, ctx: &mut TaskRunContext) -> DResult<Option<time::Duration>>;
}

/// Maps task type names to the handler that runs them.
fn task_handler_registry(
) -> &'static BTreeMap<&'static str, &'static (dyn TaskHandler + Sync + Send)> {
    static TASK_HANDLER_REGISTRY: OnceCell<
        BTreeMap<&'static str, &'static (dyn TaskHandler + Sync + Send)>,
    > = OnceCell::new();
    TASK_HANDLER_REGISTRY.get_or_init(|| {
        vec![
            ("RunQueuedCommand", queued_command::HANDLER.clone()),
            ("NPCSay", npc::SAY_HANDLER.clone()),
            ("NPCWander", npc::WANDER_HANDLER.clone()),
            ("NPCAggro", npc::AGGRO_HANDLER.clone()),
            ("AttackTick", combat::TASK_HANDLER.clone()),
            ("RecloneNPC", npc::RECLONE_HANDLER.clone()),
            ("RotCorpse", combat::ROT_CORPSE_HANDLER.clone()),
            ("DelayedHealth", effect::DELAYED_HEALTH_HANDLER.clone()),
            ("ExpireItem", drop::EXPIRE_ITEM_HANDLER.clone()),
            ("ChargeRoom", rent::CHARGE_ROOM_HANDLER.clone()),
            ("SwingShut", open::SWING_SHUT_HANDLER.clone()),
            ("DestroyUser", delete::DESTROY_USER_HANDLER.clone()),
            ("ChargeWages", hire::CHARGE_WAGES_HANDLER.clone()),
            ("TickUrges", urges::TICK_URGES_HANDLER.clone()),
            ("ResetSpawns", spawn::RESET_SPAWNS_HANDLER.clone()),
            (
                "ResetHanoi",
                computer_museum_npcs::RESET_GAME_HANDLER.clone(),
            ),
        ]
        .into_iter()
        .collect()
    })
}

async fn cleanup_session_once(pool: db::DBPool) -> DResult<()> {
    for listener in pool.get_dead_listeners().await? {
        pool.cleanup_listener(listener).await?;
    }
    Ok(())
}

fn start_session_cleanup_task(pool: db::DBPool) {
    task::spawn(async move {
        loop {
            time::sleep(time::Duration::from_secs(60)).await;
            match cleanup_session_once(pool.clone()).await {
                Ok(()) => {}
                Err(e) => {
                    warn!("Error cleaning up sessions: {}", e);
                }
            }
        }
    });
}

async fn process_sendqueue_once(pool: db::DBPool, listener_map: ListenerMap) -> DResult<()> {
    loop {
        let q = pool.get_from_sendqueue().await?;
        for item in &q {
            match listener_map
                .lock()
                .await
                .get(&item.session.listener)
                .map(|l| l.clone())
            {
                None => {}
                Some(listener_sender) => {
                    let (tx, rx) = oneshot::channel();
                    listener_sender
                        .send(ListenerSend {
                            message: match item.message.clone() {
                                None => MessageToListener::DisconnectSession {
                                    session: item.session.session.clone(),
                                },
                                Some(msg) => MessageToListener::SendToSession {
                                    session: item.session.session.clone(),
                                    msg,
                                },
                            },
                            ack_notify: tx,
                        })
                        .await
                        .unwrap_or(());
                    rx.await.unwrap_or(());
                    pool.delete_from_sendqueue(&item).await?;
                }
            }
        }
        // Stop once a fetch returns fewer than 10 items; otherwise keep draining.
        if q.len() <= 9 {
            break;
        }
    }
    Ok(())
}

fn start_send_queue_task(pool: db::DBPool, listener_map: ListenerMap) {
    task::spawn(async move {
        loop {
            time::sleep(time::Duration::from_millis(500)).await;
            match process_sendqueue_once(pool.clone(), listener_map.clone()).await {
                Ok(()) => {}
                Err(e) => {
                    warn!("Error processing sendqueue: {}", e);
                }
            }
        }
    });
}

#[cfg(not(test))]
async fn process_tasks_once(pool: db::DBPool) -> DResult<()> {
    loop {
        let tx = pool.start_transaction().await?;
        match tx.get_next_scheduled_task().await? {
            None => {
                break;
            }
            Some(task_parse) => {
                match task_parse {
                    TaskParse::Known(mut task) => {
                        match task_handler_registry().get(task.details.name()) {
                            None => {
                                warn!(
                                    "Found a known but unregistered task type: {}",
                                    task.details.name()
                                );
                                // This is always a logic error, so just delete the task
                                // to help with recovery.
                                tx.delete_task(&task.details.name(), &task.meta.task_code)
                                    .await?;
                                tx.commit().await?;
                            }
                            Some(handler) => {
                                let mut ctx = TaskRunContext {
                                    trans: &tx,
                                    task: &mut task,
                                };
                                match handler.do_task(&mut ctx).await {
                                    Err(e) => {
                                        // Concurrency conflicts aren't real failures; start
                                        // again with a fresh transaction.
                                        if is_concurrency_error(e.as_ref()) {
                                            continue;
                                        }
                                        task.meta.consecutive_failure_count += 1;
                                        warn!(
                                            "Error handling event of type {} code {} (consecutive count: {}): {:?}",
                                            &task.details.name(),
                                            &task.meta.task_code,
                                            task.meta.consecutive_failure_count,
                                            e
                                        );
                                        if task.meta.consecutive_failure_count > 3
                                            && !task.meta.is_static
                                        {
                                            tx.delete_task(
                                                &task.details.name(),
                                                &task.meta.task_code,
                                            )
                                            .await?;
                                        } else {
                                            task.meta.next_scheduled =
                                                Utc::now() + chrono::Duration::seconds(60);
                                            tx.update_task(
                                                &task.details.name(),
                                                &task.meta.task_code,
                                                &TaskParse::Known(task.clone()),
                                            )
                                            .await?;
                                        }
                                        tx.commit().await?;
                                    }
                                    Ok(resched) => {
                                        task.meta.consecutive_failure_count = 0;
                                        match task.meta.recurrence.clone().or(resched.map(|r| {
                                            TaskRecurrence::FixedDuration {
                                                seconds: r.as_secs() as u32,
                                            }
                                        })) {
                                            None => {
                                                tx.delete_task(
                                                    &task.details.name(),
                                                    &task.meta.task_code,
                                                )
                                                .await?;
                                            }
                                            Some(TaskRecurrence::FixedDuration { seconds }) => {
                                                task.meta.next_scheduled = Utc::now()
                                                    + chrono::Duration::seconds(seconds as i64);
                                                tx.update_task(
                                                    &task.details.name(),
                                                    &task.meta.task_code,
                                                    &TaskParse::Known(task.clone()),
                                                )
                                                .await?;
                                            }
                                        }
                                        tx.commit().await?;
                                    }
                                }
                            }
                        }
                    }
                    TaskParse::Unknown(mut task) => {
                        warn!(
                            "Found unknown task type: {}, code: {}",
                            &task.task_type, &task.meta.task_code
                        );
                        if task.meta.is_static {
                            // Probably a new (or newly removed) static type.
                            // We just skip this tick of it.
                            match task.meta.recurrence {
                                None => {
                                    tx.delete_task(&task.task_type, &task.meta.task_code)
                                        .await?;
                                    tx.commit().await?;
                                }
                                Some(TaskRecurrence::FixedDuration { seconds }) => {
                                    task.meta
                                        .next_scheduled
                                        .add_assign(chrono::Duration::seconds(seconds as i64));
                                    tx.update_task(
                                        &task.task_type,
                                        &task.meta.task_code,
                                        &TaskParse::Unknown(task.clone()),
                                    )
                                    .await?;
                                    tx.commit().await?;
                                }
                            }
                        } else {
                            tx.delete_task(&task.task_type, &task.meta.task_code)
                                .await?;
                            tx.commit().await?;
                        }
                    }
                }
            }
        }
    }
    Ok(())
}

#[cfg(test)]
async fn process_tasks_once(_pool: db::DBPool) -> DResult<()> {
    // Tests don't run real tasks; just make sure the handler registry builds.
    task_handler_registry();
    Ok(())
}

fn start_task_runner(pool: db::DBPool) {
    task::spawn(async move {
        loop {
            time::sleep(time::Duration::from_millis(500)).await;
            match process_tasks_once(pool.clone()).await {
                Ok(()) => {}
                Err(e) => {
                    warn!("Error processing tasks: {}", e);
                }
            }
        }
    });
}

async fn send_version_once(listener_map: ListenerMap) -> DResult<()> {
    for listener_sender in listener_map.lock().await.values().cloned() {
        let (tx, rx) = oneshot::channel();
        listener_sender
            .send(ListenerSend {
                message: MessageToListener::GameserverVersion {
                    version: env!("GIT_VERSION").to_owned(),
                },
                ack_notify: tx,
            })
            .await
            .unwrap_or(());
        rx.await.unwrap_or(());
    }
    Ok(())
}

fn start_version_send_task(listener_map: ListenerMap) {
    task::spawn(async move {
        loop {
            time::sleep(time::Duration::from_millis(30000)).await;
            match send_version_once(listener_map.clone()).await {
                Ok(()) => {}
                Err(e) => {
                    warn!("Error sending version to listeners: {}", e);
                }
            }
        }
    });
}

pub fn start_regular_tasks(pool: &db::DBPool, listener_map: ListenerMap) -> DResult<()> {
    start_session_cleanup_task(pool.clone());
    start_send_queue_task(pool.clone(), listener_map.clone());
    start_version_send_task(listener_map);
    start_task_runner(pool.clone());
    Ok(())
}
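
// A minimal illustrative sketch of implementing a TaskHandler. `ExampleHandler` and
// `EXAMPLE_HANDLER` are hypothetical names used only here; real handlers live in their
// own modules (e.g. npc, combat) and are wired up by adding an entry to
// task_handler_registry() above.
#[cfg(test)]
mod example_handler {
    #![allow(dead_code)]

    use super::*;

    struct ExampleHandler;

    #[async_trait]
    impl TaskHandler for ExampleHandler {
        async fn do_task(&self, _ctx: &mut TaskRunContext) -> DResult<Option<time::Duration>> {
            // Ok(Some(duration)) asks the runner to reschedule this task after `duration`
            // (when the task has no configured recurrence); Ok(None) on a non-recurring
            // task lets the runner delete it.
            Ok(None)
        }
    }

    // The registry stores &'static trait objects, so a handler is exposed as a static
    // like this and registered as ("ExampleTask", EXAMPLE_HANDLER.clone()).
    static EXAMPLE_HANDLER: &(dyn TaskHandler + Sync + Send) = &ExampleHandler;
}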