From 0291c56a822e04002265f36b5c8934345fac5297 Mon Sep 17 00:00:00 2001 From: Shagnor Date: Wed, 14 Dec 2022 23:48:00 +1100 Subject: [PATCH] Get very basic listener to gameserver interop --- Cargo.lock | 2 + blastmud_game/Cargo.toml | 2 + blastmud_game/src/db.rs | 17 +++ blastmud_game/src/listener.rs | 171 +++++++++++++++++++++++++++ blastmud_game/src/main.rs | 13 +- blastmud_game/src/message_handler.rs | 8 ++ blastmud_listener/src/main.rs | 10 +- schema/schema.sql | 53 +++++++++ 8 files changed, 271 insertions(+), 5 deletions(-) create mode 100644 blastmud_game/src/db.rs create mode 100644 blastmud_game/src/listener.rs create mode 100644 blastmud_game/src/message_handler.rs create mode 100644 schema/schema.sql diff --git a/Cargo.lock b/Cargo.lock index 483ac82..cfe491b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -56,7 +56,9 @@ dependencies = [ "tokio", "tokio-postgres", "tokio-serde", + "tokio-stream", "tokio-util", + "uuid", ] [[package]] diff --git a/blastmud_game/Cargo.toml b/blastmud_game/Cargo.toml index 9f273d3..85b9a39 100644 --- a/blastmud_game/Cargo.toml +++ b/blastmud_game/Cargo.toml @@ -16,4 +16,6 @@ simple_logger = "4.0.0" tokio = { version = "1.23.0", features = ["signal", "net", "macros", "rt-multi-thread", "rt", "tokio-macros", "time", "sync", "io-util"] } tokio-postgres = { version = "0.7.7", features = ["with-uuid-1"] } tokio-serde = { version = "0.8.0", features = ["serde", "serde_cbor", "cbor"] } +tokio-stream = "0.1.11" tokio-util = { version = "0.7.4", features = ["codec"] } +uuid = { version = "1.2.2", features = ["v4", "serde", "rng"] } diff --git a/blastmud_game/src/db.rs b/blastmud_game/src/db.rs new file mode 100644 index 0000000..cea6287 --- /dev/null +++ b/blastmud_game/src/db.rs @@ -0,0 +1,17 @@ +use tokio_postgres::config::Config as PgConfig; +use deadpool_postgres::{Manager, ManagerConfig, Pool, RecyclingMethod}; +use std::error::Error; +use std::str::FromStr; +use tokio_postgres::NoTls; + +pub fn start_pool(connstr: &str) -> Result> { + let mgr_config = ManagerConfig { + recycling_method: RecyclingMethod::Fast + }; + let mgr = Manager::from_config( + PgConfig::from_str(connstr)?, + NoTls, mgr_config + ); + + Pool::builder(mgr).max_size(4).build().map_err(|e| Box::new(e) as Box) +} diff --git a/blastmud_game/src/listener.rs b/blastmud_game/src/listener.rs new file mode 100644 index 0000000..9efc754 --- /dev/null +++ b/blastmud_game/src/listener.rs @@ -0,0 +1,171 @@ +use std::error::Error; +use tokio::task; +use tokio::net::{TcpStream, TcpListener}; +use log::{info, warn}; +use tokio_util::codec; +use tokio_util::codec::length_delimited::LengthDelimitedCodec; +use tokio_serde::formats::Cbor; +use blastmud_interfaces::*; +use futures::prelude::*; +use tokio::sync::{Mutex, mpsc, oneshot}; +use std::sync::Arc; +use uuid::Uuid; +use std::collections::BTreeMap; + +pub struct ListenerSend { + message: MessageToListener, + ack_notify: oneshot::Sender<()> +} +pub type ListenerMap = Arc>>>; + +async fn handle_from_listener( + conn: TcpStream, + message_handler: FHandler, + listener_map: ListenerMap) +where + FHandler: Fn(MessageFromListener) -> HandlerFut + Send + 'static, + HandlerFut: Future + Send + 'static { + let mut conn_framed = tokio_serde::Framed::new( + codec::Framed::new(conn, LengthDelimitedCodec::new()), + Cbor::::default() + ); + + let session = match conn_framed.try_next().await { + Ok(Some(MessageFromListener::ListenerPing { uuid })) => uuid, + Ok(Some(msg)) => { + warn!("Got non-ping first message from listener: {:?}", msg); + return; + } + Ok(None) => { + warn!("Lost listener connection before first message"); + return; + } + Err(e) => { + warn!("Lost listener connection to error {} before first message", e); + return; + } + }; + + match conn_framed.send(MessageToListener::AcknowledgeMessage).await { + Ok(_) => {} + Err(e) => { + warn!("Got error sending listener acknowledge for initial ping: {}", e); + return; + } + } + + let (sender, mut receiver) = mpsc::channel(1); + listener_map.lock().await.insert(session, sender); + + 'listener_loop: loop { + tokio::select!( + req = conn_framed.try_next() => { + match req { + Ok(Some(MessageFromListener::AcknowledgeMessage)) => { + warn!("Unexpected acknowledge from listener - bug in listener?"); + } + Ok(Some(msg)) => { + let handle_fut = message_handler(msg); + handle_fut.await; + match conn_framed.send( + MessageToListener::AcknowledgeMessage + ).await { + Ok(_) => {} + Err(e) => { + warn!("Got error sending listener acknowledge: {}", e); + break 'listener_loop; + } + } + } + Ok(None) => { + warn!("Lost connection to listener {} due to end-of-stream", + session); + break 'listener_loop; + } + Err(e) => { + warn!("Lost connection to listener {} due to error {}", + session, e); + break 'listener_loop; + } + } + } + Some(ListenerSend { message, ack_notify }) = receiver.recv() => { + match conn_framed.send(message).await { + Ok(_) => {} + Err(e) => { + warn!("Got error sending listener command: {}", e); + break 'listener_loop; + } + } + // Cut-back loop to wait for acknowledge. + 'ack_wait_loop: loop { + match conn_framed.try_next().await { + Ok(Some(MessageFromListener::AcknowledgeMessage)) => { + ack_notify.send(()).unwrap_or(()); + break 'ack_wait_loop; + } + Ok(Some(msg)) => { + let handle_fut = message_handler(msg); + handle_fut.await; + match conn_framed.send( + MessageToListener::AcknowledgeMessage + ).await { + Ok(_) => {} + Err(e) => { + warn!("Got error sending listener acknowledge: {}", e); + break 'listener_loop; + } + } + } + Ok(None) => { + warn!("Lost connection to listener {} due to end-of-stream", + session); + break 'listener_loop; + } + Err(e) => { + warn!("Lost connection to listener {} due to error {}", + session, e); + break 'listener_loop; + } + } + } + } + ); + } + + listener_map.lock().await.remove(&session); +} + +pub fn make_listener_map() -> ListenerMap { + Arc::new(Mutex::new(BTreeMap::new())) +} + +pub async fn start_listener( + bind_to: String, + listener_map: ListenerMap, + handle_message: FHandler +) -> Result<(), Box> +where + FHandler: Fn(MessageFromListener) -> HandlerFut + Send + Clone + 'static, + HandlerFut: Future + Send + 'static +{ + info!("Starting listener on {}", bind_to); + let listener = TcpListener::bind(bind_to).await?; + + let listener_map_for_task = listener_map.clone(); + task::spawn(async move { + loop { + match listener.accept().await { + Err(e) => { + warn!("Error accepting from listener process: {}", e); + } + Ok((socket, _)) => { + info!("Accepted new inbound connection from listener"); + task::spawn(handle_from_listener(socket, handle_message.clone(), listener_map_for_task.clone())); + } + } + } + }); + + Ok(()) +} diff --git a/blastmud_game/src/main.rs b/blastmud_game/src/main.rs index ca71562..13e63cc 100644 --- a/blastmud_game/src/main.rs +++ b/blastmud_game/src/main.rs @@ -3,9 +3,11 @@ use std::fs; use std::error::Error; use log::{info, LevelFilter}; use simple_logger::SimpleLogger; +use tokio::signal::unix::{signal, SignalKind}; mod db; mod listener; +mod message_handler; #[derive(Deserialize, Debug)] struct Config { @@ -31,7 +33,16 @@ async fn main() -> Result<(), Box> { info!("Database pool initialised: {:?}", pool.status()); - let listener = listener::start_listener(config.listener); + let listener_map = listener::make_listener_map(); + listener::start_listener(config.listener, listener_map.clone(), + move |msg| { + message_handler::handle(msg, pool.clone(), listener_map.clone()) + } + ).await?; + + + let mut sigusr1 = signal(SignalKind::user_defined1())?; + sigusr1.recv().await; Ok(()) } diff --git a/blastmud_game/src/message_handler.rs b/blastmud_game/src/message_handler.rs new file mode 100644 index 0000000..b5b3e8b --- /dev/null +++ b/blastmud_game/src/message_handler.rs @@ -0,0 +1,8 @@ +use log::info; +use blastmud_interfaces::*; +use deadpool_postgres::Pool; +use crate::listener::ListenerMap; + +pub async fn handle(msg: MessageFromListener, _pool: Pool, _listener_map: ListenerMap) { + info!("Processing message: {:?}", msg) +} diff --git a/blastmud_listener/src/main.rs b/blastmud_listener/src/main.rs index f90fb77..f57298c 100644 --- a/blastmud_listener/src/main.rs +++ b/blastmud_listener/src/main.rs @@ -99,9 +99,13 @@ where ); break 'full_select; } + Ok(Some(MessageToListener::AcknowledgeMessage)) => { + // We do this here to ensure we never ack an ack. + warn!("Unexpected AcknowledgeMessage from gameserver. This suggests a bug in the gameserver"); + } Ok(Some(msg)) => { message_handler(msg); - } + } } match conn_framed.send(MessageFromListener::AcknowledgeMessage).await { @@ -208,9 +212,7 @@ type SessionMap = Arc>>; async fn handle_server_message(session_map: SessionMap, message: MessageToListener) { match message { - MessageToListener::AcknowledgeMessage => { - warn!("Unexpected AcknowledgeMessage from gameserver. This suggests a bug in the gameserver"); - } + MessageToListener::AcknowledgeMessage => {} MessageToListener::DisconnectSession { session } => { match session_map.lock().await.get(&session) { // Just silently ignore it if they are disconnected. diff --git a/schema/schema.sql b/schema/schema.sql new file mode 100644 index 0000000..391d3f9 --- /dev/null +++ b/schema/schema.sql @@ -0,0 +1,53 @@ +-- Note database created is ephemeral and use for migra to diff only. +-- Never put data in it, or it will be lost. +DROP DATABASE IF EXISTS blast_schemaonly; +CREATE DATABASE blast_schemaonly; + +\c blast_schemaonly + +CREATE TABLE listeners ( + listener UUID NOT NULL PRIMARY KEY, + last_seen TIMESTAMP WITH TIME ZONE +); + +CREATE TABLE sessions ( + session UUID NOT NULL PRIMARY KEY, + listener UUID NOT NULL, + details JSONB NOT NULL +); +CREATE INDEX session_by_listener ON sessions(listener); + +CREATE TABLE items ( + item_id BIGINT NOT NULL PRIMARY KEY, + item_code TEXT NOT NULL, + item_type TEXT NOT NULL, + location BIGINT REFERENCES items(item_id), + details JSONB NOT NULL, + UNIQUE (item_code, item_type) +); +CREATE INDEX item_index ON items (item_code, item_type); +CREATE INDEX item_by_loc ON items (location); + +CREATE TABLE users ( + username TEXT NOT NULL PRIMARY KEY, + current_session UUID REFERENCES sessions(session), + current_listener UUID REFERENCES listeners(listener), + details JSONB NOT NULL +); +CREATE INDEX user_by_listener ON users(current_listener); + +CREATE UNLOGGED TABLE sendqueue ( + item BIGINT NOT NULL PRIMARY KEY, + session UUID NOT NULL REFERENCES sessions(session), + listener UUID REFERENCES listeners(listener), + message TEXT NOT NULL +); + +CREATE TABLE tasks ( + task_code TEXT NOT NULL, + task_type TEXT NOT NULL, + next_scheduled TIMESTAMP WITH TIME ZONE NOT NULL, + details JSONB NOT NULL, + PRIMARY KEY (task_code, task_type) +); +CREATE INDEX task_by_next_scheduled ON tasks(next_scheduled);