From 8aa4d59ba6c729adef43194f04dacb47def0c0fe Mon Sep 17 00:00:00 2001 From: Shagnor Date: Fri, 23 Dec 2022 23:31:49 +1100 Subject: [PATCH] Implement DB based sendqueue --- blastmud_game/src/db.rs | 71 +++++++++++++++++++++++++--- blastmud_game/src/listener.rs | 2 +- blastmud_game/src/main.rs | 4 +- blastmud_game/src/message_handler.rs | 43 +++++------------ blastmud_game/src/regular_tasks.rs | 50 ++++++++++++++++++-- schema/schema.sql | 2 +- 6 files changed, 126 insertions(+), 46 deletions(-) diff --git a/blastmud_game/src/db.rs b/blastmud_game/src/db.rs index b0ec0ad..ca7d411 100644 --- a/blastmud_game/src/db.rs +++ b/blastmud_game/src/db.rs @@ -1,10 +1,11 @@ -use tokio_postgres::config::Config as PgConfig; +use tokio_postgres::{config::Config as PgConfig, row::Row}; use deadpool_postgres::{Manager, Object, ManagerConfig, Pool, RecyclingMethod}; use std::error::Error; use std::str::FromStr; use uuid::Uuid; use tokio_postgres::NoTls; +use crate::message_handler::ListenerSession; use crate::DResult; #[derive(Clone, Debug)] @@ -12,6 +13,25 @@ pub struct DBPool { pool: Pool } +#[derive(Clone, Debug)] +pub struct SendqueueItem { + pub item: i64, + pub session: ListenerSession, + pub message: String +} +impl From for SendqueueItem { + fn from(row: Row) -> Self { + SendqueueItem { + item: row.get("item"), + session: ListenerSession { + session: row.get("session"), + listener: row.get("listener") + }, + message: row.get("message") + } + } +} + impl DBPool { pub async fn record_listener_ping(self: DBPool, listener: Uuid) -> DResult<()> { self.get_conn().await?.execute( @@ -22,14 +42,14 @@ impl DBPool { Ok(()) } - pub async fn get_dead_listeners(self: DBPool) -> DResult> { + pub async fn get_dead_listeners(self: Self) -> DResult> { Ok(self.get_conn().await? .query("SELECT listener FROM listeners WHERE last_seen < NOW() - \ INTERVAL '2 minutes'", &[]) .await?.into_iter().map(|r| r.get(0)).collect()) } - pub async fn cleanup_listener(self: DBPool, listener: Uuid) -> DResult<()> { + pub async fn cleanup_listener(self: Self, listener: Uuid) -> DResult<()> { let mut conn = self.get_conn().await?; let tx = conn.transaction().await?; tx.execute("UPDATE users SET current_session = NULL, \ @@ -45,13 +65,50 @@ impl DBPool { Ok(()) } - pub async fn start_session(self: DBPool, listener: Uuid, session: Uuid) -> DResult<()> { + pub async fn start_session(self: Self, session: ListenerSession) -> DResult<()> { self.get_conn().await?.execute( "INSERT INTO sessions (session, listener, details) VALUES ($1, $2, '{}')", - &[&session, &listener] + &[&session.session, &session.listener] ).await?; Ok(()) } + + pub async fn end_session(self: Self, session: ListenerSession) -> DResult<()> { + let mut conn = self.get_conn().await?; + let tx = conn.transaction().await?; + tx.execute("UPDATE users SET current_session = NULL, \ + current_listener = NULL WHERE current_session = $1", + &[&session.session]).await?; + tx.execute("DELETE FROM sendqueue WHERE session = $1", + &[&session.session]).await?; + tx.execute("DELETE FROM sessions WHERE session = $1", + &[&session.session]).await?; + tx.commit().await?; + Ok(()) + } + + pub async fn queue_for_session(self: Self, + session: &ListenerSession, + message: &str) -> DResult<()> { + let conn = self.get_conn().await?; + conn.execute("INSERT INTO sendqueue (session, listener, message) VALUES ($1, $2, $3)", + &[&session.session, &session.listener, &message]).await?; + Ok(()) + } + + pub async fn get_from_sendqueue(self: Self) -> DResult> { + let conn = self.get_conn().await?; + Ok(conn.query("SELECT item, session, listener, message FROM sendqueue ORDER BY item ASC LIMIT 10", + &[]) + .await?.into_iter().map(SendqueueItem::from).collect()) + + } + + pub async fn delete_from_sendqueue(self: DBPool, item: &SendqueueItem) -> DResult<()> { + let conn = self.get_conn().await?; + conn.execute("DELETE FROM sendqueue WHERE item=$1", &[&item.item]).await?; + Ok(()) + } pub async fn get_conn(self: DBPool) -> DResult { @@ -59,7 +116,7 @@ impl DBPool { conn.execute("SET synchronous_commit=off", &[]).await?; Ok(conn) } - + pub fn start(connstr: &str) -> DResult { let mgr_config = ManagerConfig { recycling_method: RecyclingMethod::Fast @@ -72,6 +129,6 @@ impl DBPool { Pool::builder(mgr).max_size(4).build() .map_err(|e| Box::new(e) as Box) - .map(|pool| DBPool { pool }) + .map(|pool| Self { pool }) } } diff --git a/blastmud_game/src/listener.rs b/blastmud_game/src/listener.rs index 0c9b3fb..64ed699 100644 --- a/blastmud_game/src/listener.rs +++ b/blastmud_game/src/listener.rs @@ -19,7 +19,7 @@ pub struct ListenerSend { pub ack_notify: oneshot::Sender<()> } pub type ListenerMap = Arc>>>; - + async fn handle_from_listener( conn: TcpStream, message_handler: FHandler, diff --git a/blastmud_game/src/main.rs b/blastmud_game/src/main.rs index 008d5d9..7658181 100644 --- a/blastmud_game/src/main.rs +++ b/blastmud_game/src/main.rs @@ -48,12 +48,12 @@ async fn main() -> DResult<()> { let mh_pool = pool.clone(); listener::start_listener(config.listener, listener_map.clone(), move |listener_id, msg| { - message_handler::handle(listener_id, msg, mh_pool.clone(), listener_map.clone()) + message_handler::handle(listener_id, msg, mh_pool.clone()) } ).await?; version_cutover::replace_old_gameserver(&config.pidfile)?; - regular_tasks::start_regular_tasks(pool.clone())?; + regular_tasks::start_regular_tasks(pool.clone(), listener_map)?; let mut sigusr1 = signal(SignalKind::user_defined1())?; sigusr1.recv().await; diff --git a/blastmud_game/src/message_handler.rs b/blastmud_game/src/message_handler.rs index aad2254..6b949d8 100644 --- a/blastmud_game/src/message_handler.rs +++ b/blastmud_game/src/message_handler.rs @@ -1,45 +1,28 @@ use blastmud_interfaces::*; -use crate::listener::ListenerMap; use crate::db; use MessageFromListener::*; use uuid::Uuid; -use tokio::{sync::oneshot, task}; -use crate::listener::ListenerSend; use crate::DResult; -use log::info; -pub async fn handle(listener: Uuid, msg: MessageFromListener, pool: db::DBPool, listener_map: ListenerMap) +#[derive(Clone,Debug)] +pub struct ListenerSession { + pub listener: Uuid, + pub session: Uuid +} + +pub async fn handle(listener: Uuid, msg: MessageFromListener, pool: db::DBPool) -> DResult<()> { match msg { ListenerPing { .. } => { pool.record_listener_ping(listener).await?; } SessionConnected { session, source: _ } => { - pool.start_session(listener, session).await?; + pool.start_session(ListenerSession { listener, session }).await?; + } + SessionDisconnected { session } => { + pool.end_session(ListenerSession { listener, session }).await?; } - SessionDisconnected { session: _ } => {} SessionSentLine { session, msg } => { - info!("Awaiting listener lock"); - let lmlock = listener_map.lock().await; - let opt_sender = lmlock.get(&listener).map(|v| v.clone()); - drop(lmlock); - info!("Listener lock dropped"); - match opt_sender { - None => {} - Some(sender) => { - info!("Spawning message task"); - task::spawn(async move { - let (tx, rx) = oneshot::channel(); - info!("Sending echo"); - sender.send(ListenerSend { message: MessageToListener::SendToSession { - session, - msg: format!("You hear an echo saying: \x1b[31m{}\x1b[0m\r\n", msg) }, - ack_notify: tx }).await.unwrap_or(()); - info!("Awaiting echo ack"); - rx.await.unwrap_or(()); - info!("Echo ack received"); - }); - info!("Message task spawned"); - } - } + pool.queue_for_session(&ListenerSession { listener, session }, + &format!("You hear an echo saying: \x1b[31m{}\x1b[0m\r\n", msg)).await?; } AcknowledgeMessage => {} } diff --git a/blastmud_game/src/regular_tasks.rs b/blastmud_game/src/regular_tasks.rs index 79fbf13..966cd04 100644 --- a/blastmud_game/src/regular_tasks.rs +++ b/blastmud_game/src/regular_tasks.rs @@ -1,6 +1,8 @@ -use tokio::{task, time}; +use tokio::{task, time, sync::oneshot}; use crate::DResult; use crate::db; +use crate::listener::{ListenerMap, ListenerSend}; +use blastmud_interfaces::MessageToListener; use log::warn; async fn cleanup_session_once(pool: db::DBPool) -> DResult<()> { @@ -10,21 +12,59 @@ async fn cleanup_session_once(pool: db::DBPool) -> DResult<()> { Ok(()) } -fn start_session_cleanup_task(pool: db::DBPool) -> DResult<()> { +fn start_session_cleanup_task(pool: db::DBPool) { task::spawn(async move { loop { + time::sleep(time::Duration::from_secs(60)).await; match cleanup_session_once(pool.clone()).await { Ok(()) => {} Err(e) => { warn!("Error cleaning up sessions: {}", e); - time::sleep(time::Duration::from_secs(1)).await; } } } }); +} + +async fn process_sendqueue_once(pool: db::DBPool, listener_map: ListenerMap) -> DResult<()> { + for item in pool.clone().get_from_sendqueue().await? { + match listener_map.lock().await.get(&item.session.listener).map(|l| l.clone()) { + None => {} + Some(listener_sender) => { + let (tx, rx) = oneshot::channel(); + listener_sender.send( + ListenerSend { + message: MessageToListener::SendToSession { + session: item.session.session.clone(), + msg: item.message.clone() + }, + ack_notify: tx + } + ).await.unwrap_or(()); + rx.await.unwrap_or(()); + pool.clone().delete_from_sendqueue(&item).await?; + } + } + } Ok(()) } -pub fn start_regular_tasks(pool: db::DBPool) -> DResult<()> { - start_session_cleanup_task(pool) +fn start_send_queue_task(pool: db::DBPool, listener_map: ListenerMap) { + task::spawn(async move { + loop { + time::sleep(time::Duration::from_secs(1)).await; + match process_sendqueue_once(pool.clone(), listener_map.clone()).await { + Ok(()) => {} + Err(e) => { + warn!("Error processing sendqueue: {}", e); + } + } + } + }); +} + +pub fn start_regular_tasks(pool: db::DBPool, listener_map: ListenerMap) -> DResult<()> { + start_session_cleanup_task(pool.clone()); + start_send_queue_task(pool, listener_map); + Ok(()) } diff --git a/schema/schema.sql b/schema/schema.sql index 391d3f9..98281f3 100644 --- a/schema/schema.sql +++ b/schema/schema.sql @@ -37,7 +37,7 @@ CREATE TABLE users ( CREATE INDEX user_by_listener ON users(current_listener); CREATE UNLOGGED TABLE sendqueue ( - item BIGINT NOT NULL PRIMARY KEY, + item BIGSERIAL NOT NULL PRIMARY KEY, session UUID NOT NULL REFERENCES sessions(session), listener UUID REFERENCES listeners(listener), message TEXT NOT NULL