forked from blasthavers/blastmud
Get very basic listener to gameserver interop
This commit is contained in:
parent
08b24b2fed
commit
0291c56a82
2
Cargo.lock
generated
2
Cargo.lock
generated
@ -56,7 +56,9 @@ dependencies = [
|
|||||||
"tokio",
|
"tokio",
|
||||||
"tokio-postgres",
|
"tokio-postgres",
|
||||||
"tokio-serde",
|
"tokio-serde",
|
||||||
|
"tokio-stream",
|
||||||
"tokio-util",
|
"tokio-util",
|
||||||
|
"uuid",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
@ -16,4 +16,6 @@ simple_logger = "4.0.0"
|
|||||||
tokio = { version = "1.23.0", features = ["signal", "net", "macros", "rt-multi-thread", "rt", "tokio-macros", "time", "sync", "io-util"] }
|
tokio = { version = "1.23.0", features = ["signal", "net", "macros", "rt-multi-thread", "rt", "tokio-macros", "time", "sync", "io-util"] }
|
||||||
tokio-postgres = { version = "0.7.7", features = ["with-uuid-1"] }
|
tokio-postgres = { version = "0.7.7", features = ["with-uuid-1"] }
|
||||||
tokio-serde = { version = "0.8.0", features = ["serde", "serde_cbor", "cbor"] }
|
tokio-serde = { version = "0.8.0", features = ["serde", "serde_cbor", "cbor"] }
|
||||||
|
tokio-stream = "0.1.11"
|
||||||
tokio-util = { version = "0.7.4", features = ["codec"] }
|
tokio-util = { version = "0.7.4", features = ["codec"] }
|
||||||
|
uuid = { version = "1.2.2", features = ["v4", "serde", "rng"] }
|
||||||
|
17
blastmud_game/src/db.rs
Normal file
17
blastmud_game/src/db.rs
Normal file
@ -0,0 +1,17 @@
|
|||||||
|
use tokio_postgres::config::Config as PgConfig;
|
||||||
|
use deadpool_postgres::{Manager, ManagerConfig, Pool, RecyclingMethod};
|
||||||
|
use std::error::Error;
|
||||||
|
use std::str::FromStr;
|
||||||
|
use tokio_postgres::NoTls;
|
||||||
|
|
||||||
|
pub fn start_pool(connstr: &str) -> Result<Pool, Box<dyn Error>> {
|
||||||
|
let mgr_config = ManagerConfig {
|
||||||
|
recycling_method: RecyclingMethod::Fast
|
||||||
|
};
|
||||||
|
let mgr = Manager::from_config(
|
||||||
|
PgConfig::from_str(connstr)?,
|
||||||
|
NoTls, mgr_config
|
||||||
|
);
|
||||||
|
|
||||||
|
Pool::builder(mgr).max_size(4).build().map_err(|e| Box::new(e) as Box<dyn Error>)
|
||||||
|
}
|
171
blastmud_game/src/listener.rs
Normal file
171
blastmud_game/src/listener.rs
Normal file
@ -0,0 +1,171 @@
|
|||||||
|
use std::error::Error;
|
||||||
|
use tokio::task;
|
||||||
|
use tokio::net::{TcpStream, TcpListener};
|
||||||
|
use log::{info, warn};
|
||||||
|
use tokio_util::codec;
|
||||||
|
use tokio_util::codec::length_delimited::LengthDelimitedCodec;
|
||||||
|
use tokio_serde::formats::Cbor;
|
||||||
|
use blastmud_interfaces::*;
|
||||||
|
use futures::prelude::*;
|
||||||
|
use tokio::sync::{Mutex, mpsc, oneshot};
|
||||||
|
use std::sync::Arc;
|
||||||
|
use uuid::Uuid;
|
||||||
|
use std::collections::BTreeMap;
|
||||||
|
|
||||||
|
pub struct ListenerSend {
|
||||||
|
message: MessageToListener,
|
||||||
|
ack_notify: oneshot::Sender<()>
|
||||||
|
}
|
||||||
|
pub type ListenerMap = Arc<Mutex<BTreeMap<Uuid, mpsc::Sender<ListenerSend>>>>;
|
||||||
|
|
||||||
|
async fn handle_from_listener<FHandler, HandlerFut>(
|
||||||
|
conn: TcpStream,
|
||||||
|
message_handler: FHandler,
|
||||||
|
listener_map: ListenerMap)
|
||||||
|
where
|
||||||
|
FHandler: Fn(MessageFromListener) -> HandlerFut + Send + 'static,
|
||||||
|
HandlerFut: Future<Output = ()> + Send + 'static {
|
||||||
|
let mut conn_framed = tokio_serde::Framed::new(
|
||||||
|
codec::Framed::new(conn, LengthDelimitedCodec::new()),
|
||||||
|
Cbor::<MessageFromListener, MessageToListener>::default()
|
||||||
|
);
|
||||||
|
|
||||||
|
let session = match conn_framed.try_next().await {
|
||||||
|
Ok(Some(MessageFromListener::ListenerPing { uuid })) => uuid,
|
||||||
|
Ok(Some(msg)) => {
|
||||||
|
warn!("Got non-ping first message from listener: {:?}", msg);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Ok(None) => {
|
||||||
|
warn!("Lost listener connection before first message");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
warn!("Lost listener connection to error {} before first message", e);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
match conn_framed.send(MessageToListener::AcknowledgeMessage).await {
|
||||||
|
Ok(_) => {}
|
||||||
|
Err(e) => {
|
||||||
|
warn!("Got error sending listener acknowledge for initial ping: {}", e);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let (sender, mut receiver) = mpsc::channel(1);
|
||||||
|
listener_map.lock().await.insert(session, sender);
|
||||||
|
|
||||||
|
'listener_loop: loop {
|
||||||
|
tokio::select!(
|
||||||
|
req = conn_framed.try_next() => {
|
||||||
|
match req {
|
||||||
|
Ok(Some(MessageFromListener::AcknowledgeMessage)) => {
|
||||||
|
warn!("Unexpected acknowledge from listener - bug in listener?");
|
||||||
|
}
|
||||||
|
Ok(Some(msg)) => {
|
||||||
|
let handle_fut = message_handler(msg);
|
||||||
|
handle_fut.await;
|
||||||
|
match conn_framed.send(
|
||||||
|
MessageToListener::AcknowledgeMessage
|
||||||
|
).await {
|
||||||
|
Ok(_) => {}
|
||||||
|
Err(e) => {
|
||||||
|
warn!("Got error sending listener acknowledge: {}", e);
|
||||||
|
break 'listener_loop;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(None) => {
|
||||||
|
warn!("Lost connection to listener {} due to end-of-stream",
|
||||||
|
session);
|
||||||
|
break 'listener_loop;
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
warn!("Lost connection to listener {} due to error {}",
|
||||||
|
session, e);
|
||||||
|
break 'listener_loop;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Some(ListenerSend { message, ack_notify }) = receiver.recv() => {
|
||||||
|
match conn_framed.send(message).await {
|
||||||
|
Ok(_) => {}
|
||||||
|
Err(e) => {
|
||||||
|
warn!("Got error sending listener command: {}", e);
|
||||||
|
break 'listener_loop;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Cut-back loop to wait for acknowledge.
|
||||||
|
'ack_wait_loop: loop {
|
||||||
|
match conn_framed.try_next().await {
|
||||||
|
Ok(Some(MessageFromListener::AcknowledgeMessage)) => {
|
||||||
|
ack_notify.send(()).unwrap_or(());
|
||||||
|
break 'ack_wait_loop;
|
||||||
|
}
|
||||||
|
Ok(Some(msg)) => {
|
||||||
|
let handle_fut = message_handler(msg);
|
||||||
|
handle_fut.await;
|
||||||
|
match conn_framed.send(
|
||||||
|
MessageToListener::AcknowledgeMessage
|
||||||
|
).await {
|
||||||
|
Ok(_) => {}
|
||||||
|
Err(e) => {
|
||||||
|
warn!("Got error sending listener acknowledge: {}", e);
|
||||||
|
break 'listener_loop;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(None) => {
|
||||||
|
warn!("Lost connection to listener {} due to end-of-stream",
|
||||||
|
session);
|
||||||
|
break 'listener_loop;
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
warn!("Lost connection to listener {} due to error {}",
|
||||||
|
session, e);
|
||||||
|
break 'listener_loop;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
listener_map.lock().await.remove(&session);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn make_listener_map() -> ListenerMap {
|
||||||
|
Arc::new(Mutex::new(BTreeMap::new()))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn start_listener<FHandler, HandlerFut>(
|
||||||
|
bind_to: String,
|
||||||
|
listener_map: ListenerMap,
|
||||||
|
handle_message: FHandler
|
||||||
|
) -> Result<(), Box<dyn Error>>
|
||||||
|
where
|
||||||
|
FHandler: Fn(MessageFromListener) -> HandlerFut + Send + Clone + 'static,
|
||||||
|
HandlerFut: Future<Output = ()> + Send + 'static
|
||||||
|
{
|
||||||
|
info!("Starting listener on {}", bind_to);
|
||||||
|
let listener = TcpListener::bind(bind_to).await?;
|
||||||
|
|
||||||
|
let listener_map_for_task = listener_map.clone();
|
||||||
|
task::spawn(async move {
|
||||||
|
loop {
|
||||||
|
match listener.accept().await {
|
||||||
|
Err(e) => {
|
||||||
|
warn!("Error accepting from listener process: {}", e);
|
||||||
|
}
|
||||||
|
Ok((socket, _)) => {
|
||||||
|
info!("Accepted new inbound connection from listener");
|
||||||
|
task::spawn(handle_from_listener(socket, handle_message.clone(), listener_map_for_task.clone()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
@ -3,9 +3,11 @@ use std::fs;
|
|||||||
use std::error::Error;
|
use std::error::Error;
|
||||||
use log::{info, LevelFilter};
|
use log::{info, LevelFilter};
|
||||||
use simple_logger::SimpleLogger;
|
use simple_logger::SimpleLogger;
|
||||||
|
use tokio::signal::unix::{signal, SignalKind};
|
||||||
|
|
||||||
mod db;
|
mod db;
|
||||||
mod listener;
|
mod listener;
|
||||||
|
mod message_handler;
|
||||||
|
|
||||||
#[derive(Deserialize, Debug)]
|
#[derive(Deserialize, Debug)]
|
||||||
struct Config {
|
struct Config {
|
||||||
@ -31,7 +33,16 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
|
|
||||||
info!("Database pool initialised: {:?}", pool.status());
|
info!("Database pool initialised: {:?}", pool.status());
|
||||||
|
|
||||||
let listener = listener::start_listener(config.listener);
|
let listener_map = listener::make_listener_map();
|
||||||
|
listener::start_listener(config.listener, listener_map.clone(),
|
||||||
|
move |msg| {
|
||||||
|
message_handler::handle(msg, pool.clone(), listener_map.clone())
|
||||||
|
}
|
||||||
|
).await?;
|
||||||
|
|
||||||
|
|
||||||
|
let mut sigusr1 = signal(SignalKind::user_defined1())?;
|
||||||
|
sigusr1.recv().await;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
8
blastmud_game/src/message_handler.rs
Normal file
8
blastmud_game/src/message_handler.rs
Normal file
@ -0,0 +1,8 @@
|
|||||||
|
use log::info;
|
||||||
|
use blastmud_interfaces::*;
|
||||||
|
use deadpool_postgres::Pool;
|
||||||
|
use crate::listener::ListenerMap;
|
||||||
|
|
||||||
|
pub async fn handle(msg: MessageFromListener, _pool: Pool, _listener_map: ListenerMap) {
|
||||||
|
info!("Processing message: {:?}", msg)
|
||||||
|
}
|
@ -99,6 +99,10 @@ where
|
|||||||
);
|
);
|
||||||
break 'full_select;
|
break 'full_select;
|
||||||
}
|
}
|
||||||
|
Ok(Some(MessageToListener::AcknowledgeMessage)) => {
|
||||||
|
// We do this here to ensure we never ack an ack.
|
||||||
|
warn!("Unexpected AcknowledgeMessage from gameserver. This suggests a bug in the gameserver");
|
||||||
|
}
|
||||||
Ok(Some(msg)) => {
|
Ok(Some(msg)) => {
|
||||||
message_handler(msg);
|
message_handler(msg);
|
||||||
}
|
}
|
||||||
@ -208,9 +212,7 @@ type SessionMap = Arc<Mutex<BTreeMap<Uuid, SessionRecord>>>;
|
|||||||
|
|
||||||
async fn handle_server_message(session_map: SessionMap, message: MessageToListener) {
|
async fn handle_server_message(session_map: SessionMap, message: MessageToListener) {
|
||||||
match message {
|
match message {
|
||||||
MessageToListener::AcknowledgeMessage => {
|
MessageToListener::AcknowledgeMessage => {}
|
||||||
warn!("Unexpected AcknowledgeMessage from gameserver. This suggests a bug in the gameserver");
|
|
||||||
}
|
|
||||||
MessageToListener::DisconnectSession { session } => {
|
MessageToListener::DisconnectSession { session } => {
|
||||||
match session_map.lock().await.get(&session) {
|
match session_map.lock().await.get(&session) {
|
||||||
// Just silently ignore it if they are disconnected.
|
// Just silently ignore it if they are disconnected.
|
||||||
|
53
schema/schema.sql
Normal file
53
schema/schema.sql
Normal file
@ -0,0 +1,53 @@
|
|||||||
|
-- Note database created is ephemeral and use for migra to diff only.
|
||||||
|
-- Never put data in it, or it will be lost.
|
||||||
|
DROP DATABASE IF EXISTS blast_schemaonly;
|
||||||
|
CREATE DATABASE blast_schemaonly;
|
||||||
|
|
||||||
|
\c blast_schemaonly
|
||||||
|
|
||||||
|
CREATE TABLE listeners (
|
||||||
|
listener UUID NOT NULL PRIMARY KEY,
|
||||||
|
last_seen TIMESTAMP WITH TIME ZONE
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE TABLE sessions (
|
||||||
|
session UUID NOT NULL PRIMARY KEY,
|
||||||
|
listener UUID NOT NULL,
|
||||||
|
details JSONB NOT NULL
|
||||||
|
);
|
||||||
|
CREATE INDEX session_by_listener ON sessions(listener);
|
||||||
|
|
||||||
|
CREATE TABLE items (
|
||||||
|
item_id BIGINT NOT NULL PRIMARY KEY,
|
||||||
|
item_code TEXT NOT NULL,
|
||||||
|
item_type TEXT NOT NULL,
|
||||||
|
location BIGINT REFERENCES items(item_id),
|
||||||
|
details JSONB NOT NULL,
|
||||||
|
UNIQUE (item_code, item_type)
|
||||||
|
);
|
||||||
|
CREATE INDEX item_index ON items (item_code, item_type);
|
||||||
|
CREATE INDEX item_by_loc ON items (location);
|
||||||
|
|
||||||
|
CREATE TABLE users (
|
||||||
|
username TEXT NOT NULL PRIMARY KEY,
|
||||||
|
current_session UUID REFERENCES sessions(session),
|
||||||
|
current_listener UUID REFERENCES listeners(listener),
|
||||||
|
details JSONB NOT NULL
|
||||||
|
);
|
||||||
|
CREATE INDEX user_by_listener ON users(current_listener);
|
||||||
|
|
||||||
|
CREATE UNLOGGED TABLE sendqueue (
|
||||||
|
item BIGINT NOT NULL PRIMARY KEY,
|
||||||
|
session UUID NOT NULL REFERENCES sessions(session),
|
||||||
|
listener UUID REFERENCES listeners(listener),
|
||||||
|
message TEXT NOT NULL
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE TABLE tasks (
|
||||||
|
task_code TEXT NOT NULL,
|
||||||
|
task_type TEXT NOT NULL,
|
||||||
|
next_scheduled TIMESTAMP WITH TIME ZONE NOT NULL,
|
||||||
|
details JSONB NOT NULL,
|
||||||
|
PRIMARY KEY (task_code, task_type)
|
||||||
|
);
|
||||||
|
CREATE INDEX task_by_next_scheduled ON tasks(next_scheduled);
|
Loading…
Reference in New Issue
Block a user