Make commands run in DB transaction

This commit is contained in:
Condorra 2022-12-25 00:25:52 +11:00
parent 13b10c3fe7
commit 887b69340f
5 changed files with 114 additions and 17 deletions

60
Cargo.lock generated
View File

@ -2,6 +2,18 @@
# It is not intended for manual editing. # It is not intended for manual editing.
version = 3 version = 3
[[package]]
name = "Inflector"
version = "0.11.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe438c63458706e03479442743baae6c88256498e6431708f6dfc520a26515d3"
[[package]]
name = "aliasable"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "250f629c0161ad8107cf89319e990051fae62832fd343083bea452d93e2205fd"
[[package]] [[package]]
name = "ansi_macro" name = "ansi_macro"
version = "0.1.0" version = "0.1.0"
@ -71,6 +83,7 @@ dependencies = [
"log", "log",
"nix", "nix",
"nom", "nom",
"ouroboros",
"phf", "phf",
"ring", "ring",
"serde", "serde",
@ -766,6 +779,29 @@ version = "1.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "86f0b0d4bf799edbc74508c1e8bf170ff5f41238e5f8225603ca7caaae2b7860" checksum = "86f0b0d4bf799edbc74508c1e8bf170ff5f41238e5f8225603ca7caaae2b7860"
[[package]]
name = "ouroboros"
version = "0.15.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dfbb50b356159620db6ac971c6d5c9ab788c9cc38a6f49619fca2a27acb062ca"
dependencies = [
"aliasable",
"ouroboros_macro",
]
[[package]]
name = "ouroboros_macro"
version = "0.15.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a0d9d1a6191c4f391f87219d1ea42b23f09ee84d64763cd05ee6ea88d9f384d"
dependencies = [
"Inflector",
"proc-macro-error",
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "parking_lot" name = "parking_lot"
version = "0.12.1" version = "0.12.1"
@ -907,6 +943,30 @@ version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
[[package]]
name = "proc-macro-error"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c"
dependencies = [
"proc-macro-error-attr",
"proc-macro2",
"quote",
"syn",
"version_check",
]
[[package]]
name = "proc-macro-error-attr"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869"
dependencies = [
"proc-macro2",
"quote",
"version_check",
]
[[package]] [[package]]
name = "proc-macro2" name = "proc-macro2"
version = "1.0.47" version = "1.0.47"

View File

@ -28,3 +28,4 @@ serde_json = "1.0.91"
phf = { version = "0.11.1", features = ["macros"] } phf = { version = "0.11.1", features = ["macros"] }
async-trait = "0.1.60" async-trait = "0.1.60"
nom = "7.1.1" nom = "7.1.1"
ouroboros = "0.15.5"

View File

@ -1,20 +1,30 @@
use tokio_postgres::{config::Config as PgConfig, row::Row}; use tokio_postgres::{config::Config as PgConfig, row::Row};
use deadpool_postgres::{Manager, Object, ManagerConfig, Pool, use deadpool_postgres::{Manager, Object, ManagerConfig, Pool, Transaction,
RecyclingMethod}; RecyclingMethod};
use std::error::Error; use std::error::Error;
use std::str::FromStr; use std::str::FromStr;
use ouroboros::self_referencing;
use uuid::Uuid; use uuid::Uuid;
use tokio_postgres::NoTls; use tokio_postgres::NoTls;
use crate::message_handler::ListenerSession; use crate::message_handler::ListenerSession;
use crate::DResult; use crate::DResult;
use crate::models::session::Session; use crate::models::session::Session;
use serde_json; use serde_json;
use futures::FutureExt;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct DBPool { pub struct DBPool {
pool: Pool pool: Pool
} }
#[self_referencing]
pub struct DBTrans {
conn: Object,
#[borrows(mut conn)]
#[covariant]
pub trans: Option<Transaction<'this>>
}
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct SendqueueItem { pub struct SendqueueItem {
pub item: i64, pub item: i64,
@ -90,6 +100,14 @@ impl DBPool {
Ok(()) Ok(())
} }
pub async fn start_transaction(self: &Self) -> DResult<DBTrans> {
let conn = self.get_conn().await?;
Ok(DBTransAsyncSendTryBuilder {
conn,
trans_builder: |conn| Box::pin(conn.transaction().map(|r| r.map(Some)))
}.try_build().await?)
}
pub async fn queue_for_session(self: &Self, pub async fn queue_for_session(self: &Self,
session: &ListenerSession, session: &ListenerSession,
message: &str) -> DResult<()> { message: &str) -> DResult<()> {
@ -135,3 +153,26 @@ impl DBPool {
.map(|pool| Self { pool }) .map(|pool| Self { pool })
} }
} }
impl DBTrans {
pub async fn queue_for_session(self: &Self,
session: &ListenerSession,
message: &str) -> DResult<()> {
self.pg_trans()?
.execute("INSERT INTO sendqueue (session, listener, message) VALUES ($1, $2, $3)",
&[&session.session, &session.listener, &message]).await?;
Ok(())
}
pub async fn commit(mut self: Self) -> DResult<()> {
let trans_opt = self.with_trans_mut(|t| std::mem::replace(t, None));
for trans in trans_opt {
trans.commit().await?;
}
Ok(())
}
pub fn pg_trans(self: &Self) -> DResult<&Transaction> {
self.borrow_trans().as_ref().ok_or("Transaction already closed".into())
}
}

View File

@ -1,6 +1,6 @@
use super::ListenerSession; use super::ListenerSession;
use crate::DResult; use crate::DResult;
use crate::db::DBPool; use crate::db::{DBTrans, DBPool};
use ansi_macro::ansi; use ansi_macro::ansi;
use phf::phf_map; use phf::phf_map;
use async_trait::async_trait; use async_trait::async_trait;
@ -9,10 +9,9 @@ mod parsing;
mod ignore; mod ignore;
mod help; mod help;
#[derive(Debug)]
pub struct VerbContext<'l> { pub struct VerbContext<'l> {
session: &'l ListenerSession, session: &'l ListenerSession,
pool: &'l DBPool trans: &'l DBTrans
} }
pub enum CommandHandlingError { pub enum CommandHandlingError {
@ -48,31 +47,27 @@ static ALWAYS_AVAILABLE_COMMANDS: UserVerbRegistry = phf_map! {
pub async fn handle(session: &ListenerSession, msg: &str, pool: &DBPool) -> DResult<()> { pub async fn handle(session: &ListenerSession, msg: &str, pool: &DBPool) -> DResult<()> {
let (cmd, params) = parsing::parse_command_name(msg); let (cmd, params) = parsing::parse_command_name(msg);
let trans = pool.start_transaction().await?;
let handler_opt = ALWAYS_AVAILABLE_COMMANDS.get(cmd); let handler_opt = ALWAYS_AVAILABLE_COMMANDS.get(cmd);
match handler_opt { match handler_opt {
None => { None => {
pool.queue_for_session(session, trans.queue_for_session(session,
ansi!( ansi!(
"That's not a command I know. Try <bold>help<reset>\r\n" "That's not a command I know. Try <bold>help<reset>\r\n"
) )
).await?; ).await?;
} }
Some(handler) => { Some(handler) => {
match handler.handle(&VerbContext { session, pool }, cmd, params).await { match handler.handle(&VerbContext { session, trans: &trans }, cmd, params).await {
Ok(()) => {} Ok(()) => {}
Err(UserError(err_msg)) => { Err(UserError(err_msg)) => {
pool.queue_for_session(session, &(err_msg + "\r\n")).await?; trans.queue_for_session(session, &(err_msg + "\r\n")).await?;
} }
Err(SystemError(e)) => Err(e)? Err(SystemError(e)) => Err(e)?
} }
} }
} }
/* trans.commit().await?;
pool.queue_for_session(session,
&format!(ansi!(
"You hear an echo saying: <bggreen><red>{}<reset>\r\n"
), msg)).await?;
*/
Ok(()) Ok(())
} }

View File

@ -25,7 +25,7 @@ impl UserVerb for Verb {
async fn handle(self: &Self, ctx: &VerbContext, _verb: &str, remaining: &str) -> UResult<()> { async fn handle(self: &Self, ctx: &VerbContext, _verb: &str, remaining: &str) -> UResult<()> {
let help = HELP_PAGES.get(remaining).ok_or( let help = HELP_PAGES.get(remaining).ok_or(
UserError("No help available on that".to_string()))?; UserError("No help available on that".to_string()))?;
ctx.pool.queue_for_session(ctx.session, &(help.to_string() + "\r\n")).await?; ctx.trans.queue_for_session(ctx.session, &(help.to_string() + "\r\n")).await?;
Ok(()) Ok(())
} }
} }