use crate::message_handler::user_commands::parsing::parse_offset; use crate::message_handler::ListenerSession; use crate::models::{ consent::{Consent, ConsentType}, corp::{Corp, CorpCommType, CorpId, CorpMembership}, item::{Item, ItemFlag, LocationActionType}, session::Session, task::{Task, TaskParse}, user::User, }; use crate::static_content::{possession_type::PossessionType, room::Direction}; use crate::DResult; use chrono::{DateTime, Utc}; use deadpool_postgres::{Manager, ManagerConfig, Object, Pool, RecyclingMethod, Transaction}; #[cfg(test)] use mockall::automock; use ouroboros::self_referencing; use serde::{Deserialize, Serialize}; use serde_json::{self, Value}; use std::collections::BTreeSet; use std::error::Error; use std::str::FromStr; use std::sync::Arc; #[cfg(not(test))] use tokio_postgres::error::{DbError, SqlState}; use tokio_postgres::types::ToSql; use tokio_postgres::NoTls; use tokio_postgres::{config::Config as PgConfig, row::Row}; use uuid::Uuid; #[derive(Clone, Debug)] pub struct DBPool { pool: Pool, } #[self_referencing] pub struct DBTrans { conn: Object, #[borrows(mut conn)] #[covariant] pub trans: Option>, } #[derive(Clone, Debug)] pub struct SendqueueItem { pub item: i64, pub session: ListenerSession, pub message: Option, } impl From for SendqueueItem { fn from(row: Row) -> Self { SendqueueItem { item: row.get("item"), session: ListenerSession { session: row.get("session"), listener: row.get("listener"), }, message: row.get("message"), } } } #[derive(Clone, Debug, Serialize, Deserialize)] pub struct OnlineInfo { pub username: String, pub corp: Option, pub time: Option>, } #[derive(Clone, Debug, Serialize, Deserialize)] pub struct LocationStats { pub total_count: u64, pub total_weight: u64, } #[cfg_attr(test, allow(dead_code))] impl DBPool { pub async fn record_listener_ping(self: &DBPool, listener: Uuid) -> DResult<()> { self.get_conn() .await? .execute( "INSERT INTO listeners (listener, last_seen) \ VALUES ($1, NOW()) \ ON CONFLICT (listener) \ DO UPDATE SET last_seen = EXCLUDED.last_seen", &[&listener], ) .await?; Ok(()) } pub async fn get_dead_listeners(self: &Self) -> DResult> { Ok(self .get_conn() .await? .query( "SELECT listener FROM listeners WHERE last_seen < NOW() - \ INTERVAL '2 minutes'", &[], ) .await? .into_iter() .map(|r| r.get(0)) .collect()) } pub async fn cleanup_listener(self: &Self, listener: Uuid) -> DResult<()> { let mut conn = self.get_conn().await?; let tx = conn.transaction().await?; tx.execute( "UPDATE users SET current_session = NULL, \ current_listener = NULL WHERE current_listener = $1", &[&listener], ) .await?; tx.execute("DELETE FROM sendqueue WHERE listener = $1", &[&listener]) .await?; tx.execute("DELETE FROM sessions WHERE listener = $1", &[&listener]) .await?; tx.execute("DELETE FROM listeners WHERE listener = $1", &[&listener]) .await?; tx.commit().await?; Ok(()) } pub async fn start_session( self: &Self, session: &ListenerSession, details: &Session, ) -> DResult<()> { self.get_conn() .await? .execute( "INSERT INTO sessions (session, listener, details) \ VALUES ($1, $2, $3) ON CONFLICT (session) DO NOTHING", &[ &session.session, &session.listener, &serde_json::to_value(details)?, ], ) .await?; Ok(()) } pub async fn end_session(self: &Self, session: ListenerSession) -> DResult<()> { let mut conn = self.get_conn().await?; let tx = conn.transaction().await?; tx.execute( "UPDATE users SET current_session = NULL, \ current_listener = NULL WHERE current_session = $1", &[&session.session], ) .await?; tx.execute( "DELETE FROM sendqueue WHERE session = $1", &[&session.session], ) .await?; tx.execute( "DELETE FROM sessions WHERE session = $1", &[&session.session], ) .await?; tx.commit().await?; Ok(()) } pub async fn start_transaction(self: &Self) -> DResult { let conn = self.get_conn().await?; Ok(DBTransAsyncSendTryBuilder { conn, trans_builder: |conn| { Box::pin(async { match conn.transaction().await { Err(e) => Err(e), Ok(t) => { t.execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE", &[]) .await?; Ok(Some(t)) } } }) }, } .try_build() .await?) } pub async fn queue_for_session( self: &Self, session: &ListenerSession, message: Option<&str>, ) -> DResult<()> { let conn = self.get_conn().await?; conn.execute( "INSERT INTO sendqueue (session, listener, message) VALUES ($1, $2, $3)", &[&session.session, &session.listener, &message], ) .await?; Ok(()) } pub async fn get_from_sendqueue(self: &Self) -> DResult> { let conn = self.get_conn().await?; Ok(conn .query( "SELECT item, session, listener, message FROM sendqueue WHERE sent_at IS NULL ORDER BY item ASC LIMIT 10", &[], ) .await? .into_iter() .map(SendqueueItem::from) .collect()) } pub async fn delete_from_sendqueue(self: &DBPool, item: &SendqueueItem) -> DResult<()> { let conn = self.get_conn().await?; conn.execute( "UPDATE sendqueue SET sent_at = NOW() WHERE item=$1", &[&item.item], ) .await?; conn.execute("DELETE FROM sendqueue WHERE item IN (\ WITH item_rows AS (\ SELECT item, row_number() OVER (ORDER BY sent_at DESC) AS rn FROM sendqueue WHERE sent_at IS NOT NULL AND \ session = $1 AND listener = $2) \ SELECT item FROM item_rows WHERE rn > 80 \ )", &[&item.session.session, &item.session.listener]) .await?; Ok(()) } pub async fn find_static_item_types(self: &Self) -> DResult>> { Ok(Box::new( self.get_conn() .await? .query( "SELECT DISTINCT details->>'item_type' AS item_type \ FROM items WHERE details->>'is_static' = 'true'", &[], ) .await? .iter() .map(|r| r.get("item_type")) .collect(), )) } pub async fn find_static_task_types(self: &Self) -> DResult>> { Ok(Box::new( self.get_conn() .await? .query( "SELECT DISTINCT details->>'task_type' AS task_type \ FROM tasks WHERE details->>'is_static' = 'true'", &[], ) .await? .iter() .map(|r| r.get("task_type")) .collect(), )) } pub async fn delete_static_items_by_type(self: &Self, item_type: &str) -> DResult<()> { self.get_conn().await?.query( "DELETE FROM items WHERE details->>'is_static' = 'true' AND details->>'item_type' = $1", &[&item_type]).await?; Ok(()) } pub async fn delete_static_tasks_by_type(self: &Self, task_type: &str) -> DResult<()> { self.get_conn().await?.query( "DELETE FROM tasks WHERE details->>'is_static' = 'true' AND details->>'task_type' = $1", &[&task_type]).await?; Ok(()) } pub async fn bump_session_time(&self, session: &ListenerSession) -> DResult<()> { self.get_conn().await?.query( "UPDATE sessions SET details=JSONB_SET(details, '{last_active}', to_json(NOW())::jsonb) \ WHERE session = $1", &[&session.session] ).await?; Ok(()) } pub async fn get_conn(self: &DBPool) -> DResult { let conn = self.pool.get().await?; conn.execute("SET synchronous_commit=off", &[]).await?; Ok(conn) } pub fn start(connstr: &str) -> DResult { let mgr_config = ManagerConfig { recycling_method: RecyclingMethod::Fast, }; let mgr = Manager::from_config( PgConfig::from_str(connstr).map_err(|e| Box::new(e) as Box)?, NoTls, mgr_config, ); Pool::builder(mgr) .max_size(4) .build() .map_err(|e| Box::new(e) as Box) .map(|pool| Self { pool }) } } #[derive(Clone, Debug)] pub struct ItemSearchParams<'l> { pub from_item: &'l Item, pub query: &'l str, pub include_contents: bool, pub include_loc_contents: bool, pub include_active_players: bool, pub include_all_players: bool, pub item_type_only: Option<&'l str>, pub item_action_type_only: Option<&'l LocationActionType>, pub flagged_only: Option, pub limit: u8, pub dead_first: bool, } impl ItemSearchParams<'_> { pub fn base<'l>(from_item: &'l Item, query: &'l str) -> ItemSearchParams<'l> { ItemSearchParams { from_item, query, include_contents: false, include_loc_contents: false, include_active_players: false, include_all_players: false, dead_first: false, limit: 100, item_type_only: None, item_action_type_only: None, flagged_only: None, } } } #[cfg_attr(test, automock)] #[cfg_attr(test, allow(dead_code))] impl DBTrans { pub async fn queue_for_session<'a>( self: &'a Self, session: &'a ListenerSession, message: Option<&'a str>, ) -> DResult<()> { self.pg_trans()? .execute( "INSERT INTO sendqueue (session, listener, message) VALUES ($1, $2, $3)", &[&session.session, &session.listener, &message], ) .await?; Ok(()) } pub async fn get_session_user_model<'a>( self: &'a Self, session: &'a ListenerSession, ) -> DResult)>> { match self .pg_trans()? .query_opt( "SELECT s.details AS sess_details, \ u.details AS user_details FROM sessions s \ LEFT JOIN users u ON u.current_session = s.session \ WHERE s.session = $1", &[&session.session], ) .await? { None => Ok(None), Some(row) => Ok(Some(( serde_json::from_value(row.get("sess_details"))?, match row.get::<&str, Option>("user_details") { None => None, Some(v) => serde_json::from_value(v)?, }, ))), } } pub async fn save_session_model<'a>( self: &'a Self, session: &'a ListenerSession, details: &Session, ) -> DResult<()> { self.pg_trans()? .execute( "UPDATE sessions SET details = $1 WHERE session = $2", &[&serde_json::to_value(details)?, &session.session], ) .await?; Ok(()) } pub async fn find_by_username<'a>(self: &'a Self, username: &'a str) -> DResult> { if let Some(details_json) = self .pg_trans()? .query_opt( "SELECT details FROM users WHERE username=$1", &[&username.to_lowercase()], ) .await? { return Ok(Some(serde_json::from_value(details_json.get("details"))?)); } Ok(None) } pub async fn create_item<'a>(self: &'a Self, item: &'a Item) -> DResult { Ok(self .pg_trans()? .query_one( "INSERT INTO items (details) VALUES ($1) RETURNING item_id", &[&serde_json::to_value(item)?], ) .await? .get("item_id")) } pub async fn find_exact_dyn_exit<'a>( self: &'a Self, source: &'a str, direction: &'a Direction, ) -> DResult> { if let Some(details_json) = self .pg_trans()? .query_opt( "SELECT details FROM items WHERE \ details->'dynamic_entrance'->>'source_item' = $1 AND \ LOWER(details->'dynamic_entrance'->>'direction') = $2", &[&source, &direction.describe().to_lowercase()], ) .await? { return Ok(Some(serde_json::from_value(details_json.get("details"))?)); } Ok(None) } pub async fn limited_update_static_item<'a>(self: &'a Self, item: &'a Item) -> DResult<()> { let value = serde_json::to_value(item)?; let obj_map = value.as_object().expect("Static item to be object in JSON"); let mut params: Vec<&(dyn ToSql + Sync)> = vec![&item.item_type, &item.item_code]; let mut det_ex: String = "details".to_owned(); let mut var_id = 3; // Only copy more permanent fields, others are supposed to change over time and shouldn't // be reset on restart. for to_copy in [ "display", "display_less_explicit", "details", "details_less_explicit", "total_xp", "total_stats", "total_skills", "pronouns", "flags", "sex", "is_challenge_attack_only", "aliases", "species", ] { det_ex = format!("jsonb_set({}, '{{{}}}', ${})", det_ex, to_copy, var_id); params.push(obj_map.get(to_copy).unwrap_or(&Value::Null)); var_id += 1; } self.pg_trans()? .execute( &("UPDATE items SET details = ".to_owned() + &det_ex + " WHERE details->>'item_type' = $1 AND details->>'item_code' = $2"), ¶ms, ) .await?; Ok(()) } pub async fn limited_update_static_task<'a>(self: &'a Self, task: &'a Task) -> DResult<()> { let value = serde_json::to_value(task)?; let obj_map = value.as_object().expect("Static task to be object in JSON"); let task_name: &(dyn ToSql + Sync) = &task.details.name(); let mut params: Vec<&(dyn ToSql + Sync)> = vec![task_name, &task.meta.task_code]; let mut det_ex: String = "details".to_owned(); let mut var_id = 3; // Only copy more permanent fields, others are supposed to change over time and shouldn't // be reset on restart. We do reset failure count since the problem may be fixed. for to_copy in ["recurrence", "consecutive_failure_count", "task_details"] { det_ex = format!("jsonb_set({}, '{{{}}}', ${})", det_ex, to_copy, var_id); params.push(obj_map.get(to_copy).unwrap_or(&Value::Null)); var_id += 1; } self.pg_trans()? .execute( &("UPDATE tasks SET details = ".to_owned() + &det_ex + " WHERE details->>'task_type' = $1 AND details->>'task_code' = $2"), ¶ms, ) .await?; Ok(()) } pub async fn create_user<'a>( self: &'a Self, session: &'a ListenerSession, user_dat: &'a User, ) -> DResult<()> { self.pg_trans()? .execute( "INSERT INTO users (\ username, current_session, current_listener, details\ ) VALUES ($1, $2, $3, $4)", &[ &user_dat.username.to_lowercase(), &session.session, &session.listener, &serde_json::to_value(user_dat)?, ], ) .await?; Ok(()) } pub async fn find_dynzone_for_user(&self, username: &str) -> DResult>> { Ok(self .pg_trans()? .query( "SELECT details FROM items WHERE details->>'owner' = $1 \ AND details->>'item_type' = 'dynzone'", &[&format!("player/{}", username.to_lowercase())], ) .await? .into_iter() .filter_map(|i| serde_json::from_value(i.get("details")).ok()) .map(Arc::new) .collect()) } pub async fn delete_user(&self, username: &str) -> DResult<()> { self.pg_trans()? .execute( "DELETE FROM users WHERE username = $1", &[&username.to_lowercase()], ) .await?; Ok(()) } pub async fn save_user_model<'a>(self: &'a Self, details: &'a User) -> DResult<()> { self.pg_trans()? .execute( "UPDATE users SET details = $1 WHERE username = $2", &[ &serde_json::to_value(details)?, &details.username.to_lowercase(), ], ) .await?; Ok(()) } pub async fn attach_user_to_session<'a>( self: &'a Self, username: &'a str, session: &'a ListenerSession, ) -> DResult<()> { let username_l = username.to_lowercase(); self.pg_trans()? .execute( "INSERT INTO sendqueue (session, listener, message) \ SELECT current_session, current_listener, $1 FROM users \ WHERE username = $2 AND current_session IS NOT NULL \ AND current_listener IS NOT NULL", &[&"Logged in from another session\r\n", &username_l], ) .await?; self.pg_trans()? .execute( "INSERT INTO sendqueue (session, listener, message) \ SELECT current_session, current_listener, null FROM users \ WHERE username = $1 AND current_session IS NOT NULL \ AND current_listener IS NOT NULL", &[&username_l], ) .await?; self.pg_trans()? .execute( "UPDATE users SET current_session = $1, current_listener = $2 WHERE username = $3", &[ &session.session as &(dyn ToSql + Sync), &session.listener, &username_l, ], ) .await?; Ok(()) } pub async fn find_static_items_by_type<'a>( self: &'a Self, item_type: &'a str, ) -> DResult>> { Ok(Box::new( self.pg_trans()? .query( "SELECT DISTINCT details->>'item_code' AS item_code FROM items WHERE \ details->>'is_static' = 'true' AND \ details->>'item_type' = $1", &[&item_type], ) .await? .into_iter() .map(|v| v.get("item_code")) .collect(), )) } pub async fn find_static_tasks_by_type<'a>( self: &'a Self, task_type: &'a str, ) -> DResult>> { Ok(Box::new( self.pg_trans()? .query( "SELECT DISTINCT details->>'task_code' AS task_code FROM tasks WHERE \ details->>'is_static' = 'true' AND \ details->>'task_type' = $1", &[&task_type], ) .await? .into_iter() .map(|v| v.get("task_code")) .collect(), )) } pub async fn delete_static_items_by_code<'a>( self: &'a Self, item_type: &'a str, item_code: &str, ) -> DResult<()> { self.pg_trans()? .query( "DELETE FROM items WHERE details->>'is_static' = 'true' AND \ details->>'item_type' = $1 AND \ details->>'item_code' = $2", &[&item_type, &item_code], ) .await?; Ok(()) } pub async fn delete_static_tasks_by_code<'a>( self: &'a Self, task_type: &'a str, task_code: &'a str, ) -> DResult<()> { self.pg_trans()? .query( "DELETE FROM tasks WHERE details->>'is_static' = 'true' AND \ details->>'task_type' = $1 AND \ details->>'task_code' = $2", &[&task_type, &task_code], ) .await?; Ok(()) } pub async fn find_item_by_type_code<'a>( self: &'a Self, item_type: &'a str, item_code: &'a str, ) -> DResult>> { if let Some(item) = self .pg_trans()? .query_opt( "SELECT details FROM items WHERE \ details->>'item_type' = $1 AND \ details->>'item_code' = $2", &[&item_type, &item_code], ) .await? { return Ok(Some(Arc::new(serde_json::from_value::( item.get("details"), )?))); } Ok(None) } pub async fn transfer_all_possessions_code<'a>( self: &'a Self, src_loc: &'a str, dst_loc: &'a str, ) -> DResult<()> { self.pg_trans()? .execute( "UPDATE items SET details=JSONB_SET(details, '{location}', $1) \ WHERE details->>'location' = $2", &[&serde_json::to_value(dst_loc)?, &src_loc], ) .await?; Ok(()) } pub async fn transfer_all_possessions<'a>( self: &'a Self, source: &'a Item, dest: &'a Item, ) -> DResult<()> { let src_loc = format!("{}/{}", &source.item_type, &source.item_code); let dst_loc = format!("{}/{}", &dest.item_type, &dest.item_code); self.transfer_all_possessions_code(&src_loc, &dst_loc) .await?; Ok(()) } pub async fn find_items_by_location<'a>( self: &'a Self, location: &'a str, ) -> DResult>> { Ok(self .pg_trans()? .query( "SELECT details FROM items WHERE details->>'location' = $1 \ ORDER BY details->>'display' LIMIT 100", &[&location], ) .await? .into_iter() .filter_map(|i| serde_json::from_value(i.get("details")).ok()) .map(Arc::new) .collect()) } pub async fn find_items_by_location_possession_type_excluding<'a>( self: &'a Self, location: &'a str, possession_type: &'a PossessionType, exclude_codes: &'a Vec<&'a str>, ) -> DResult>> { Ok(self .pg_trans()? .query( "SELECT details FROM items WHERE details->>'location' = $1 AND \ details->'possession_type' = $2 AND NOT \ ($3::JSONB @> (details->'item_code')) \ ORDER BY details->>'display' \ LIMIT 100", &[ &location, &serde_json::to_value(possession_type)?, &serde_json::to_value(exclude_codes)?, ], ) .await? .into_iter() .filter_map(|i| serde_json::from_value(i.get("details")).ok()) .map(Arc::new) .collect()) } pub async fn find_item_by_location_dynroom_code<'a>( self: &'a Self, location: &'a str, dynroom_code: &'a str, ) -> DResult> { match self .pg_trans()? .query_opt( "SELECT details FROM items WHERE details->>'location' = $1 \ AND details->'special_data'->'DynroomData'->>'dynroom_code' = $2 LIMIT 1", &[&location, &dynroom_code], ) .await? { None => Ok(None), Some(v) => Ok(Some(serde_json::from_value(v.get("details"))?)), } } pub async fn save_item_model(self: &Self, details: &Item) -> DResult<()> { self.pg_trans()? .execute( "INSERT INTO items (details) VALUES ($1) \ ON CONFLICT ((details->>'item_type'), \ (details->>'item_code')) DO UPDATE SET \ details = EXCLUDED.details", &[&serde_json::to_value(details)?], ) .await?; Ok(()) } pub async fn delete_item<'a>( self: &'a Self, item_type: &'a str, item_code: &'a str, ) -> DResult<()> { self.pg_trans()? .execute( "DELETE FROM items WHERE \ details->>'item_type' = $1 AND \ details->>'item_code' = $2", &[&item_type, &item_code], ) .await?; Ok(()) } pub async fn find_session_for_player<'a>( self: &'a Self, item_code: &'a str, ) -> DResult> { Ok(self .pg_trans()? .query_opt( "SELECT u.current_listener, u.current_session, s.details \ FROM users u JOIN sessions s ON s.session = u.current_session \ WHERE u.username=$1", &[&item_code], ) .await? .and_then(|r| { match ( r.get("current_listener"), r.get("current_session"), r.get("details"), ) { (Some(listener), Some(session), details) => Some(( ListenerSession { listener, session }, serde_json::from_value(details).ok()?, )), _ => None, } })) } pub async fn resolve_items_by_display_name_for_player<'l>( self: &'l Self, search: &'l ItemSearchParams<'l>, ) -> DResult>>> { let mut ctes: Vec = Vec::new(); let mut include_tables: Vec<&'static str> = Vec::new(); let player_loc = &search.from_item.location; let player_desig = format!( "{}/{}", search.from_item.item_type, search.from_item.item_code ); let (offset, mut query) = parse_offset(search.query); let mut param_no: usize = 5; let mut extra_order: String = String::new(); let mut extra_where: String = String::new(); let mut dead_only = false; if query.starts_with("dead") { query = query[4..].trim(); dead_only = true; } else if query.starts_with("corpse of") { query = query[9..].trim(); dead_only = true; } else if query.starts_with("corpse") { query = query[6..].trim(); dead_only = true; } if query.ends_with("ies") { query = &query[0..(query.len() - 3)]; } else if query.ends_with("es") { query = &query[0..(query.len() - 2)]; } else if query.ends_with("s") { query = &query[0..(query.len() - 1)]; } if dead_only { extra_where.push_str(" AND COALESCE(details->>'death_data' IS NOT NULL, false) = true"); } else if search.dead_first { extra_order.push_str(" COALESCE(details->>'death_data' IS NOT NULL, false) DESC,"); } else { extra_order.push_str(" COALESCE(details->>'death_data' IS NOT NULL, false) ASC,"); } let query_wildcard = query .replace("\\", "\\\\") .replace("_", "\\_") .replace("%", "") .to_lowercase() + "%"; let offset_sql = offset .map(|x| (if x >= 1 { x - 1 } else { x }) as i64) .unwrap_or(0); let query_len = query.len() as i32; let limit = search.limit as i64; let mut params: Vec<&(dyn ToSql + Sync)> = vec![&query_wildcard, &offset_sql, &query_len, &limit]; match search.item_type_only { None => {} Some(ref item_type) => { extra_where.push_str(&format!(" AND details->>'item_type' = ${}", param_no)); param_no += 1; params.push(item_type); } } let item_action_type_value: Option = match search.item_action_type_only { None => None, Some(v) => Some(serde_json::to_value(v)?), }; match item_action_type_value { None => {} Some(ref item_action_type) => { extra_where.push_str(&format!( " AND (details->'action_type')::TEXT = ${}::JSONB::TEXT", param_no )); param_no += 1; params.push(item_action_type); } } let flagged_only_value: Option = match search.flagged_only.as_ref() { None => None, Some(v) => Some(serde_json::to_value(v)?), }; if let Some(flag) = flagged_only_value.as_ref() { extra_where.push_str(&format!(" AND details->'flags' @> (${}::JSONB)", param_no)); param_no += 1; params.push(flag); } if search.include_contents { ctes.push(format!("contents AS (\ SELECT details, details->'aliases' AS aliases FROM items WHERE details->>'location' = ${}\ )", param_no)); param_no += 1; params.push(&player_desig); include_tables.push("SELECT details, aliases FROM contents"); } if search.include_loc_contents { ctes.push(format!("loc_contents AS (\ SELECT details, details->'aliases' AS aliases FROM items WHERE details->>'location' = ${}\ )", param_no)); #[allow(dropping_copy_types)] drop(param_no); // or increment if this is a problem. params.push(&player_loc); include_tables.push("SELECT details, aliases FROM loc_contents"); } if search.include_active_players { ctes.push( "active_players AS (\ SELECT i.details, ('[]'::JSONB) AS aliases FROM items i \ JOIN users u ON u.username = i.details->>'item_code' \ WHERE i.details->>'item_type' = 'player' \ AND u.current_session IS NOT NULL \ )" .to_owned(), ); include_tables.push("SELECT details, aliases FROM active_players"); } if search.include_all_players { ctes.push("all_players AS (\ SELECT details, ('[]'::JSONB) AS aliases FROM items WHERE details->>'item_type' = 'player' )".to_owned()); include_tables.push("SELECT details, aliases FROM all_players"); } ctes.push(format!( "relevant_items AS ({})", include_tables.join(" UNION ") )); let cte_str: String = ctes.join(", "); Ok(Arc::new( self.pg_trans()? .query( &format!( "WITH {} SELECT details FROM relevant_items \ WHERE \ ((lower(details->>'display') LIKE $1) \ OR (lower(details ->>'display_less_explicit') LIKE $1) \ OR EXISTS (SELECT 1 FROM jsonb_array_elements(aliases) AS al(alias) WHERE \ LOWER(alias#>>'{{}}') LIKE $1)) {} \ ORDER BY {} ABS(length(details->>'display')-$3) ASC \ LIMIT $4 OFFSET $2", &cte_str, &extra_where, &extra_order ), ¶ms, ) .await? .into_iter() .filter_map(|i| serde_json::from_value(i.get("details")).ok()) .map(Arc::new) .collect(), )) } pub async fn get_next_scheduled_task(&self) -> DResult> { match self .pg_trans()? .query_opt( "SELECT details FROM tasks WHERE \ CAST(details->>'next_scheduled' AS TIMESTAMPTZ) <= now() \ ORDER BY details->>'next_scheduled' ASC LIMIT 1", &[], ) .await? { None => Ok(None), Some(row) => Ok(serde_json::from_value(row.get("details"))?), } } pub async fn delete_task<'a>(&'a self, task_type: &'a str, task_code: &'a str) -> DResult<()> { self.pg_trans()? .execute( "DELETE FROM tasks WHERE details->>'task_type' = $1 AND \ details->>'task_code' = $2", &[&task_type, &task_code], ) .await?; Ok(()) } pub async fn upsert_task<'a>(&'a self, task: &'a Task) -> DResult<()> { self.pg_trans()? .execute( "INSERT INTO tasks (details) \ VALUES ($1) \ ON CONFLICT ((details->>'task_code'), (details->>'task_type')) \ DO UPDATE SET details = $1", &[&serde_json::to_value(task)?], ) .await?; Ok(()) } pub async fn update_task<'a>( &'a self, task_type: &'a str, task_code: &'a str, task: &'a TaskParse, ) -> DResult<()> { self.pg_trans()? .execute( "UPDATE tasks SET details = $3 WHERE details->>'task_type' = $1 AND \ details->>'task_code' = $2", &[&task_type, &task_code, &serde_json::to_value(task)?], ) .await?; Ok(()) } pub async fn check_task_by_type_code<'a>( &'a self, task_type: &'a str, task_code: &'a str, ) -> DResult { let n: i64 = self .pg_trans()? .query_one( "SELECT COUNT(*) FROM tasks WHERE \ details->>'task_type' = $1 AND \ details->>'task_code' = $2", &[&task_type, &task_code], ) .await? .get(0); Ok(n > 0) } pub async fn alloc_item_code(&self) -> DResult { Ok(self .pg_trans()? .query_one("SELECT NEXTVAL('item_seq')", &[]) .await? .get(0)) } pub async fn get_online_info(&self) -> DResult> { Ok(self .pg_trans()? .query( "WITH show_corps AS (\ SELECT DISTINCT ON (member_username) \ m.member_username AS username, \ c.details ->> 'name' AS corpname \ FROM corps c JOIN corp_membership m ON m.corp_id = c.corp_id \ ORDER BY m.member_username, (m.details->>'priority')::int DESC NULLS LAST, \ (m.details->>'joined_at') :: TIMESTAMPTZ ASC \ )\ SELECT jsonb_build_object(\ 'username', u.details->>'username',\ 'corp', c.corpname, \ 'time', s.details->>'last_active'\ ) FROM sessions s \ JOIN users u ON u.current_session = s.session \ LEFT JOIN show_corps c ON c.username = LOWER(u.username) \ ORDER BY s.details->>'last_active' DESC", &[], ) .await? .into_iter() .filter_map(|row| serde_json::from_value(row.get(0)).ok()) .collect()) } pub async fn get_location_stats(&self, location: &str) -> DResult { Ok(serde_json::from_value(self.pg_trans()?.query_one( "SELECT JSON_BUILD_OBJECT('total_count', COUNT(*), 'total_weight', COALESCE(SUM(CAST(details->>'weight' AS NUMERIC)), 0)) \ FROM items WHERE details->>'location' = $1", &[&location] ).await?.get(0))?) } pub async fn set_exclusive_action_type_to( &self, item: &Item, new_action_type: &LocationActionType, other_item_action_type: &LocationActionType, ) -> DResult<()> { self.pg_trans()? .execute( "UPDATE items SET details=\ JSONB_SET(details, '{action_type}', $1) \ WHERE details->>'location' = $2 AND \ (details->'action_type')::TEXT = $3::JSONB::TEXT", &[ &serde_json::to_value(other_item_action_type)?, &item.location, &serde_json::to_value(new_action_type)?, ], ) .await?; self.pg_trans()? .execute( "UPDATE items SET details=\ JSONB_SET(details, '{action_type}', $1) \ WHERE details->>'item_type' = $2 AND \ details->>'item_code' = $3", &[ &serde_json::to_value(new_action_type)?, &item.item_type, &item.item_code, ], ) .await?; Ok(()) } pub async fn find_by_action_and_location( &self, location: &str, action_type: &LocationActionType, ) -> DResult>> { Ok(self .pg_trans()? .query( "SELECT details FROM items WHERE \ details->>'location' = $1 AND \ ((details->'action_type')::TEXT = $2::JSONB::TEXT) LIMIT 100", &[&location, &serde_json::to_value(action_type)?], ) .await? .into_iter() .filter_map( |row| match serde_json::from_value::(row.get("details")) { Err(_) => None, Ok(item) => Some(Arc::new(item)), }, ) .collect()) } pub async fn list_consents( &self, consenting: &str, ) -> DResult> { Ok(self .pg_trans()? .query( "SELECT consented_user, consent_type, details \ FROM user_consent \ WHERE consenting_user = $1", &[&consenting.to_lowercase()], ) .await? .into_iter() .filter_map(|row| match serde_json::from_value(row.get(2)) { Err(_) => None, Ok(consent) => ConsentType::from_str(row.get(1)) .map(|consent_type| (row.get(0), consent_type, consent)), }) .collect()) } pub async fn find_user_consent_by_parties_type( &self, consenting: &str, consented: &str, consent_type: &ConsentType, ) -> DResult> { match self .pg_trans()? .query_opt( "SELECT details FROM user_consent WHERE consenting_user = $1 AND \ consented_user = $2 AND consent_type = $3", &[&consenting, &consented, &ConsentType::to_str(consent_type)], ) .await? { None => Ok(None), Some(row) => Ok(Some(serde_json::from_value(row.get(0))?)), } } pub async fn find_corp_consent_by_parties_type( &self, consenting: &CorpId, consented: &CorpId, consent_type: &ConsentType, ) -> DResult> { match self .pg_trans()? .query_opt( "SELECT details FROM corp_consent WHERE consenting_corp = $1 AND \ consented_corp = $2 AND consent_type = $3", &[ &consenting.0, &consented.0, &ConsentType::to_str(consent_type), ], ) .await? { None => Ok(None), Some(row) => Ok(Some(serde_json::from_value(row.get(0))?)), } } pub async fn find_corp_consent_by_user_parties_type( &self, usr_consenting: &str, usr_consented: &str, consent_type: &ConsentType, ) -> DResult> { match self .pg_trans()? .query_opt( "SELECT cc.details FROM corp_consent cc \ JOIN corp_membership cm_consenting ON cc.consenting_corp = cm_consenting.corp_id \ JOIN corp_membership cm_consented ON cc.consented_corp = cm_consented.corp_id \ WHERE cm_consenting.member_username = $1 AND \ cm_consented.member_username = $2 AND cc.consent_type = $3", &[ &usr_consenting, &usr_consented, &ConsentType::to_str(consent_type), ], ) .await? { None => Ok(None), Some(row) => Ok(Some(serde_json::from_value(row.get(0))?)), } } pub async fn revoke_until_death_consent(&self, party: &str) -> DResult<()> { self.pg_trans()? .execute( "DELETE FROM user_consent WHERE (consenting_user = $1 OR \ (consented_user = $1 AND consent_type = 'fight')) AND \ details->>'until_death'='true'", &[&party], ) .await?; self.pg_trans()? .execute( "DELETE FROM corp_consent cc USING corp_membership cm WHERE (cc.consenting_corp = cm.corp_id OR cc.consented_corp = cm.corp_id) AND \ cm.member_username = $1 AND \ cm.details->>'joined_at' IS NOT NULL AND \ cc.consent_type = 'fight' AND \ cc.details->>'until_death'='true'", &[&party], ) .await?; Ok(()) } pub async fn delete_expired_user_consent(&self) -> DResult<()> { self.pg_trans()? .execute( "DELETE FROM user_consent WHERE details->>'expires' < $1", &[&Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Nanos, true)], ) .await?; Ok(()) } pub async fn delete_expired_corp_consent(&self) -> DResult<()> { self.pg_trans()? .execute( "DELETE FROM corp_consent WHERE details->>'expires' < $1", &[&Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Nanos, true)], ) .await?; Ok(()) } pub async fn delete_user_consent( &self, consenting: &str, consented: &str, consent_type: &ConsentType, ) -> DResult<()> { self.pg_trans()? .execute( "DELETE FROM user_consent WHERE consenting_user = $1 AND \ consented_user = $2 AND consent_type = $3", &[&consenting, &consented, &ConsentType::to_str(consent_type)], ) .await?; Ok(()) } pub async fn delete_corp_consent( &self, consenting: &CorpId, consented: &CorpId, consent_type: &ConsentType, ) -> DResult<()> { self.pg_trans()? .execute( "DELETE FROM corp_consent WHERE consenting_corp = $1 AND \ consented_corp = $2 AND consent_type = $3", &[ &consenting.0, &consented.0, &ConsentType::to_str(consent_type), ], ) .await?; Ok(()) } pub async fn upsert_user_consent( &self, consenting: &str, consented: &str, consent_type: &ConsentType, details: &Consent, ) -> DResult<()> { self.pg_trans()? .execute("INSERT INTO user_consent (consenting_user, consented_user, consent_type, details) VALUES ($1, $2, $3, $4) \ ON CONFLICT (consenting_user, consented_user, consent_type) DO UPDATE SET \ details = EXCLUDED.details", &[&consenting, &consented, &ConsentType::to_str(consent_type), &serde_json::to_value(details)?]).await?; Ok(()) } pub async fn upsert_corp_consent( &self, consenting: &CorpId, consented: &CorpId, consent_type: &ConsentType, details: &Consent, ) -> DResult<()> { self.pg_trans()? .execute("INSERT INTO corp_consent (consenting_corp, consented_corp, consent_type, details) VALUES ($1, $2, $3, $4) \ ON CONFLICT (consenting_corp, consented_corp, consent_type) DO UPDATE SET \ details = EXCLUDED.details", &[&consenting.0, &consented.0, &ConsentType::to_str(consent_type), &serde_json::to_value(details)?]).await?; Ok(()) } pub async fn find_corp_by_name(&self, name: &str) -> DResult> { Ok( match self .pg_trans()? .query_opt( "SELECT corp_id, details FROM corps WHERE LOWER(details->>'name') = $1", &[&name.to_lowercase()], ) .await? { None => None, Some(row) => Some(( CorpId(row.get("corp_id")), serde_json::from_value(row.get("details"))?, )), }, ) } pub async fn match_user_corp_by_name( &self, corpname: &str, username: &str, ) -> DResult> { Ok( match self .pg_trans()? .query_opt( "SELECT c.corp_id, c.details AS cdetails, m.details AS mdetails FROM corps c \ JOIN corp_membership m ON m.corp_id = c.corp_id \ WHERE LOWER(c.details->>'name') LIKE $1 AND \ m.member_username = $2 \ ORDER BY \ ABS(length(c.details->>'name')-length($1)) DESC \ LIMIT 1", &[ &(corpname .replace("\\", "\\\\") .replace("_", "\\_") .replace("%", "") .to_lowercase() + "%"), &(username.to_lowercase()), ], ) .await? { None => None, Some(row) => Some(( CorpId(row.get("corp_id")), serde_json::from_value(row.get("cdetails"))?, serde_json::from_value(row.get("mdetails"))?, )), }, ) } pub async fn create_corp(&self, details: &Corp) -> DResult { let id = self .pg_trans()? .query_one( "INSERT INTO corps (details) VALUES ($1) RETURNING corp_id", &[&serde_json::to_value(details)?], ) .await? .get("corp_id"); Ok(CorpId(id)) } pub async fn update_corp_details(&self, corp: &CorpId, details: &Corp) -> DResult<()> { self.pg_trans()? .execute( "UPDATE corps SET details=$1 WHERE corp_id = $2", &[&serde_json::to_value(details)?, &corp.0], ) .await?; Ok(()) } pub async fn expire_old_invites(&self) -> DResult<()> { self.pg_trans()? .execute( "DELETE FROM corp_membership WHERE details->>'invited_at' <= $1", &[&(Utc::now() - chrono::Duration::hours(4)) .to_rfc3339_opts(chrono::SecondsFormat::Nanos, true)], ) .await?; Ok(()) } pub async fn upsert_corp_membership( &self, corp: &CorpId, username: &str, details: &CorpMembership, ) -> DResult<()> { self.pg_trans()? .execute( "INSERT INTO corp_membership (corp_id, member_username, details) \ VALUES ($1, $2, $3) \ ON CONFLICT (corp_id, member_username) DO UPDATE SET \ details = excluded.details", &[ &corp.0, &username.to_lowercase(), &serde_json::to_value(details)?, ], ) .await?; Ok(()) } pub async fn get_corp_memberships_for_user( &self, username: &str, ) -> DResult> { Ok(self .pg_trans()? .query( "SELECT m.corp_id, c.details->>'name', m.details FROM corp_membership m \ JOIN corps c ON c.corp_id = m.corp_id WHERE m.member_username = $1 \ AND m.details->>'joined_at' IS NOT NULL \ ORDER BY (m.details->>'priority')::int DESC NULLS LAST, \ (m.details->>'joined_at') :: TIMESTAMPTZ ASC", &[&username.to_lowercase()], ) .await? .into_iter() .filter_map(|row| match serde_json::from_value(row.get(2)) { Err(_) => None, Ok(j) => Some((CorpId(row.get(0)), row.get(1), j)), }) .collect()) } pub async fn get_default_corp_for_user( &self, username: &str, ) -> DResult> { Ok( match self .pg_trans()? .query_opt( "SELECT m.corp_id, c.details FROM corp_membership m \ JOIN corps c ON c.corp_id = m.corp_id WHERE m.member_username = $1 \ AND m.details->>'joined_at' IS NOT NULL \ ORDER BY (m.details->>'priority')::int ASC NULLS LAST, \ (m.details->>'joined_at') :: TIMESTAMPTZ ASC LIMIT 1", &[&username.to_lowercase()], ) .await? { None => None, Some(row) => match serde_json::from_value(row.get(1)) { Err(_) => None, Ok(j) => Some((CorpId(row.get(0)), j)), }, }, ) } pub async fn broadcast_to_corp<'a>( self: &'a Self, corp_id: &'a CorpId, comm_type: &'a CorpCommType, except_user: Option<&'a str>, message: &'a str, ) -> DResult<()> { let comm_type_s = serde_json::to_value(comm_type)? .as_str() .ok_or("comm type doesn't serialise to JSON string")? .to_owned(); let mut params: Vec<&(dyn ToSql + Sync)> = vec![&message, &corp_id.0, &comm_type_s]; let mut query = "INSERT INTO sendqueue (session, listener, message) \ SELECT s.session, s.listener, $1 FROM \ sessions s \ JOIN users u ON u.current_session = s.session \ JOIN corp_membership m ON m.member_username = u.username \ WHERE m.corp_id = $2 AND \ (m.details->'comms_on') ? $3 AND \ m.details->>'joined_at' IS NOT NULL" .to_owned(); match except_user.as_ref() { None => {} Some(u) => { query.push_str(" AND u.username <> $4"); params.push(u); } } self.pg_trans()?.execute(&query, ¶ms).await?; Ok(()) } pub async fn list_corp_members<'a>( self: &'a Self, corp_id: &'a CorpId, ) -> DResult> { Ok(self .pg_trans()? .query( "SELECT member_username, details \ FROM corp_membership WHERE \ corp_id = $1", &[&corp_id.0], ) .await? .iter() .filter_map(|i| { Some(( i.get("member_username"), serde_json::from_value(i.get("details")).ok()?, )) }) .collect()) } pub async fn delete_corp<'a>(self: &'a Self, corp_id: &'a CorpId) -> DResult<()> { self.pg_trans()? .execute( "DELETE FROM corps WHERE \ corp_id = $1", &[&corp_id.0], ) .await?; Ok(()) } pub async fn delete_corp_membership<'a>( self: &'a Self, corp_id: &'a CorpId, username: &'a str, ) -> DResult<()> { self.pg_trans()? .execute( "DELETE FROM corp_membership WHERE \ corp_id = $1 AND member_username = $2", &[&corp_id.0, &username.to_lowercase()], ) .await?; Ok(()) } pub async fn count_matching_possessions<'a>( self: &'a Self, location: &str, allowed_types: &'a [PossessionType], ) -> DResult { Ok(self.pg_trans()? .query_one( "SELECT COUNT(*) FROM items WHERE details->>'location' = $1 AND $2 @> (details->'possession_type')", &[&location, &serde_json::to_value(allowed_types)?] ).await?.get(0)) } pub async fn count_followers_by_leader<'a>(self: &'a Self, leader: &str) -> DResult { Ok(self .pg_trans()? .query_one( "SELECT COUNT(*) FROM items WHERE details->'following'->>'follow_whom' = $1", &[&leader], ) .await? .get(0)) } pub async fn find_followers_by_leader<'a>( self: &'a Self, leader: &str, ) -> DResult>> { Ok(self .pg_trans()? .query( "SELECT details FROM items WHERE details->'following'->>'follow_whom' = $1", &[&leader], ) .await? .into_iter() .filter_map(|i| serde_json::from_value(i.get("details")).ok()) .map(Arc::new) .collect()) } pub async fn count_staff_by_hirer<'a>(self: &'a Self, hirer: &str) -> DResult { Ok(self .pg_trans()? .query_one( "SELECT COUNT(*) FROM items WHERE details->'special_data'->'HireData'->>'hired_by' = $1", &[&hirer], ) .await? .get(0)) } pub async fn find_staff_by_hirer<'a>(self: &'a Self, hirer: &str) -> DResult>> { Ok(self .pg_trans()? .query( "SELECT details FROM items WHERE details->'special_data'->'HireData'->>'hired_by' = $1", &[&hirer], ) .await? .into_iter() .filter_map(|i| serde_json::from_value(i.get("details")).ok()) .map(Arc::new) .collect()) } pub async fn sendqueue_to_abusereport( &self, uuid: &Uuid, username: &str, session: &ListenerSession, ) -> DResult<()> { self.pg_trans()? .execute( "INSERT INTO abuselog (id, triggered_by, logdata, expires) \ VALUES ($1, $2, (\ SELECT json_build_object(\ 'sendqueue', json_build_array(array_agg(json_build_object(\ 'message', message, 'sent_at', sent_at)))) FROM sendqueue WHERE \ listener = $3 AND session = $4), \ NOW() + INTERVAL '60 days')", &[&uuid, &username, &session.listener, &session.session], ) .await?; Ok(()) } pub async fn clean_and_count_abusereports(&self, username: &str) -> DResult { let trans = self.pg_trans()?; trans .execute("DELETE FROM abuselog WHERE expires < NOW()", &[]) .await?; Ok(trans .query_one( "SELECT COUNT(*) AS n FROM abuselog \ WHERE triggered_by = $1", &[&username], ) .await? .get("n")) } pub async fn stop_urges_for_sessionless(&self) -> DResult<()> { self.pg_trans()? .execute( "UPDATE items SET details = JSONB_SET(details, '{flags}', \ (SELECT COALESCE(jsonb_agg(elem), '[]') FROM jsonb_array_elements(details->'flags') elem \ WHERE elem <> '\"HasUrges\"')) \ WHERE details->'flags' @> '\"HasUrges\"' AND \ details->>'item_type' = 'player' AND NOT EXISTS \ (SELECT 1 FROM users WHERE current_session IS NOT NULL AND \ username = items.details->>'item_code')", &[], ) .await?; Ok(()) } // name is static since it should not be user generated (not escaped). pub async fn apply_urge_tick(&self, name: &'static str) -> DResult<()> { self.pg_trans()? .execute( &format!( "UPDATE items SET details = \ JSONB_SET(\ JSONB_SET(details, '{{urges, {}, last_value}}', details->'urges'->'{}'->'value'), \ '{{urges, {}, value}}', \ TO_JSONB(GREATEST(0, LEAST(10000, (details->'urges'->'{}'->'value')::NUMERIC + (details->'urges'->'{}'->'growth')::NUMERIC))) \ ) \ WHERE details->'flags' @> '\"HasUrges\"'", name, name, name, name, name ), &[] ) .await?; Ok(()) } pub async fn get_urges_crossed_milestones( &self, name: &'static str, ) -> DResult>> { Ok(self .pg_trans()? .query( &format!( "WITH details_urg AS (\ SELECT details, (details->'urges'->'{}'->'last_value')::NUMERIC AS last,\ (details->'urges'->'{}'->'value')::NUMERIC AS curr \ FROM items WHERE details->'flags' @> '\"HasUrges\"' \ ) \ SELECT details FROM details_urg WHERE (\ (last < 2500 AND curr >= 2500) OR \ (last >= 2500 AND curr < 2500) OR \ (last < 5000 AND curr >= 5000) OR \ (last >= 5000 AND curr < 5000) OR \ (last < 7500 AND curr >= 7500) OR \ (last >= 7500 AND curr < 7500) OR \ (last = 10000 AND curr <> 10000) OR \ (last <> 10000 AND curr = 10000))", name, name ), &[], ) .await? .into_iter() .filter_map(|i| serde_json::from_value(i.get("details")).ok()) .map(Arc::new) .collect()) } pub async fn commit(mut self: Self) -> DResult<()> { let trans_opt = self.with_trans_mut(|t| std::mem::replace(t, None)); if let Some(trans) = trans_opt { trans.commit().await?; } Ok(()) } pub fn pg_trans<'a>(self: &'a Self) -> DResult<&'a Transaction<'a>> { self.borrow_trans() .as_ref() .ok_or("Transaction already closed".into()) } } // This seems hard to test since most of DbError is private. #[cfg(not(test))] pub fn is_concurrency_error(e: &(dyn Error + 'static)) -> bool { match e.source() { None => {} Some(e_src) => { if is_concurrency_error(e_src) { return true; } } } match e.downcast_ref::() { None => false, Some(dbe) => dbe.code() == &SqlState::T_R_SERIALIZATION_FAILURE, } }