blastmud/blastmud_game/src/db.rs

692 lines
29 KiB
Rust
Raw Normal View History

2022-12-23 23:31:49 +11:00
use tokio_postgres::{config::Config as PgConfig, row::Row};
2022-12-25 00:25:52 +11:00
use deadpool_postgres::{Manager, Object, ManagerConfig, Pool, Transaction,
RecyclingMethod};
use std::error::Error;
use std::str::FromStr;
2022-12-25 00:25:52 +11:00
use ouroboros::self_referencing;
use uuid::Uuid;
use tokio_postgres::NoTls;
2022-12-23 23:31:49 +11:00
use crate::message_handler::ListenerSession;
use crate::DResult;
use crate::message_handler::user_commands::parsing::parse_offset;
2023-01-02 13:25:05 +11:00
use crate::models::{
session::Session,
user::User,
2023-02-19 01:18:08 +11:00
item::{
Item,
LocationActionType,
},
2023-01-02 13:25:05 +11:00
task::{Task, TaskParse}
};
2022-12-27 16:08:27 +11:00
use tokio_postgres::types::ToSql;
use std::collections::BTreeSet;
use std::sync::Arc;
2023-01-28 23:00:53 +11:00
use serde::{Serialize, Deserialize};
use serde_json::{self, Value};
2022-12-25 00:25:52 +11:00
use futures::FutureExt;
2023-01-28 23:00:53 +11:00
use chrono::{DateTime, Utc};
#[cfg(test)]
use mockall::automock;
#[derive(Clone, Debug)]
pub struct DBPool {
pool: Pool
}
2022-12-25 00:25:52 +11:00
#[self_referencing]
pub struct DBTrans {
conn: Object,
#[borrows(mut conn)]
#[covariant]
pub trans: Option<Transaction<'this>>
}
2022-12-23 23:31:49 +11:00
#[derive(Clone, Debug)]
pub struct SendqueueItem {
pub item: i64,
pub session: ListenerSession,
pub message: Option<String>
2022-12-23 23:31:49 +11:00
}
impl From<Row> for SendqueueItem {
fn from(row: Row) -> Self {
SendqueueItem {
item: row.get("item"),
session: ListenerSession {
session: row.get("session"),
listener: row.get("listener")
},
message: row.get("message")
}
}
}
2023-01-28 23:00:53 +11:00
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct OnlineInfo {
pub username: String,
pub time: Option<DateTime<Utc>>
}
2023-02-03 23:26:24 +11:00
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LocationStats {
pub total_count: u64,
pub total_weight: u64,
}
#[cfg_attr(test, allow(dead_code))]
impl DBPool {
2022-12-24 21:16:23 +11:00
pub async fn record_listener_ping(self: &DBPool, listener: Uuid) -> DResult<()> {
self.get_conn().await?.execute(
"INSERT INTO listeners (listener, last_seen) \
VALUES ($1, NOW()) \
ON CONFLICT (listener) \
DO UPDATE SET last_seen = EXCLUDED.last_seen", &[&listener]).await?;
Ok(())
}
2022-12-24 21:16:23 +11:00
pub async fn get_dead_listeners(self: &Self) -> DResult<Vec<Uuid>> {
Ok(self.get_conn().await?
.query("SELECT listener FROM listeners WHERE last_seen < NOW() - \
INTERVAL '2 minutes'", &[])
.await?.into_iter().map(|r| r.get(0)).collect())
}
2022-12-24 21:16:23 +11:00
pub async fn cleanup_listener(self: &Self, listener: Uuid) -> DResult<()> {
let mut conn = self.get_conn().await?;
let tx = conn.transaction().await?;
tx.execute("UPDATE users SET current_session = NULL, \
current_listener = NULL WHERE current_listener = $1",
&[&listener]).await?;
tx.execute("DELETE FROM sendqueue WHERE listener = $1",
&[&listener]).await?;
tx.execute("DELETE FROM sessions WHERE listener = $1",
&[&listener]).await?;
tx.execute("DELETE FROM listeners WHERE listener = $1",
&[&listener]).await?;
tx.commit().await?;
Ok(())
}
2022-12-24 21:16:23 +11:00
pub async fn start_session(self: &Self, session: &ListenerSession, details: &Session) -> DResult<()> {
self.get_conn().await?.execute(
2022-12-24 13:43:28 +11:00
"INSERT INTO sessions (session, listener, details) \
2022-12-24 21:16:23 +11:00
VALUES ($1, $2, $3) ON CONFLICT (session) DO NOTHING",
&[&session.session, &session.listener, &serde_json::to_value(details)?]
).await?;
Ok(())
}
2022-12-23 23:31:49 +11:00
2022-12-24 21:16:23 +11:00
pub async fn end_session(self: &Self, session: ListenerSession) -> DResult<()> {
2022-12-23 23:31:49 +11:00
let mut conn = self.get_conn().await?;
let tx = conn.transaction().await?;
tx.execute("UPDATE users SET current_session = NULL, \
current_listener = NULL WHERE current_session = $1",
&[&session.session]).await?;
tx.execute("DELETE FROM sendqueue WHERE session = $1",
&[&session.session]).await?;
tx.execute("DELETE FROM sessions WHERE session = $1",
&[&session.session]).await?;
tx.commit().await?;
Ok(())
}
2022-12-25 00:25:52 +11:00
pub async fn start_transaction(self: &Self) -> DResult<DBTrans> {
let conn = self.get_conn().await?;
Ok(DBTransAsyncSendTryBuilder {
conn,
trans_builder: |conn| Box::pin(conn.transaction().map(|r| r.map(Some)))
}.try_build().await?)
}
2022-12-24 21:16:23 +11:00
pub async fn queue_for_session(self: &Self,
2022-12-23 23:31:49 +11:00
session: &ListenerSession,
message: Option<&str>) -> DResult<()> {
2022-12-23 23:31:49 +11:00
let conn = self.get_conn().await?;
conn.execute("INSERT INTO sendqueue (session, listener, message) VALUES ($1, $2, $3)",
&[&session.session, &session.listener, &message]).await?;
Ok(())
}
2022-12-24 21:16:23 +11:00
pub async fn get_from_sendqueue(self: &Self) -> DResult<Vec<SendqueueItem>> {
2022-12-23 23:31:49 +11:00
let conn = self.get_conn().await?;
Ok(conn.query("SELECT item, session, listener, message FROM sendqueue ORDER BY item ASC LIMIT 10",
&[])
.await?.into_iter().map(SendqueueItem::from).collect())
}
2022-12-24 21:16:23 +11:00
pub async fn delete_from_sendqueue(self: &DBPool, item: &SendqueueItem) -> DResult<()> {
2022-12-23 23:31:49 +11:00
let conn = self.get_conn().await?;
conn.execute("DELETE FROM sendqueue WHERE item=$1", &[&item.item]).await?;
Ok(())
}
pub async fn find_static_item_types(self: &Self) -> DResult<Box<BTreeSet<String>>> {
Ok(Box::new(
self
.get_conn().await?
.query("SELECT DISTINCT details->>'item_type' AS item_type \
FROM items WHERE details->>'is_static' = 'true'", &[]).await?
.iter()
.map(|r| r.get("item_type"))
.collect()))
}
pub async fn find_static_task_types(self: &Self) -> DResult<Box<BTreeSet<String>>> {
Ok(Box::new(
self
.get_conn().await?
.query("SELECT DISTINCT details->>'task_type' AS task_type \
FROM tasks WHERE details->>'is_static' = 'true'", &[]).await?
.iter()
.map(|r| r.get("task_type"))
.collect()))
}
pub async fn delete_static_items_by_type(self: &Self, item_type: &str) -> DResult<()> {
self.get_conn().await?.query(
"DELETE FROM items WHERE details->>'is_static' = 'true' AND details->>'item_type' = $1",
&[&item_type]).await?;
Ok(())
}
pub async fn delete_static_tasks_by_type(self: &Self, task_type: &str) -> DResult<()> {
self.get_conn().await?.query(
"DELETE FROM tasks WHERE details->>'is_static' = 'true' AND details->>'task_type' = $1",
&[&task_type]).await?;
Ok(())
}
2023-01-28 23:00:53 +11:00
pub async fn bump_session_time(&self, session: &ListenerSession) -> DResult<()> {
self.get_conn().await?.query(
"UPDATE sessions SET details=JSONB_SET(details, '{last_active}', to_json(NOW())::jsonb) \
WHERE session = $1", &[&session.session]
).await?;
Ok(())
}
2022-12-24 21:16:23 +11:00
pub async fn get_conn(self: &DBPool) ->
DResult<Object> {
let conn = self.pool.get().await?;
conn.execute("SET synchronous_commit=off", &[]).await?;
Ok(conn)
}
2022-12-23 23:31:49 +11:00
pub fn start(connstr: &str) -> DResult<DBPool> {
let mgr_config = ManagerConfig {
recycling_method: RecyclingMethod::Fast
};
let mgr = Manager::from_config(
PgConfig::from_str(connstr)
.map_err(|e| Box::new(e) as Box<dyn Error + Send + Sync>)?,
NoTls, mgr_config
);
Pool::builder(mgr).max_size(4).build()
.map_err(|e| Box::new(e) as Box<dyn Error + Send + Sync>)
2022-12-23 23:31:49 +11:00
.map(|pool| Self { pool })
}
}
2022-12-25 00:25:52 +11:00
#[derive(Clone, Debug)]
pub struct ItemSearchParams<'l> {
pub from_item: &'l Item,
pub query: &'l str,
pub include_contents: bool,
pub include_loc_contents: bool,
pub include_active_players: bool,
pub include_all_players: bool
}
impl ItemSearchParams<'_> {
pub fn base<'l>(from_item: &'l Item, query: &'l str) -> ItemSearchParams<'l> {
ItemSearchParams {
from_item, query,
include_contents: false,
include_loc_contents: false,
include_active_players: false,
include_all_players: false
}
}
}
#[cfg_attr(test, automock)]
#[cfg_attr(test, allow(dead_code))]
2022-12-25 00:25:52 +11:00
impl DBTrans {
pub async fn queue_for_session<'a>(self: &'a Self,
session: &'a ListenerSession,
message: Option<&'a str>) -> DResult<()> {
2022-12-25 00:25:52 +11:00
self.pg_trans()?
.execute("INSERT INTO sendqueue (session, listener, message) VALUES ($1, $2, $3)",
&[&session.session, &session.listener, &message]).await?;
Ok(())
}
pub async fn get_session_user_model<'a>(self: &'a Self, session: &'a ListenerSession) -> DResult<Option<(Session, Option<User>)>> {
match self.pg_trans()?
2022-12-26 01:30:59 +11:00
.query_opt("SELECT s.details AS sess_details, \
u.details AS user_details FROM sessions s \
LEFT JOIN users u ON u.current_session = s.session \
WHERE s.session = $1", &[&session.session])
.await? {
None => Ok(None),
Some(row) =>
2022-12-26 01:30:59 +11:00
Ok(Some(
(serde_json::from_value(
row.get("sess_details"))?,
match row.get::<&str, Option<serde_json::Value>>("user_details") {
None => None,
Some(v) => serde_json::from_value(v)?
})
))
}
}
2022-12-26 01:30:59 +11:00
pub async fn save_session_model<'a>(self: &'a Self, session: &'a ListenerSession, details: &Session)
2022-12-26 01:30:59 +11:00
-> DResult<()> {
self.pg_trans()?
.execute("UPDATE sessions SET details = $1 WHERE session = $2",
&[&serde_json::to_value(details)?, &session.session]).await?;
Ok(())
}
pub async fn find_by_username<'a>(self: &'a Self, username: &'a str) -> DResult<Option<User>> {
if let Some(details_json) = self.pg_trans()?
.query_opt("SELECT details FROM users WHERE username=$1",
&[&username.to_lowercase()]).await? {
return Ok(Some(serde_json::from_value(details_json.get("details"))?))
}
Ok(None)
}
pub async fn create_item<'a>(self: &'a Self, item: &'a Item) -> DResult<i64> {
Ok(self.pg_trans()?.query_one("INSERT INTO items (details) VALUES ($1) RETURNING item_id",
&[&serde_json::to_value(item)?]).await?
.get("item_id"))
}
pub async fn limited_update_static_item<'a>(self: &'a Self, item: &'a Item) -> DResult<()> {
let value = serde_json::to_value(item)?;
let obj_map = value.as_object()
.expect("Static item to be object in JSON");
let mut params: Vec<&(dyn ToSql + Sync)> = vec!(&item.item_type, &item.item_code);
let mut det_ex: String = "details".to_owned();
let mut var_id = 3;
// Only copy more permanent fields, others are supposed to change over time and shouldn't
// be reset on restart.
for to_copy in ["display", "display_less_explicit", "details", "details_less_explicit",
2023-01-15 17:30:23 +11:00
"total_xp", "total_stats", "total_skills", "pronouns", "flags",
2023-01-22 22:43:44 +11:00
"sex", "is_challenge_attack_only", "aliases", "species"] {
det_ex = format!("jsonb_set({}, '{{{}}}', ${})", det_ex, to_copy, var_id);
params.push(obj_map.get(to_copy).unwrap_or(&Value::Null));
var_id += 1;
}
self.pg_trans()?.execute(
&("UPDATE items SET details = ".to_owned() + &det_ex +
" WHERE details->>'item_type' = $1 AND details->>'item_code' = $2"),
&params).await?;
Ok(())
}
pub async fn limited_update_static_task<'a>(self: &'a Self, task: &'a Task) -> DResult<()> {
let value = serde_json::to_value(task)?;
let obj_map = value.as_object()
.expect("Static task to be object in JSON");
let task_name: &(dyn ToSql + Sync) = &task.details.name();
let mut params: Vec<&(dyn ToSql + Sync)> = vec!(task_name, &task.meta.task_code);
let mut det_ex: String = "details".to_owned();
let mut var_id = 3;
// Only copy more permanent fields, others are supposed to change over time and shouldn't
// be reset on restart. We do reset failure count since the problem may be fixed.
for to_copy in ["recurrence", "consecutive_failure_count", "task_details"] {
det_ex = format!("jsonb_set({}, '{{{}}}', ${})", det_ex, to_copy, var_id);
params.push(obj_map.get(to_copy).unwrap_or(&Value::Null));
var_id += 1;
}
self.pg_trans()?.execute(
&("UPDATE tasks SET details = ".to_owned() + &det_ex +
" WHERE details->>'task_type' = $1 AND details->>'task_code' = $2"),
&params).await?;
Ok(())
}
pub async fn create_user<'a>(self: &'a Self, session: &'a ListenerSession, user_dat: &'a User) -> DResult<()> {
self.pg_trans()?.execute("INSERT INTO users (\
username, current_session, current_listener, details\
) VALUES ($1, $2, $3, $4)", &[&user_dat.username.to_lowercase(),
&session.session,
&session.listener,
&serde_json::to_value(user_dat)?]).await?;
Ok(())
}
pub async fn save_user_model<'a>(self: &'a Self, details: &'a User)
-> DResult<()> {
self.pg_trans()?
.execute("UPDATE users SET details = $1 WHERE username = $2",
&[&serde_json::to_value(details)?,
&details.username.to_lowercase()]).await?;
Ok(())
}
2022-12-27 16:08:27 +11:00
pub async fn attach_user_to_session<'a>(self: &'a Self, username: &'a str,
session: &'a ListenerSession) -> DResult<()> {
2022-12-27 16:08:27 +11:00
let username_l = username.to_lowercase();
self.pg_trans()?
.execute("INSERT INTO sendqueue (session, listener, message) \
SELECT current_session, current_listener, $1 FROM users \
WHERE username = $2 AND current_session IS NOT NULL \
AND current_listener IS NOT NULL",
&[&"Logged in from another session\r\n", &username_l]).await?;
self.pg_trans()?
2022-12-29 00:43:23 +11:00
.execute("INSERT INTO sendqueue (session, listener, message) \
SELECT current_session, current_listener, null FROM users \
WHERE username = $1 AND current_session IS NOT NULL \
AND current_listener IS NOT NULL",
&[&username_l]).await?;
self.pg_trans()?
2022-12-27 16:08:27 +11:00
.execute("UPDATE users SET current_session = $1, current_listener = $2 WHERE username = $3",
&[&session.session as &(dyn ToSql + Sync), &session.listener, &username_l]).await?;
Ok(())
}
pub async fn find_static_items_by_type<'a>(self: &'a Self, item_type: &'a str) ->
DResult<Box<BTreeSet<String>>> {
Ok(Box::new(
self.pg_trans()?
.query("SELECT DISTINCT details->>'item_code' AS item_code FROM items WHERE \
details->>'is_static' = 'true' AND \
details->>'item_type' = $1", &[&item_type])
.await?
.into_iter()
.map(|v| v.get("item_code"))
.collect()))
}
pub async fn find_static_tasks_by_type<'a>(self: &'a Self, task_type: &'a str) ->
DResult<Box<BTreeSet<String>>> {
Ok(Box::new(
self.pg_trans()?
.query("SELECT DISTINCT details->>'task_code' AS task_code FROM tasks WHERE \
details->>'is_static' = 'true' AND \
details->>'task_type' = $1", &[&task_type])
.await?
.into_iter()
.map(|v| v.get("task_code"))
.collect()))
}
pub async fn delete_static_items_by_code<'a>(self: &'a Self, item_type: &'a str,
item_code: &str) -> DResult<()> {
self.pg_trans()?.query(
"DELETE FROM items WHERE details->>'is_static' = 'true' AND \
2023-01-02 15:39:55 +11:00
details->>'item_type' = $1 AND \
details->>'item_code' = $2",
&[&item_type, &item_code]).await?;
Ok(())
}
pub async fn delete_static_tasks_by_code<'a>(self: &'a Self, task_type: &'a str,
task_code: &'a str) -> DResult<()> {
self.pg_trans()?.query(
"DELETE FROM task WHERE details->>'is_static' = 'true' AND \
details->>'task_type' = $1 AND \
details->>'task_code' = $2",
&[&task_type, &task_code]).await?;
Ok(())
}
pub async fn find_item_by_type_code<'a>(self: &'a Self, item_type: &'a str, item_code: &'a str) ->
DResult<Option<Arc<Item>>> {
if let Some(item) = self.pg_trans()?.query_opt(
"SELECT details FROM items WHERE \
details->>'item_type' = $1 AND \
details->>'item_code' = $2", &[&item_type, &item_code]).await? {
return Ok(Some(Arc::new(serde_json::from_value::<Item>(item.get("details"))?)));
}
Ok(None)
}
2022-12-29 22:17:55 +11:00
pub async fn transfer_all_possessions_code<'a>(self: &'a Self, src_loc: &'a str, dst_loc: &'a str) -> DResult<()> {
self.pg_trans()?.execute(
"UPDATE items SET details=JSONB_SET(details, '{location}', $1) \
WHERE details->>'location' = $2",
&[&serde_json::to_value(dst_loc)?, &src_loc]).await?;
Ok(())
}
pub async fn transfer_all_possessions<'a>(self: &'a Self, source: &'a Item, dest: &'a Item) -> DResult<()> {
let src_loc = format!("{}/{}", &source.item_type, &source.item_code);
let dst_loc = format!("{}/{}", &dest.item_type, &dest.item_code);
self.transfer_all_possessions_code(&src_loc, &dst_loc).await?;
Ok(())
}
pub async fn find_items_by_location<'a>(self: &'a Self, location: &'a str) -> DResult<Vec<Arc<Item>>> {
2022-12-29 22:17:55 +11:00
Ok(self.pg_trans()?.query(
"SELECT details FROM items WHERE details->>'location' = $1 \
ORDER BY details->>'display'
LIMIT 100", &[&location]
2022-12-29 22:17:55 +11:00
).await?.into_iter()
.filter_map(|i| serde_json::from_value(i.get("details")).ok())
.map(Arc::new)
2022-12-29 22:17:55 +11:00
.collect())
}
pub async fn save_item_model(self: &Self, details: &Item)
-> DResult<()> {
self.pg_trans()?
.execute("INSERT INTO items (details) VALUES ($1) \
ON CONFLICT ((details->>'item_type'), \
(details->>'item_code')) DO UPDATE SET \
details = EXCLUDED.details",
&[&serde_json::to_value(details)?]).await?;
Ok(())
}
pub async fn delete_item<'a>(self: &'a Self, item_type: &'a str, item_code: &'a str) -> DResult<()> {
2023-01-23 22:52:01 +11:00
self.pg_trans()?
.execute("DELETE FROM items WHERE \
details->>'item_type' = $1 AND \
details->>'item_code' = $2",
2023-01-23 22:52:01 +11:00
&[&item_type, &item_code]).await?;
Ok(())
}
pub async fn find_session_for_player<'a>(self: &'a Self, item_code: &'a str) -> DResult<Option<(ListenerSession, Session)>> {
2022-12-31 19:33:31 +11:00
Ok(self.pg_trans()?
.query_opt("SELECT u.current_listener, u.current_session, s.details \
FROM users u JOIN sessions s ON s.session = u.current_session \
WHERE u.username=$1", &[&item_code])
.await?
.and_then(|r| match (r.get("current_listener"), r.get("current_session"),
r.get("details")) {
(Some(listener), Some(session), details) =>
Some((ListenerSession { listener, session },
serde_json::from_value(details).ok()?)),
_ => None
}))
}
pub async fn resolve_items_by_display_name_for_player<'l>(
self: &'l Self,
search: &'l ItemSearchParams<'l>
) -> DResult<Arc<Vec<Arc<Item>>>> {
let mut ctes: Vec<String> = Vec::new();
let mut include_tables: Vec<&'static str> = Vec::new();
let player_loc = &search.from_item.location;
let player_desig = format!("{}/{}", search.from_item.item_type,
search.from_item.item_code);
let (offset, query) = parse_offset(search.query);
2023-01-20 23:38:57 +11:00
let mut param_no: usize = 5;
let query_wildcard = query.replace("\\", "\\\\")
.replace("_", "\\_")
.replace("%", "")
.to_lowercase() + "%";
let offset_sql = offset.map(|x| (if x >= 1 { x - 1 } else { x}) as i64).unwrap_or(0);
2023-01-20 23:38:57 +11:00
let query_json = serde_json::to_value(query.to_lowercase())?;
2022-12-31 19:33:31 +11:00
let query_len = query.len() as i32;
let mut params: Vec<&(dyn ToSql + Sync)> = vec!(
&query_wildcard,
2023-01-20 23:38:57 +11:00
&offset_sql, &query_len, &query_json);
if search.include_contents {
ctes.push(format!("contents AS (\
2023-01-20 23:38:57 +11:00
SELECT details, details->'aliases' AS aliases FROM items WHERE details->>'location' = ${}\
)", param_no));
param_no += 1;
params.push(&player_desig);
2023-01-20 23:38:57 +11:00
include_tables.push("SELECT details, aliases FROM contents");
}
if search.include_loc_contents {
ctes.push(format!("loc_contents AS (\
2023-01-20 23:38:57 +11:00
SELECT details, details->'aliases' AS aliases FROM items WHERE details->>'location' = ${}\
)", param_no));
drop(param_no); // or increment if this is a problem.
params.push(&player_loc);
2023-01-20 23:38:57 +11:00
include_tables.push("SELECT details, aliases FROM loc_contents");
}
if search.include_active_players {
ctes.push("active_players AS (\
2023-01-20 23:38:57 +11:00
SELECT details, ('[]'::JSONB) AS aliases FROM items WHERE details->>'item_type' = 'player' \
AND current_session IS NOT NULL \
)".to_owned());
2023-01-20 23:38:57 +11:00
include_tables.push("SELECT details, aliases FROM active_players");
}
if search.include_all_players {
ctes.push("all_players AS (\
2023-01-20 23:38:57 +11:00
SELECT details, ('[]'::JSONB) AS aliases FROM items WHERE details->>'item_type' = 'player'
)".to_owned());
2023-01-20 23:38:57 +11:00
include_tables.push("SELECT details, aliases FROM all_players");
}
ctes.push(format!("relevant_items AS ({})", include_tables.join(" UNION ")));
let cte_str: String = ctes.join(", ");
Ok(Arc::new(self.pg_trans()?.query(
&format!(
2023-01-20 23:38:57 +11:00
"WITH {} SELECT details, aliases FROM relevant_items WHERE (lower(details->>'display') LIKE $1) \
OR (lower(details ->>'display_less_explicit') LIKE $1) \
2023-01-20 23:38:57 +11:00
OR aliases @> $4 \
2022-12-31 19:33:31 +11:00
ORDER BY ABS(length(details->>'display')-$3) ASC \
LIMIT 1 OFFSET $2", &cte_str),
&params
).await?.into_iter()
.filter_map(|i| serde_json::from_value(i.get("details")).ok())
.map(Arc::new)
.collect()))
}
pub async fn get_next_scheduled_task(&self) -> DResult<Option<TaskParse>> {
match self.pg_trans()?.query_opt(
"SELECT details FROM tasks WHERE \
CAST(details->>'next_scheduled' AS TIMESTAMPTZ) <= now() \
ORDER BY details->>'next_scheduled' ASC LIMIT 1", &[]
).await? {
None => Ok(None),
Some(row) => Ok(serde_json::from_value(row.get("details"))?)
}
}
pub async fn delete_task<'a>(&'a self, task_type: &'a str, task_code: &'a str) -> DResult<()> {
self.pg_trans()?.execute(
"DELETE FROM tasks WHERE details->>'task_type' = $1 AND \
details->>'task_code' = $2", &[&task_type, &task_code]
).await?;
Ok(())
}
pub async fn upsert_task<'a>(&'a self, task: &'a Task) -> DResult<()> {
2023-01-02 13:25:05 +11:00
self.pg_trans()?.execute(
"INSERT INTO tasks (details) \
VALUES ($1) \
ON CONFLICT ((details->>'task_code'), (details->>'task_type')) \
DO UPDATE SET details = $1", &[&serde_json::to_value(task)?]).await?;
Ok(())
}
pub async fn update_task<'a>(&'a self, task_type: &'a str, task_code: &'a str, task: &'a TaskParse) -> DResult<()> {
self.pg_trans()?.execute(
"UPDATE tasks SET details = $3 WHERE details->>'task_type' = $1 AND \
details->>'task_code' = $2",
&[&task_type, &task_code, &serde_json::to_value(task)?]
).await?;
Ok(())
}
2023-01-23 22:52:01 +11:00
pub async fn alloc_item_code(&self) -> DResult<i64> {
Ok(self.pg_trans()?.query_one("SELECT NEXTVAL('item_seq')", &[]).await?.get(0))
2023-01-23 22:52:01 +11:00
}
2023-01-28 23:00:53 +11:00
pub async fn get_online_info(&self) ->DResult<Vec<OnlineInfo>> {
Ok(self.pg_trans()?.query(
"SELECT jsonb_build_object(\
'username', u.details->>'username',\
'time', s.details->>'last_active'\
) FROM sessions s \
JOIN users u ON u.current_session = s.session \
ORDER BY s.details->>'last_active' DESC", &[]
).await?
.into_iter()
.filter_map(|row| serde_json::from_value(row.get(0)).ok())
.collect())
}
2023-02-03 23:26:24 +11:00
pub async fn get_location_stats(&self, location: &str) -> DResult<LocationStats> {
Ok(serde_json::from_value(self.pg_trans()?.query_one(
2023-02-19 01:18:08 +11:00
"SELECT JSON_BUILD_OBJECT('total_count', COUNT(*), 'total_weight', COALESCE(SUM(CAST(details->>'weight' AS NUMERIC)), 0)) \
FROM items WHERE details->>'location' = $1", &[&location]
2023-02-03 23:26:24 +11:00
).await?.get(0))?)
}
2023-02-19 01:18:08 +11:00
pub async fn set_exclusive_action_type_to(&self, item: &Item,
new_action_type: &LocationActionType,
other_item_action_type: &LocationActionType) -> DResult<()> {
self.pg_trans()?.execute("UPDATE items SET details=\
JSONB_SET(details, '{action_type}', $1) \
WHERE details->>'location' = $2 AND \
details->>'action_type' = $3",
&[&serde_json::to_value(other_item_action_type)?,
&item.location,
&serde_json::to_value(new_action_type)?
.as_str().unwrap()
]).await?;
self.pg_trans()?.execute("UPDATE items SET details=\
JSONB_SET(details, '{action_type}', $1) \
WHERE details->>'item_type' = $2 AND \
details->>'item_code' = $3",
&[&serde_json::to_value(new_action_type)?,
&item.item_type,
&item.item_code
]).await?;
Ok(())
}
pub async fn find_by_action_and_location(&self, location: &str, action_type: &LocationActionType) -> DResult<Option<Arc<Item>>> {
if let Some(item) = self.pg_trans()?.query_opt(
"SELECT details FROM items WHERE \
details->>'location' = $1 AND \
details->>'action_type' = $2",
&[&location,
&serde_json::to_value(action_type)?.as_str().unwrap()]).await? {
return Ok(Some(Arc::new(serde_json::from_value::<Item>(item.get("details"))?)));
}
Ok(None)
}
2022-12-25 00:25:52 +11:00
pub async fn commit(mut self: Self) -> DResult<()> {
let trans_opt = self.with_trans_mut(|t| std::mem::replace(t, None));
if let Some(trans) = trans_opt {
2022-12-25 00:25:52 +11:00
trans.commit().await?;
}
Ok(())
}
pub fn pg_trans<'a>(self: &'a Self) -> DResult<&'a Transaction<'a>> {
2022-12-25 00:25:52 +11:00
self.borrow_trans().as_ref().ok_or("Transaction already closed".into())
}
}