Refactor db.rs and start saving sessions

This commit is contained in:
Condorra 2022-12-23 21:37:28 +11:00
parent 6d05573fad
commit a81bd9c52b
4 changed files with 68 additions and 55 deletions

View File

@ -12,56 +12,66 @@ pub struct DBPool {
pool: Pool
}
pub async fn record_listener_ping(listener: Uuid, pool: DBPool) -> DResult<()> {
get_conn(pool).await?.execute(
"INSERT INTO listeners (listener, last_seen) \
VALUES ($1, NOW()) \
ON CONFLICT (listener) \
DO UPDATE SET last_seen = EXCLUDED.last_seen", &[&listener]).await?;
Ok(())
}
impl DBPool {
pub async fn record_listener_ping(self: DBPool, listener: Uuid) -> DResult<()> {
self.get_conn().await?.execute(
"INSERT INTO listeners (listener, last_seen) \
VALUES ($1, NOW()) \
ON CONFLICT (listener) \
DO UPDATE SET last_seen = EXCLUDED.last_seen", &[&listener]).await?;
Ok(())
}
pub async fn get_dead_listeners(pool: DBPool) -> DResult<Vec<Uuid>> {
Ok(get_conn(pool).await?
.query("SELECT listener FROM listeners WHERE last_seen < NOW() - \
INTERVAL '2 minutes'", &[])
.await?.into_iter().map(|r| r.get(0)).collect())
}
pub async fn get_dead_listeners(self: DBPool) -> DResult<Vec<Uuid>> {
Ok(self.get_conn().await?
.query("SELECT listener FROM listeners WHERE last_seen < NOW() - \
INTERVAL '2 minutes'", &[])
.await?.into_iter().map(|r| r.get(0)).collect())
}
pub async fn cleanup_listener(pool: DBPool, listener: Uuid) -> DResult<()> {
let mut conn = get_conn(pool).await?;
let tx = conn.transaction().await?;
tx.execute("UPDATE users SET current_session = NULL, \
current_listener = NULL WHERE current_listener = $1",
&[&listener]).await?;
tx.execute("DELETE FROM sendqueue WHERE listener = $1",
&[&listener]).await?;
tx.execute("DELETE FROM sessions WHERE listener = $1",
&[&listener]).await?;
tx.execute("DELETE FROM listeners WHERE listener = $1",
&[&listener]).await?;
tx.commit().await?;
Ok(())
}
pub async fn cleanup_listener(self: DBPool, listener: Uuid) -> DResult<()> {
let mut conn = self.get_conn().await?;
let tx = conn.transaction().await?;
tx.execute("UPDATE users SET current_session = NULL, \
current_listener = NULL WHERE current_listener = $1",
&[&listener]).await?;
tx.execute("DELETE FROM sendqueue WHERE listener = $1",
&[&listener]).await?;
tx.execute("DELETE FROM sessions WHERE listener = $1",
&[&listener]).await?;
tx.execute("DELETE FROM listeners WHERE listener = $1",
&[&listener]).await?;
tx.commit().await?;
Ok(())
}
pub async fn get_conn(DBPool { pool }: DBPool) ->
DResult<Object> {
let conn = pool.get().await?;
conn.execute("SET synchronous_commit=off", &[]).await?;
Ok(conn)
}
pub fn start_pool(connstr: &str) -> DResult<DBPool> {
let mgr_config = ManagerConfig {
recycling_method: RecyclingMethod::Fast
};
let mgr = Manager::from_config(
PgConfig::from_str(connstr)
.map_err(|e| Box::new(e) as Box<dyn Error + Send + Sync>)?,
NoTls, mgr_config
);
Pool::builder(mgr).max_size(4).build()
.map_err(|e| Box::new(e) as Box<dyn Error + Send + Sync>)
.map(|pool| DBPool { pool })
pub async fn start_session(self: DBPool, listener: Uuid, session: Uuid) -> DResult<()> {
self.get_conn().await?.execute(
"INSERT INTO sessions (session, listener, details) VALUES ($1, $2, '{}')",
&[&session, &listener]
).await?;
Ok(())
}
pub async fn get_conn(self: DBPool) ->
DResult<Object> {
let conn = self.pool.get().await?;
conn.execute("SET synchronous_commit=off", &[]).await?;
Ok(conn)
}
pub fn start(connstr: &str) -> DResult<DBPool> {
let mgr_config = ManagerConfig {
recycling_method: RecyclingMethod::Fast
};
let mgr = Manager::from_config(
PgConfig::from_str(connstr)
.map_err(|e| Box::new(e) as Box<dyn Error + Send + Sync>)?,
NoTls, mgr_config
);
Pool::builder(mgr).max_size(4).build()
.map_err(|e| Box::new(e) as Box<dyn Error + Send + Sync>)
.map(|pool| DBPool { pool })
}
}

View File

@ -4,6 +4,7 @@ use std::error::Error;
use log::{info, error, LevelFilter};
use simple_logger::SimpleLogger;
use tokio::signal::unix::{signal, SignalKind};
use db::DBPool;
mod db;
mod listener;
@ -35,10 +36,10 @@ async fn main() -> DResult<()> {
Err(e)
})?;
let config = read_latest_config()?;
let pool = db::start_pool(&config.database_conn_string)?;
let pool = DBPool::start(&config.database_conn_string)?;
// Test the database connection string works so we quit early if not...
let _ = db::get_conn(pool.clone()).await?.query("SELECT 1", &[]).await?;
let _ = pool.clone().get_conn().await?.query("SELECT 1", &[]).await?;
info!("Database pool initialised");

View File

@ -11,8 +11,10 @@ use log::info;
pub async fn handle(listener: Uuid, msg: MessageFromListener, pool: db::DBPool, listener_map: ListenerMap)
-> DResult<()> {
match msg {
ListenerPing { .. } => { db::record_listener_ping(listener, pool).await?; }
SessionConnected { session: _, source: _ } => {}
ListenerPing { .. } => { pool.record_listener_ping(listener).await?; }
SessionConnected { session, source: _ } => {
pool.start_session(listener, session).await?;
}
SessionDisconnected { session: _ } => {}
SessionSentLine { session, msg } => {
info!("Awaiting listener lock");

View File

@ -4,8 +4,8 @@ use crate::db;
use log::warn;
async fn cleanup_session_once(pool: db::DBPool) -> DResult<()> {
for listener in db::get_dead_listeners(pool.clone()).await? {
db::cleanup_listener(pool.clone(), listener).await?;
for listener in pool.clone().get_dead_listeners().await? {
pool.clone().cleanup_listener(listener).await?;
}
Ok(())
}