Get basic server IO structure coded

This commit is contained in:
Condorra 2022-12-12 23:35:55 +11:00
parent 3d0b5281fa
commit 1348525283
5 changed files with 471 additions and 80 deletions

283
Cargo.lock generated
View File

@ -35,18 +35,27 @@ dependencies = [
]
[[package]]
name = "blastmud_runner"
name = "blastmud_listener"
version = "0.1.0"
dependencies = [
"blastmud_interfaces",
"futures",
"log",
"rand",
"serde",
"serde_yaml",
"simple_logger",
"tokio",
"tokio-serde",
"tokio-util",
]
[[package]]
name = "bytes"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dfb24e866b15a1af2a1b663f10c6b6b8f397a84aadb828f12e5b289ec23a3a3c"
[[package]]
name = "cfg-if"
version = "1.0.0"
@ -64,6 +73,121 @@ dependencies = [
"winapi",
]
[[package]]
name = "educe"
version = "0.4.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb0188e3c3ba8df5753894d54461f0e39bc91741dc5b22e1c46999ec2c71f4e4"
dependencies = [
"enum-ordinalize",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "enum-ordinalize"
version = "3.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a62bb1df8b45ecb7ffa78dca1c17a438fb193eb083db0b1b494d2a61bcb5096a"
dependencies = [
"num-bigint",
"num-traits",
"proc-macro2",
"quote",
"rustc_version",
"syn",
]
[[package]]
name = "futures"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38390104763dc37a5145a53c29c63c1290b5d316d6086ec32c293f6736051bb0"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "52ba265a92256105f45b719605a571ffe2d1f0fea3807304b522c1d778f79eed"
dependencies = [
"futures-core",
"futures-sink",
]
[[package]]
name = "futures-core"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04909a7a7e4633ae6c4a9ab280aeb86da1236243a77b694a49eacd659a4bd3ac"
[[package]]
name = "futures-executor"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7acc85df6714c176ab5edf386123fafe217be88c0840ec11f199441134a074e2"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00f5fb52a06bdcadeb54e8d3671f8888a39697dcb0b81b23b55174030427f4eb"
[[package]]
name = "futures-macro"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bdfb8ce053d86b91919aad980c220b1fb8401a9394410e1c289ed7e66b61835d"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "futures-sink"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39c15cf1a4aa79df40f1bb462fb39676d0ad9e366c2a33b590d7c66f4f81fcf9"
[[package]]
name = "futures-task"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2ffb393ac5d9a6eaa9d3fdf37ae2776656b706e200c8e16b1bdb227f5198e6ea"
[[package]]
name = "futures-util"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "197676987abd2f9cadff84926f410af1c183608d36641465df73ae8211dc65d6"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
"pin-project-lite",
"pin-utils",
"slab",
]
[[package]]
name = "getrandom"
version = "0.2.8"
@ -75,6 +199,12 @@ dependencies = [
"wasi",
]
[[package]]
name = "half"
version = "1.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eabb4a44450da02c90444cf74558da904edde8fb4e9035a9a6a4e15445af0bd7"
[[package]]
name = "hashbrown"
version = "0.12.3"
@ -127,6 +257,12 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "memchr"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d"
[[package]]
name = "mio"
version = "0.8.5"
@ -139,6 +275,36 @@ dependencies = [
"windows-sys",
]
[[package]]
name = "num-bigint"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f93ab6289c7b344a8a9f60f88d80aa20032336fe78da341afc91c8a2341fc75f"
dependencies = [
"autocfg",
"num-integer",
"num-traits",
]
[[package]]
name = "num-integer"
version = "0.1.45"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "225d3389fb3509a24c93f5c29eb6bde2586b98d9f016636dff58d7c6f7569cd9"
dependencies = [
"autocfg",
"num-traits",
]
[[package]]
name = "num-traits"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "578ede34cf02f8924ab9447f50c28075b4d3e5b269972345e7e0372b38c6cdcd"
dependencies = [
"autocfg",
]
[[package]]
name = "num_cpus"
version = "1.14.0"
@ -158,12 +324,44 @@ dependencies = [
"libc",
]
[[package]]
name = "once_cell"
version = "1.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "86f0b0d4bf799edbc74508c1e8bf170ff5f41238e5f8225603ca7caaae2b7860"
[[package]]
name = "pin-project"
version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ad29a609b6bcd67fee905812e544992d216af9d755757c05ed2d0e15a74c6ecc"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "069bdb1e05adc7a8990dce9cc75370895fbe4e3d58b9b73bf1aee56359344a55"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "pin-project-lite"
version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116"
[[package]]
name = "pin-utils"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "ppv-lite86"
version = "0.2.17"
@ -218,12 +416,27 @@ dependencies = [
"getrandom",
]
[[package]]
name = "rustc_version"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366"
dependencies = [
"semver",
]
[[package]]
name = "ryu"
version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4501abdff3ae82a1c1b477a17252eb69cee9e66eb915c1abaa4f44d873df9f09"
[[package]]
name = "semver"
version = "1.0.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e25dfac463d778e353db5be2449d1cce89bd6fd23c9f1ea21310ce6e5a1b29c4"
[[package]]
name = "serde"
version = "1.0.149"
@ -233,6 +446,16 @@ dependencies = [
"serde_derive",
]
[[package]]
name = "serde_cbor"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2bef2ebfde456fb76bbcf9f59315333decc4fda0b2b44b420243c11e0f5ec1f5"
dependencies = [
"half",
"serde",
]
[[package]]
name = "serde_derive"
version = "1.0.149"
@ -279,6 +502,15 @@ dependencies = [
"windows-sys",
]
[[package]]
name = "slab"
version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4614a76b2a8be0058caa9dbbaf66d988527d86d003c11a94fbd335d7661edcef"
dependencies = [
"autocfg",
]
[[package]]
name = "socket2"
version = "0.4.7"
@ -357,6 +589,55 @@ dependencies = [
"syn",
]
[[package]]
name = "tokio-serde"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "911a61637386b789af998ee23f50aa30d5fd7edcec8d6d3dedae5e5815205466"
dependencies = [
"bytes",
"educe",
"futures-core",
"futures-sink",
"pin-project",
"serde",
"serde_cbor",
]
[[package]]
name = "tokio-util"
version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0bb2e075f03b3d66d8d8785356224ba688d2906a371015e225beeb65ca92c740"
dependencies = [
"bytes",
"futures-core",
"futures-sink",
"pin-project-lite",
"tokio",
"tracing",
]
[[package]]
name = "tracing"
version = "0.1.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8"
dependencies = [
"cfg-if",
"pin-project-lite",
"tracing-core",
]
[[package]]
name = "tracing-core"
version = "0.1.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24eb03ba0eab1fd845050058ce5e616558e8f8d8fca633e6b163fe25c797213a"
dependencies = [
"once_cell",
]
[[package]]
name = "unicode-ident"
version = "1.0.5"

View File

@ -1,16 +1,18 @@
use uuid::Uuid;
use serde::*;
#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum MessageFromListener {
ListenerPing { uuid: Uuid },
UserConnected { user: Uuid, source: String },
UserDisconnected { user: Uuid },
UserSentLine { user: Uuid, msg: String },
AcknowledgeMessage
}
#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum MessageToListener {
DisconnectUser { user: Uuid },
SendToUser { user: Uuid, msg: String },
AcknowledgeMessage
}

View File

@ -1,5 +1,5 @@
[package]
name = "blastmud_runner"
name = "blastmud_listener"
version = "0.1.0"
edition = "2021"
@ -7,9 +7,12 @@ edition = "2021"
[dependencies]
blastmud_interfaces = { path = "../blastmud_interfaces" }
futures = "0.3.25"
log = "0.4.17"
rand = "0.8.5"
serde = { version = "1.0.149", features = ["derive", "serde_derive"] }
serde_yaml = "0.9.14"
simple_logger = "4.0.0"
tokio = { version = "1.23.0", features = ["signal", "net", "macros", "rt-multi-thread", "rt", "tokio-macros", "time"] }
tokio = { version = "1.23.0", features = ["signal", "net", "macros", "rt-multi-thread", "rt", "tokio-macros", "time", "sync"] }
tokio-serde = { version = "0.8.0", features = ["cbor", "serde", "serde_cbor"] }
tokio-util = { version = "0.7.4", features = ["codec"] }

View File

@ -3,19 +3,23 @@ use std::error::Error;
use std::fs;
use serde::*;
use tokio::task;
use tokio::time::{self, Duration};
use tokio::net::{TcpStream, TcpListener};
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::mpsc;
use log::{warn, info};
use simple_logger::SimpleLogger;
use rand::thread_rng;
use rand::seq::SliceRandom;
use tokio::time::{self, Duration};
use std::sync::{Arc, Mutex};
use blastmud_interfaces::*;
use tokio_util::codec;
use tokio_util::codec::length_delimited::LengthDelimitedCodec;
use tokio_serde::formats::Cbor;
use futures::prelude::*;
#[derive(Deserialize, Debug)]
struct Config {
listeners: Vec<String>,
gameservers: Vec<String>,
gameserver: String,
}
fn read_latest_config() -> Result<Config, Box<dyn std::error::Error>> {
@ -23,59 +27,178 @@ fn read_latest_config() -> Result<Config, Box<dyn std::error::Error>> {
map_err(|error| Box::new(error) as Box<dyn Error>)
}
#[derive(Clone)]
enum ServerInfo {
Connected { stream: Arc<TcpStream>, host: String },
Disconnected { host: String },
#[derive(Debug, Clone)]
enum ServerTaskCommand {
SwitchTo { new_server: String },
Send { message: MessageFromListener }
}
async fn connect_upstream(upstream: &str) -> ServerInfo {
info!("About to connect to {}", upstream);
let stream = TcpStream::connect(&upstream).await;
match stream {
Ok(stream) => {
info!("Connected to {}", upstream);
ServerInfo::Connected { stream: Arc::new(stream), host: upstream.to_string() }
fn run_server_task(
unfinished_business: Option<MessageFromListener>,
mut receiver: mpsc::Receiver<ServerTaskCommand>,
sender: mpsc::Sender<ServerTaskCommand>,
server: String,
message_handler: fn (message: MessageToListener) -> ()
) {
task::spawn(async move {
let conn = loop {
match TcpStream::connect(&server).await {
Err(e) => warn!("Can't connect to {}: {}", server, e),
Ok(c) => break c
}
time::sleep(Duration::from_secs(1)).await;
};
let mut conn_framed = tokio_serde::Framed::new(
codec::Framed::new(conn, LengthDelimitedCodec::new()),
Cbor::<MessageToListener, MessageFromListener>::default()
);
for req in unfinished_business {
match conn_framed.send(req).await {
Ok(_) => {}
Err(e) => {
warn!("Can't re-send acknowledgement to {}: {}. Dropping message", server, e);
// After a re-failure, we don't retry a further time.
run_server_task(None, receiver, sender, server, message_handler);
return;
}
}
}
Err(e) => {
warn!("Couldn't connect to game: {}", e);
ServerInfo::Disconnected { host: upstream.to_string() }
'full_select: loop {
tokio::select!(
req = conn_framed.try_next() => {
match req {
Err(e) => {
warn!("Got read error from {}: {}", server, e);
run_server_task(
None,
receiver,
sender,
server,
message_handler
);
break 'full_select;
}
Ok(None) => {
warn!("Got connection closed from {}", server);
run_server_task(
None,
receiver,
sender,
server,
message_handler
);
break 'full_select;
}
Ok(Some(msg)) => {
message_handler(msg);
}
}
match conn_framed.send(MessageFromListener::AcknowledgeMessage).await {
Ok(_) => {}
Err(e) => {
warn!("Can't send acknowledgement to {}: {}", server, e);
run_server_task(None, receiver, sender, server, message_handler);
break 'full_select;
}
}
},
Some(req) = receiver.recv() => {
match req {
ServerTaskCommand::Send { message } =>
match conn_framed.send(message.clone()).await {
Ok(_) => {
// Now we enter a cut-back loop where we don't
// take on any new work until we see an
// acknowledgement.
'wait_for_ack: loop {
match conn_framed.try_next().await {
Err(e) => {
warn!("Can't read acknowledgement from {}: {}", server, e);
run_server_task(
Some(message),
receiver,
sender,
server,
message_handler
);
break 'full_select;
}
Ok(None) => {
warn!("Got connection closed from {}", server);
run_server_task(
Some(message),
receiver,
sender,
server,
message_handler
);
break 'full_select;
}
Ok(Some(MessageToListener::AcknowledgeMessage)) => {
break 'wait_for_ack;
}
Ok(Some(msg)) => {
message_handler(msg);
}
}
}
}
Err(e) => {
warn!("Can't send message to {}: {}", server, e);
run_server_task(
Some(message),
receiver,
sender,
server,
message_handler
);
break 'full_select;
}
}
ServerTaskCommand::SwitchTo { new_server } => {
// It is safe to just hard cutover at this point, because we haven't
// processed any messages we haven't acknowledged. The new gameserver
// will resend anything queued we didn't acknowledge.
info!("Ending connection to server {} due to reload", server);
run_server_task(
None,
receiver,
sender,
new_server,
message_handler
);
break 'full_select;
}
}
}
);
}
}
});
}
fn start_server_task(server: String) -> mpsc::Sender<ServerTaskCommand> {
let (sender, receiver) = mpsc::channel(20);
run_server_task(None, receiver, sender.clone(), server, |_msg| {});
sender
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
SimpleLogger::new().init().unwrap();
let mut config = read_latest_config()?;
let server_sender = start_server_task(config.gameserver);
let mut sighups = signal(SignalKind::hangup())?;
loop {
let config = read_latest_config()?;
let mut new_servers = Vec::new();
for gameserver in config.gameservers {
new_servers.push(connect_upstream(&gameserver).await);
}
let mut servers = Arc::new(Mutex::new(new_servers));
let send_server = |msg: &str| async move {
loop {
let mut servers_lock = servers.lock().unwrap();
let connected: Vec<&ServerInfo> = (*servers_lock).iter().filter(|serv| match serv {
ServerInfo::Connected { .. } => true,
_ => false
}).collect();
match connected.choose(&mut thread_rng()) {
None => {
time::sleep(Duration::from_secs(1)).await;
}
}
}
};
let mut listen_handles = Vec::new();
for listener in config.listeners {
for listener in config.listeners.clone() {
listen_handles.push(task::spawn(async move {
match TcpListener::bind(&listener).await {
Err(e) => { warn!("Error listening to {}: {}", &listener, e); }
@ -92,36 +215,19 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}));
}
let reconnect_handle = task::spawn(async move {
let mut new_servers = Vec::new();
let mut old_server_lock = servers.lock().unwrap();
let mut old_servers = (*old_server_lock).clone();
drop(old_server_lock);
sighups.recv().await;
for server in old_servers {
match server {
ServerInfo::Disconnected { host } => {
new_servers.push(connect_upstream(&host).await)
}
x => { new_servers.push(x) }
}
}
*(servers.lock().unwrap()) = new_servers;
});
info!("Reloading configurations");
config = read_latest_config()?;
let mut should_reload = false;
while !should_reload {
tokio::select!(_ = sighups.recv() => {
should_reload = true
})
}
// Note: It is deliberate behaviour to send this even if gameserver
// hasn't changed - SIGHUP is to be used after a server hot cutover to tell
// it to connect to the new server process even if on the same port.
server_sender.send(ServerTaskCommand::SwitchTo { new_server: config.gameserver })
.await?;
for handle in &listen_handles {
handle.abort();
}
reconnect_handle.abort();
info!("Reloading configurations")
}
}

View File

@ -1,4 +1,3 @@
listeners:
- 127.0.0.1:1234
gameservers:
- 127.0.0.1:1235
gameserver: 127.0.0.1:1235