use tokio::{task, time, sync::oneshot}; use crate::DResult; use crate::db; use crate::listener::{ListenerMap, ListenerSend}; use blastmud_interfaces::MessageToListener; use log::warn; async fn cleanup_session_once(pool: db::DBPool) -> DResult<()> { for listener in pool.clone().get_dead_listeners().await? { pool.clone().cleanup_listener(listener).await?; } Ok(()) } fn start_session_cleanup_task(pool: db::DBPool) { task::spawn(async move { loop { time::sleep(time::Duration::from_secs(60)).await; match cleanup_session_once(pool.clone()).await { Ok(()) => {} Err(e) => { warn!("Error cleaning up sessions: {}", e); } } } }); } async fn process_sendqueue_once(pool: db::DBPool, listener_map: ListenerMap) -> DResult<()> { for item in pool.clone().get_from_sendqueue().await? { match listener_map.lock().await.get(&item.session.listener).map(|l| l.clone()) { None => {} Some(listener_sender) => { let (tx, rx) = oneshot::channel(); listener_sender.send( ListenerSend { message: MessageToListener::SendToSession { session: item.session.session.clone(), msg: item.message.clone() }, ack_notify: tx } ).await.unwrap_or(()); rx.await.unwrap_or(()); pool.clone().delete_from_sendqueue(&item).await?; } } } Ok(()) } fn start_send_queue_task(pool: db::DBPool, listener_map: ListenerMap) { task::spawn(async move { loop { time::sleep(time::Duration::from_secs(1)).await; match process_sendqueue_once(pool.clone(), listener_map.clone()).await { Ok(()) => {} Err(e) => { warn!("Error processing sendqueue: {}", e); } } } }); } pub fn start_regular_tasks(pool: db::DBPool, listener_map: ListenerMap) -> DResult<()> { start_session_cleanup_task(pool.clone()); start_send_queue_task(pool, listener_map); Ok(()) }