diff --git a/blastmud_game/src/db.rs b/blastmud_game/src/db.rs index 8861c6c8..b0ec0ada 100644 --- a/blastmud_game/src/db.rs +++ b/blastmud_game/src/db.rs @@ -12,56 +12,66 @@ pub struct DBPool { pool: Pool } -pub async fn record_listener_ping(listener: Uuid, pool: DBPool) -> DResult<()> { - get_conn(pool).await?.execute( - "INSERT INTO listeners (listener, last_seen) \ - VALUES ($1, NOW()) \ - ON CONFLICT (listener) \ - DO UPDATE SET last_seen = EXCLUDED.last_seen", &[&listener]).await?; - Ok(()) -} +impl DBPool { + pub async fn record_listener_ping(self: DBPool, listener: Uuid) -> DResult<()> { + self.get_conn().await?.execute( + "INSERT INTO listeners (listener, last_seen) \ + VALUES ($1, NOW()) \ + ON CONFLICT (listener) \ + DO UPDATE SET last_seen = EXCLUDED.last_seen", &[&listener]).await?; + Ok(()) + } -pub async fn get_dead_listeners(pool: DBPool) -> DResult> { - Ok(get_conn(pool).await? - .query("SELECT listener FROM listeners WHERE last_seen < NOW() - \ - INTERVAL '2 minutes'", &[]) - .await?.into_iter().map(|r| r.get(0)).collect()) -} + pub async fn get_dead_listeners(self: DBPool) -> DResult> { + Ok(self.get_conn().await? + .query("SELECT listener FROM listeners WHERE last_seen < NOW() - \ + INTERVAL '2 minutes'", &[]) + .await?.into_iter().map(|r| r.get(0)).collect()) + } -pub async fn cleanup_listener(pool: DBPool, listener: Uuid) -> DResult<()> { - let mut conn = get_conn(pool).await?; - let tx = conn.transaction().await?; - tx.execute("UPDATE users SET current_session = NULL, \ - current_listener = NULL WHERE current_listener = $1", - &[&listener]).await?; - tx.execute("DELETE FROM sendqueue WHERE listener = $1", - &[&listener]).await?; - tx.execute("DELETE FROM sessions WHERE listener = $1", - &[&listener]).await?; - tx.execute("DELETE FROM listeners WHERE listener = $1", - &[&listener]).await?; - tx.commit().await?; - Ok(()) -} + pub async fn cleanup_listener(self: DBPool, listener: Uuid) -> DResult<()> { + let mut conn = self.get_conn().await?; + let tx = conn.transaction().await?; + tx.execute("UPDATE users SET current_session = NULL, \ + current_listener = NULL WHERE current_listener = $1", + &[&listener]).await?; + tx.execute("DELETE FROM sendqueue WHERE listener = $1", + &[&listener]).await?; + tx.execute("DELETE FROM sessions WHERE listener = $1", + &[&listener]).await?; + tx.execute("DELETE FROM listeners WHERE listener = $1", + &[&listener]).await?; + tx.commit().await?; + Ok(()) + } -pub async fn get_conn(DBPool { pool }: DBPool) -> - DResult { - let conn = pool.get().await?; - conn.execute("SET synchronous_commit=off", &[]).await?; - Ok(conn) -} - -pub fn start_pool(connstr: &str) -> DResult { - let mgr_config = ManagerConfig { - recycling_method: RecyclingMethod::Fast - }; - let mgr = Manager::from_config( - PgConfig::from_str(connstr) - .map_err(|e| Box::new(e) as Box)?, - NoTls, mgr_config - ); - - Pool::builder(mgr).max_size(4).build() - .map_err(|e| Box::new(e) as Box) - .map(|pool| DBPool { pool }) + pub async fn start_session(self: DBPool, listener: Uuid, session: Uuid) -> DResult<()> { + self.get_conn().await?.execute( + "INSERT INTO sessions (session, listener, details) VALUES ($1, $2, '{}')", + &[&session, &listener] + ).await?; + Ok(()) + } + + pub async fn get_conn(self: DBPool) -> + DResult { + let conn = self.pool.get().await?; + conn.execute("SET synchronous_commit=off", &[]).await?; + Ok(conn) + } + + pub fn start(connstr: &str) -> DResult { + let mgr_config = ManagerConfig { + recycling_method: RecyclingMethod::Fast + }; + let mgr = Manager::from_config( + PgConfig::from_str(connstr) + .map_err(|e| Box::new(e) as Box)?, + NoTls, mgr_config + ); + + Pool::builder(mgr).max_size(4).build() + .map_err(|e| Box::new(e) as Box) + .map(|pool| DBPool { pool }) + } } diff --git a/blastmud_game/src/main.rs b/blastmud_game/src/main.rs index 82665731..008d5d91 100644 --- a/blastmud_game/src/main.rs +++ b/blastmud_game/src/main.rs @@ -4,6 +4,7 @@ use std::error::Error; use log::{info, error, LevelFilter}; use simple_logger::SimpleLogger; use tokio::signal::unix::{signal, SignalKind}; +use db::DBPool; mod db; mod listener; @@ -35,10 +36,10 @@ async fn main() -> DResult<()> { Err(e) })?; let config = read_latest_config()?; - let pool = db::start_pool(&config.database_conn_string)?; + let pool = DBPool::start(&config.database_conn_string)?; // Test the database connection string works so we quit early if not... - let _ = db::get_conn(pool.clone()).await?.query("SELECT 1", &[]).await?; + let _ = pool.clone().get_conn().await?.query("SELECT 1", &[]).await?; info!("Database pool initialised"); diff --git a/blastmud_game/src/message_handler.rs b/blastmud_game/src/message_handler.rs index 38f34fce..aad2254a 100644 --- a/blastmud_game/src/message_handler.rs +++ b/blastmud_game/src/message_handler.rs @@ -11,8 +11,10 @@ use log::info; pub async fn handle(listener: Uuid, msg: MessageFromListener, pool: db::DBPool, listener_map: ListenerMap) -> DResult<()> { match msg { - ListenerPing { .. } => { db::record_listener_ping(listener, pool).await?; } - SessionConnected { session: _, source: _ } => {} + ListenerPing { .. } => { pool.record_listener_ping(listener).await?; } + SessionConnected { session, source: _ } => { + pool.start_session(listener, session).await?; + } SessionDisconnected { session: _ } => {} SessionSentLine { session, msg } => { info!("Awaiting listener lock"); diff --git a/blastmud_game/src/regular_tasks.rs b/blastmud_game/src/regular_tasks.rs index 8803204f..79fbf13a 100644 --- a/blastmud_game/src/regular_tasks.rs +++ b/blastmud_game/src/regular_tasks.rs @@ -4,8 +4,8 @@ use crate::db; use log::warn; async fn cleanup_session_once(pool: db::DBPool) -> DResult<()> { - for listener in db::get_dead_listeners(pool.clone()).await? { - db::cleanup_listener(pool.clone(), listener).await?; + for listener in pool.clone().get_dead_listeners().await? { + pool.clone().cleanup_listener(listener).await?; } Ok(()) }