diff --git a/Cargo.lock b/Cargo.lock index 28977abe..f11575ff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,18 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "Inflector" +version = "0.11.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe438c63458706e03479442743baae6c88256498e6431708f6dfc520a26515d3" + +[[package]] +name = "aliasable" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "250f629c0161ad8107cf89319e990051fae62832fd343083bea452d93e2205fd" + [[package]] name = "ansi_macro" version = "0.1.0" @@ -71,6 +83,7 @@ dependencies = [ "log", "nix", "nom", + "ouroboros", "phf", "ring", "serde", @@ -766,6 +779,29 @@ version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "86f0b0d4bf799edbc74508c1e8bf170ff5f41238e5f8225603ca7caaae2b7860" +[[package]] +name = "ouroboros" +version = "0.15.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dfbb50b356159620db6ac971c6d5c9ab788c9cc38a6f49619fca2a27acb062ca" +dependencies = [ + "aliasable", + "ouroboros_macro", +] + +[[package]] +name = "ouroboros_macro" +version = "0.15.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a0d9d1a6191c4f391f87219d1ea42b23f09ee84d64763cd05ee6ea88d9f384d" +dependencies = [ + "Inflector", + "proc-macro-error", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "parking_lot" version = "0.12.1" @@ -907,6 +943,30 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" +[[package]] +name = "proc-macro-error" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" +dependencies = [ + "proc-macro-error-attr", + "proc-macro2", + "quote", + "syn", + "version_check", +] + +[[package]] +name = "proc-macro-error-attr" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" +dependencies = [ + "proc-macro2", + "quote", + "version_check", +] + [[package]] name = "proc-macro2" version = "1.0.47" diff --git a/blastmud_game/Cargo.toml b/blastmud_game/Cargo.toml index 95c15d20..34687eab 100644 --- a/blastmud_game/Cargo.toml +++ b/blastmud_game/Cargo.toml @@ -28,3 +28,4 @@ serde_json = "1.0.91" phf = { version = "0.11.1", features = ["macros"] } async-trait = "0.1.60" nom = "7.1.1" +ouroboros = "0.15.5" diff --git a/blastmud_game/src/db.rs b/blastmud_game/src/db.rs index edd01af7..2b877bb7 100644 --- a/blastmud_game/src/db.rs +++ b/blastmud_game/src/db.rs @@ -1,20 +1,30 @@ use tokio_postgres::{config::Config as PgConfig, row::Row}; -use deadpool_postgres::{Manager, Object, ManagerConfig, Pool, +use deadpool_postgres::{Manager, Object, ManagerConfig, Pool, Transaction, RecyclingMethod}; use std::error::Error; use std::str::FromStr; +use ouroboros::self_referencing; use uuid::Uuid; use tokio_postgres::NoTls; use crate::message_handler::ListenerSession; use crate::DResult; use crate::models::session::Session; use serde_json; +use futures::FutureExt; #[derive(Clone, Debug)] pub struct DBPool { pool: Pool } +#[self_referencing] +pub struct DBTrans { + conn: Object, + #[borrows(mut conn)] + #[covariant] + pub trans: Option> +} + #[derive(Clone, Debug)] pub struct SendqueueItem { pub item: i64, @@ -90,6 +100,14 @@ impl DBPool { Ok(()) } + pub async fn start_transaction(self: &Self) -> DResult { + let conn = self.get_conn().await?; + Ok(DBTransAsyncSendTryBuilder { + conn, + trans_builder: |conn| Box::pin(conn.transaction().map(|r| r.map(Some))) + }.try_build().await?) + } + pub async fn queue_for_session(self: &Self, session: &ListenerSession, message: &str) -> DResult<()> { @@ -135,3 +153,26 @@ impl DBPool { .map(|pool| Self { pool }) } } + +impl DBTrans { + pub async fn queue_for_session(self: &Self, + session: &ListenerSession, + message: &str) -> DResult<()> { + self.pg_trans()? + .execute("INSERT INTO sendqueue (session, listener, message) VALUES ($1, $2, $3)", + &[&session.session, &session.listener, &message]).await?; + Ok(()) + } + + pub async fn commit(mut self: Self) -> DResult<()> { + let trans_opt = self.with_trans_mut(|t| std::mem::replace(t, None)); + for trans in trans_opt { + trans.commit().await?; + } + Ok(()) + } + + pub fn pg_trans(self: &Self) -> DResult<&Transaction> { + self.borrow_trans().as_ref().ok_or("Transaction already closed".into()) + } +} diff --git a/blastmud_game/src/message_handler/user_commands.rs b/blastmud_game/src/message_handler/user_commands.rs index f175465f..dbbbc0dc 100644 --- a/blastmud_game/src/message_handler/user_commands.rs +++ b/blastmud_game/src/message_handler/user_commands.rs @@ -1,6 +1,6 @@ use super::ListenerSession; use crate::DResult; -use crate::db::DBPool; +use crate::db::{DBTrans, DBPool}; use ansi_macro::ansi; use phf::phf_map; use async_trait::async_trait; @@ -9,10 +9,9 @@ mod parsing; mod ignore; mod help; -#[derive(Debug)] pub struct VerbContext<'l> { session: &'l ListenerSession, - pool: &'l DBPool + trans: &'l DBTrans } pub enum CommandHandlingError { @@ -48,31 +47,27 @@ static ALWAYS_AVAILABLE_COMMANDS: UserVerbRegistry = phf_map! { pub async fn handle(session: &ListenerSession, msg: &str, pool: &DBPool) -> DResult<()> { let (cmd, params) = parsing::parse_command_name(msg); + let trans = pool.start_transaction().await?; let handler_opt = ALWAYS_AVAILABLE_COMMANDS.get(cmd); match handler_opt { None => { - pool.queue_for_session(session, - ansi!( - "That's not a command I know. Try help\r\n" - ) + trans.queue_for_session(session, + ansi!( + "That's not a command I know. Try help\r\n" + ) ).await?; } Some(handler) => { - match handler.handle(&VerbContext { session, pool }, cmd, params).await { + match handler.handle(&VerbContext { session, trans: &trans }, cmd, params).await { Ok(()) => {} Err(UserError(err_msg)) => { - pool.queue_for_session(session, &(err_msg + "\r\n")).await?; + trans.queue_for_session(session, &(err_msg + "\r\n")).await?; } Err(SystemError(e)) => Err(e)? } } } - /* - pool.queue_for_session(session, - &format!(ansi!( - "You hear an echo saying: {}\r\n" - ), msg)).await?; - */ + trans.commit().await?; Ok(()) } diff --git a/blastmud_game/src/message_handler/user_commands/help.rs b/blastmud_game/src/message_handler/user_commands/help.rs index eae2485e..ae77de07 100644 --- a/blastmud_game/src/message_handler/user_commands/help.rs +++ b/blastmud_game/src/message_handler/user_commands/help.rs @@ -25,7 +25,7 @@ impl UserVerb for Verb { async fn handle(self: &Self, ctx: &VerbContext, _verb: &str, remaining: &str) -> UResult<()> { let help = HELP_PAGES.get(remaining).ok_or( UserError("No help available on that".to_string()))?; - ctx.pool.queue_for_session(ctx.session, &(help.to_string() + "\r\n")).await?; + ctx.trans.queue_for_session(ctx.session, &(help.to_string() + "\r\n")).await?; Ok(()) } }