Implement DB based sendqueue

This commit is contained in:
Condorra 2022-12-23 23:31:49 +11:00
parent a81bd9c52b
commit 8aa4d59ba6
6 changed files with 126 additions and 46 deletions

View File

@ -1,10 +1,11 @@
use tokio_postgres::config::Config as PgConfig; use tokio_postgres::{config::Config as PgConfig, row::Row};
use deadpool_postgres::{Manager, Object, ManagerConfig, Pool, use deadpool_postgres::{Manager, Object, ManagerConfig, Pool,
RecyclingMethod}; RecyclingMethod};
use std::error::Error; use std::error::Error;
use std::str::FromStr; use std::str::FromStr;
use uuid::Uuid; use uuid::Uuid;
use tokio_postgres::NoTls; use tokio_postgres::NoTls;
use crate::message_handler::ListenerSession;
use crate::DResult; use crate::DResult;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@ -12,6 +13,25 @@ pub struct DBPool {
pool: Pool pool: Pool
} }
#[derive(Clone, Debug)]
pub struct SendqueueItem {
pub item: i64,
pub session: ListenerSession,
pub message: String
}
impl From<Row> for SendqueueItem {
fn from(row: Row) -> Self {
SendqueueItem {
item: row.get("item"),
session: ListenerSession {
session: row.get("session"),
listener: row.get("listener")
},
message: row.get("message")
}
}
}
impl DBPool { impl DBPool {
pub async fn record_listener_ping(self: DBPool, listener: Uuid) -> DResult<()> { pub async fn record_listener_ping(self: DBPool, listener: Uuid) -> DResult<()> {
self.get_conn().await?.execute( self.get_conn().await?.execute(
@ -22,14 +42,14 @@ impl DBPool {
Ok(()) Ok(())
} }
pub async fn get_dead_listeners(self: DBPool) -> DResult<Vec<Uuid>> { pub async fn get_dead_listeners(self: Self) -> DResult<Vec<Uuid>> {
Ok(self.get_conn().await? Ok(self.get_conn().await?
.query("SELECT listener FROM listeners WHERE last_seen < NOW() - \ .query("SELECT listener FROM listeners WHERE last_seen < NOW() - \
INTERVAL '2 minutes'", &[]) INTERVAL '2 minutes'", &[])
.await?.into_iter().map(|r| r.get(0)).collect()) .await?.into_iter().map(|r| r.get(0)).collect())
} }
pub async fn cleanup_listener(self: DBPool, listener: Uuid) -> DResult<()> { pub async fn cleanup_listener(self: Self, listener: Uuid) -> DResult<()> {
let mut conn = self.get_conn().await?; let mut conn = self.get_conn().await?;
let tx = conn.transaction().await?; let tx = conn.transaction().await?;
tx.execute("UPDATE users SET current_session = NULL, \ tx.execute("UPDATE users SET current_session = NULL, \
@ -45,13 +65,50 @@ impl DBPool {
Ok(()) Ok(())
} }
pub async fn start_session(self: DBPool, listener: Uuid, session: Uuid) -> DResult<()> { pub async fn start_session(self: Self, session: ListenerSession) -> DResult<()> {
self.get_conn().await?.execute( self.get_conn().await?.execute(
"INSERT INTO sessions (session, listener, details) VALUES ($1, $2, '{}')", "INSERT INTO sessions (session, listener, details) VALUES ($1, $2, '{}')",
&[&session, &listener] &[&session.session, &session.listener]
).await?; ).await?;
Ok(()) Ok(())
} }
pub async fn end_session(self: Self, session: ListenerSession) -> DResult<()> {
let mut conn = self.get_conn().await?;
let tx = conn.transaction().await?;
tx.execute("UPDATE users SET current_session = NULL, \
current_listener = NULL WHERE current_session = $1",
&[&session.session]).await?;
tx.execute("DELETE FROM sendqueue WHERE session = $1",
&[&session.session]).await?;
tx.execute("DELETE FROM sessions WHERE session = $1",
&[&session.session]).await?;
tx.commit().await?;
Ok(())
}
pub async fn queue_for_session(self: Self,
session: &ListenerSession,
message: &str) -> DResult<()> {
let conn = self.get_conn().await?;
conn.execute("INSERT INTO sendqueue (session, listener, message) VALUES ($1, $2, $3)",
&[&session.session, &session.listener, &message]).await?;
Ok(())
}
pub async fn get_from_sendqueue(self: Self) -> DResult<Vec<SendqueueItem>> {
let conn = self.get_conn().await?;
Ok(conn.query("SELECT item, session, listener, message FROM sendqueue ORDER BY item ASC LIMIT 10",
&[])
.await?.into_iter().map(SendqueueItem::from).collect())
}
pub async fn delete_from_sendqueue(self: DBPool, item: &SendqueueItem) -> DResult<()> {
let conn = self.get_conn().await?;
conn.execute("DELETE FROM sendqueue WHERE item=$1", &[&item.item]).await?;
Ok(())
}
pub async fn get_conn(self: DBPool) -> pub async fn get_conn(self: DBPool) ->
DResult<Object> { DResult<Object> {
@ -59,7 +116,7 @@ impl DBPool {
conn.execute("SET synchronous_commit=off", &[]).await?; conn.execute("SET synchronous_commit=off", &[]).await?;
Ok(conn) Ok(conn)
} }
pub fn start(connstr: &str) -> DResult<DBPool> { pub fn start(connstr: &str) -> DResult<DBPool> {
let mgr_config = ManagerConfig { let mgr_config = ManagerConfig {
recycling_method: RecyclingMethod::Fast recycling_method: RecyclingMethod::Fast
@ -72,6 +129,6 @@ impl DBPool {
Pool::builder(mgr).max_size(4).build() Pool::builder(mgr).max_size(4).build()
.map_err(|e| Box::new(e) as Box<dyn Error + Send + Sync>) .map_err(|e| Box::new(e) as Box<dyn Error + Send + Sync>)
.map(|pool| DBPool { pool }) .map(|pool| Self { pool })
} }
} }

View File

@ -19,7 +19,7 @@ pub struct ListenerSend {
pub ack_notify: oneshot::Sender<()> pub ack_notify: oneshot::Sender<()>
} }
pub type ListenerMap = Arc<Mutex<BTreeMap<Uuid, mpsc::Sender<ListenerSend>>>>; pub type ListenerMap = Arc<Mutex<BTreeMap<Uuid, mpsc::Sender<ListenerSend>>>>;
async fn handle_from_listener<FHandler, HandlerFut>( async fn handle_from_listener<FHandler, HandlerFut>(
conn: TcpStream, conn: TcpStream,
message_handler: FHandler, message_handler: FHandler,

View File

@ -48,12 +48,12 @@ async fn main() -> DResult<()> {
let mh_pool = pool.clone(); let mh_pool = pool.clone();
listener::start_listener(config.listener, listener_map.clone(), listener::start_listener(config.listener, listener_map.clone(),
move |listener_id, msg| { move |listener_id, msg| {
message_handler::handle(listener_id, msg, mh_pool.clone(), listener_map.clone()) message_handler::handle(listener_id, msg, mh_pool.clone())
} }
).await?; ).await?;
version_cutover::replace_old_gameserver(&config.pidfile)?; version_cutover::replace_old_gameserver(&config.pidfile)?;
regular_tasks::start_regular_tasks(pool.clone())?; regular_tasks::start_regular_tasks(pool.clone(), listener_map)?;
let mut sigusr1 = signal(SignalKind::user_defined1())?; let mut sigusr1 = signal(SignalKind::user_defined1())?;
sigusr1.recv().await; sigusr1.recv().await;

View File

@ -1,45 +1,28 @@
use blastmud_interfaces::*; use blastmud_interfaces::*;
use crate::listener::ListenerMap;
use crate::db; use crate::db;
use MessageFromListener::*; use MessageFromListener::*;
use uuid::Uuid; use uuid::Uuid;
use tokio::{sync::oneshot, task};
use crate::listener::ListenerSend;
use crate::DResult; use crate::DResult;
use log::info;
pub async fn handle(listener: Uuid, msg: MessageFromListener, pool: db::DBPool, listener_map: ListenerMap) #[derive(Clone,Debug)]
pub struct ListenerSession {
pub listener: Uuid,
pub session: Uuid
}
pub async fn handle(listener: Uuid, msg: MessageFromListener, pool: db::DBPool)
-> DResult<()> { -> DResult<()> {
match msg { match msg {
ListenerPing { .. } => { pool.record_listener_ping(listener).await?; } ListenerPing { .. } => { pool.record_listener_ping(listener).await?; }
SessionConnected { session, source: _ } => { SessionConnected { session, source: _ } => {
pool.start_session(listener, session).await?; pool.start_session(ListenerSession { listener, session }).await?;
}
SessionDisconnected { session } => {
pool.end_session(ListenerSession { listener, session }).await?;
} }
SessionDisconnected { session: _ } => {}
SessionSentLine { session, msg } => { SessionSentLine { session, msg } => {
info!("Awaiting listener lock"); pool.queue_for_session(&ListenerSession { listener, session },
let lmlock = listener_map.lock().await; &format!("You hear an echo saying: \x1b[31m{}\x1b[0m\r\n", msg)).await?;
let opt_sender = lmlock.get(&listener).map(|v| v.clone());
drop(lmlock);
info!("Listener lock dropped");
match opt_sender {
None => {}
Some(sender) => {
info!("Spawning message task");
task::spawn(async move {
let (tx, rx) = oneshot::channel();
info!("Sending echo");
sender.send(ListenerSend { message: MessageToListener::SendToSession {
session,
msg: format!("You hear an echo saying: \x1b[31m{}\x1b[0m\r\n", msg) },
ack_notify: tx }).await.unwrap_or(());
info!("Awaiting echo ack");
rx.await.unwrap_or(());
info!("Echo ack received");
});
info!("Message task spawned");
}
}
} }
AcknowledgeMessage => {} AcknowledgeMessage => {}
} }

View File

@ -1,6 +1,8 @@
use tokio::{task, time}; use tokio::{task, time, sync::oneshot};
use crate::DResult; use crate::DResult;
use crate::db; use crate::db;
use crate::listener::{ListenerMap, ListenerSend};
use blastmud_interfaces::MessageToListener;
use log::warn; use log::warn;
async fn cleanup_session_once(pool: db::DBPool) -> DResult<()> { async fn cleanup_session_once(pool: db::DBPool) -> DResult<()> {
@ -10,21 +12,59 @@ async fn cleanup_session_once(pool: db::DBPool) -> DResult<()> {
Ok(()) Ok(())
} }
fn start_session_cleanup_task(pool: db::DBPool) -> DResult<()> { fn start_session_cleanup_task(pool: db::DBPool) {
task::spawn(async move { task::spawn(async move {
loop { loop {
time::sleep(time::Duration::from_secs(60)).await;
match cleanup_session_once(pool.clone()).await { match cleanup_session_once(pool.clone()).await {
Ok(()) => {} Ok(()) => {}
Err(e) => { Err(e) => {
warn!("Error cleaning up sessions: {}", e); warn!("Error cleaning up sessions: {}", e);
time::sleep(time::Duration::from_secs(1)).await;
} }
} }
} }
}); });
}
async fn process_sendqueue_once(pool: db::DBPool, listener_map: ListenerMap) -> DResult<()> {
for item in pool.clone().get_from_sendqueue().await? {
match listener_map.lock().await.get(&item.session.listener).map(|l| l.clone()) {
None => {}
Some(listener_sender) => {
let (tx, rx) = oneshot::channel();
listener_sender.send(
ListenerSend {
message: MessageToListener::SendToSession {
session: item.session.session.clone(),
msg: item.message.clone()
},
ack_notify: tx
}
).await.unwrap_or(());
rx.await.unwrap_or(());
pool.clone().delete_from_sendqueue(&item).await?;
}
}
}
Ok(()) Ok(())
} }
pub fn start_regular_tasks(pool: db::DBPool) -> DResult<()> { fn start_send_queue_task(pool: db::DBPool, listener_map: ListenerMap) {
start_session_cleanup_task(pool) task::spawn(async move {
loop {
time::sleep(time::Duration::from_secs(1)).await;
match process_sendqueue_once(pool.clone(), listener_map.clone()).await {
Ok(()) => {}
Err(e) => {
warn!("Error processing sendqueue: {}", e);
}
}
}
});
}
pub fn start_regular_tasks(pool: db::DBPool, listener_map: ListenerMap) -> DResult<()> {
start_session_cleanup_task(pool.clone());
start_send_queue_task(pool, listener_map);
Ok(())
} }

View File

@ -37,7 +37,7 @@ CREATE TABLE users (
CREATE INDEX user_by_listener ON users(current_listener); CREATE INDEX user_by_listener ON users(current_listener);
CREATE UNLOGGED TABLE sendqueue ( CREATE UNLOGGED TABLE sendqueue (
item BIGINT NOT NULL PRIMARY KEY, item BIGSERIAL NOT NULL PRIMARY KEY,
session UUID NOT NULL REFERENCES sessions(session), session UUID NOT NULL REFERENCES sessions(session),
listener UUID REFERENCES listeners(listener), listener UUID REFERENCES listeners(listener),
message TEXT NOT NULL message TEXT NOT NULL