Make listener send ping on connect + regularly

This commit is contained in:
Condorra 2022-12-14 21:34:01 +11:00
parent a152e656be
commit 08b24b2fed
6 changed files with 476 additions and 23 deletions

381
Cargo.lock generated
View File

@ -2,6 +2,17 @@
# It is not intended for manual editing. # It is not intended for manual editing.
version = 3 version = 3
[[package]]
name = "async-trait"
version = "0.1.59"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "31e6e93155431f3931513b243d371981bb2770112b370c82745a1d19d2f99364"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "atty" name = "atty"
version = "0.2.14" version = "0.2.14"
@ -19,11 +30,33 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "base64"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8"
[[package]]
name = "bitflags"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]] [[package]]
name = "blastmud_game" name = "blastmud_game"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"blastmud_interfaces", "blastmud_interfaces",
"deadpool-postgres",
"futures",
"log",
"serde",
"serde_yaml",
"simple_logger",
"tokio",
"tokio-postgres",
"tokio-serde",
"tokio-util",
] ]
[[package]] [[package]]
@ -47,10 +80,26 @@ dependencies = [
"simple_logger", "simple_logger",
"tokio", "tokio",
"tokio-serde", "tokio-serde",
"tokio-stream",
"tokio-util", "tokio-util",
"uuid", "uuid",
] ]
[[package]]
name = "block-buffer"
version = "0.10.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69cce20737498f97b993470a6e536b8523f0af7892a4f928cceb1ac5e52ebe7e"
dependencies = [
"generic-array",
]
[[package]]
name = "byteorder"
version = "1.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
[[package]] [[package]]
name = "bytes" name = "bytes"
version = "1.3.0" version = "1.3.0"
@ -74,6 +123,72 @@ dependencies = [
"winapi", "winapi",
] ]
[[package]]
name = "cpufeatures"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28d997bd5e24a5928dd43e46dc529867e207907fe0b239c3477d924f7f2ca320"
dependencies = [
"libc",
]
[[package]]
name = "crypto-common"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3"
dependencies = [
"generic-array",
"typenum",
]
[[package]]
name = "deadpool"
version = "0.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "421fe0f90f2ab22016f32a9881be5134fdd71c65298917084b0c7477cbc3856e"
dependencies = [
"async-trait",
"deadpool-runtime",
"num_cpus",
"retain_mut",
"serde",
"tokio",
]
[[package]]
name = "deadpool-postgres"
version = "0.10.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e866e414e9e12fc988f0bfb89a0b86228e7ed196ca509fbc4dcbc738c56e753c"
dependencies = [
"deadpool",
"log",
"serde",
"tokio",
"tokio-postgres",
]
[[package]]
name = "deadpool-runtime"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eaa37046cc0f6c3cc6090fbdbf73ef0b8ef4cfcc37f6befc0020f63e8cf121e1"
dependencies = [
"tokio",
]
[[package]]
name = "digest"
version = "0.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8168378f4e5023e7218c89c891c0fd8ecdb5e5e4f18cb78f38cf245dd021e76f"
dependencies = [
"block-buffer",
"crypto-common",
"subtle",
]
[[package]] [[package]]
name = "educe" name = "educe"
version = "0.4.20" version = "0.4.20"
@ -100,6 +215,12 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "fallible-iterator"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7"
[[package]] [[package]]
name = "futures" name = "futures"
version = "0.3.25" version = "0.3.25"
@ -189,6 +310,16 @@ dependencies = [
"slab", "slab",
] ]
[[package]]
name = "generic-array"
version = "0.14.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bff49e947297f3312447abdca79f45f4738097cc82b06e72054d2223f601f1b9"
dependencies = [
"typenum",
"version_check",
]
[[package]] [[package]]
name = "getrandom" name = "getrandom"
version = "0.2.8" version = "0.2.8"
@ -221,6 +352,15 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "hmac"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e"
dependencies = [
"digest",
]
[[package]] [[package]]
name = "indexmap" name = "indexmap"
version = "1.9.2" version = "1.9.2"
@ -249,6 +389,16 @@ version = "0.2.138"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db6d7e329c562c5dfab7a46a2afabc8b987ab9a4834c9d1ca04dc54c1546cef8" checksum = "db6d7e329c562c5dfab7a46a2afabc8b987ab9a4834c9d1ca04dc54c1546cef8"
[[package]]
name = "lock_api"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "435011366fe56583b16cf956f9df0095b405b82d76425bc8981c0e22e60ec4df"
dependencies = [
"autocfg",
"scopeguard",
]
[[package]] [[package]]
name = "log" name = "log"
version = "0.4.17" version = "0.4.17"
@ -258,6 +408,15 @@ dependencies = [
"cfg-if", "cfg-if",
] ]
[[package]]
name = "md-5"
version = "0.10.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6365506850d44bff6e2fbcb5176cf63650e48bd45ef2fe2665ae1570e0f4b9ca"
dependencies = [
"digest",
]
[[package]] [[package]]
name = "memchr" name = "memchr"
version = "2.5.0" version = "2.5.0"
@ -331,6 +490,53 @@ version = "1.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "86f0b0d4bf799edbc74508c1e8bf170ff5f41238e5f8225603ca7caaae2b7860" checksum = "86f0b0d4bf799edbc74508c1e8bf170ff5f41238e5f8225603ca7caaae2b7860"
[[package]]
name = "parking_lot"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f"
dependencies = [
"lock_api",
"parking_lot_core",
]
[[package]]
name = "parking_lot_core"
version = "0.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ff9f3fef3968a3ec5945535ed654cb38ff72d7495a25619e2247fb15a2ed9ba"
dependencies = [
"cfg-if",
"libc",
"redox_syscall",
"smallvec",
"windows-sys",
]
[[package]]
name = "percent-encoding"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e"
[[package]]
name = "phf"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "928c6535de93548188ef63bb7c4036bd415cd8f36ad25af44b9789b2ee72a48c"
dependencies = [
"phf_shared",
]
[[package]]
name = "phf_shared"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1fb5f6f826b772a8d4c0394209441e7d37cbbb967ae9c7e0e8134365c9ee676"
dependencies = [
"siphasher",
]
[[package]] [[package]]
name = "pin-project" name = "pin-project"
version = "1.0.12" version = "1.0.12"
@ -363,6 +569,36 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "postgres-protocol"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "878c6cbf956e03af9aa8204b407b9cbf47c072164800aa918c516cd4b056c50c"
dependencies = [
"base64",
"byteorder",
"bytes",
"fallible-iterator",
"hmac",
"md-5",
"memchr",
"rand",
"sha2",
"stringprep",
]
[[package]]
name = "postgres-types"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73d946ec7d256b04dfadc4e6a3292324e6f417124750fc5c0950f981b703a0f1"
dependencies = [
"bytes",
"fallible-iterator",
"postgres-protocol",
"uuid",
]
[[package]] [[package]]
name = "ppv-lite86" name = "ppv-lite86"
version = "0.2.17" version = "0.2.17"
@ -417,6 +653,21 @@ dependencies = [
"getrandom", "getrandom",
] ]
[[package]]
name = "redox_syscall"
version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a"
dependencies = [
"bitflags",
]
[[package]]
name = "retain_mut"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4389f1d5789befaf6029ebd9f7dac4af7f7e3d61b69d4f30e2ac02b57e7712b0"
[[package]] [[package]]
name = "rustc_version" name = "rustc_version"
version = "0.4.0" version = "0.4.0"
@ -432,6 +683,12 @@ version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4501abdff3ae82a1c1b477a17252eb69cee9e66eb915c1abaa4f44d873df9f09" checksum = "4501abdff3ae82a1c1b477a17252eb69cee9e66eb915c1abaa4f44d873df9f09"
[[package]]
name = "scopeguard"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]] [[package]]
name = "semver" name = "semver"
version = "1.0.14" version = "1.0.14"
@ -440,9 +697,9 @@ checksum = "e25dfac463d778e353db5be2449d1cce89bd6fd23c9f1ea21310ce6e5a1b29c4"
[[package]] [[package]]
name = "serde" name = "serde"
version = "1.0.149" version = "1.0.150"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "256b9932320c590e707b94576e3cc1f7c9024d0ee6612dfbcf1cb106cbe8e055" checksum = "e326c9ec8042f1b5da33252c8a37e9ffbd2c9bef0155215b6e6c80c790e05f91"
dependencies = [ dependencies = [
"serde_derive", "serde_derive",
] ]
@ -459,9 +716,9 @@ dependencies = [
[[package]] [[package]]
name = "serde_derive" name = "serde_derive"
version = "1.0.149" version = "1.0.150"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4eae9b04cbffdfd550eb462ed33bc6a1b68c935127d008b27444d08380f94e4" checksum = "42a3df25b0713732468deadad63ab9da1f1fd75a48a15024b50363f128db627e"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -481,6 +738,17 @@ dependencies = [
"unsafe-libyaml", "unsafe-libyaml",
] ]
[[package]]
name = "sha2"
version = "0.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "82e6b795fe2e3b1e845bafcb27aa35405c4d47cdfc92af5fc8d3002f76cebdc0"
dependencies = [
"cfg-if",
"cpufeatures",
"digest",
]
[[package]] [[package]]
name = "signal-hook-registry" name = "signal-hook-registry"
version = "1.4.0" version = "1.4.0"
@ -503,6 +771,12 @@ dependencies = [
"windows-sys", "windows-sys",
] ]
[[package]]
name = "siphasher"
version = "0.3.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de"
[[package]] [[package]]
name = "slab" name = "slab"
version = "0.4.7" version = "0.4.7"
@ -512,6 +786,12 @@ dependencies = [
"autocfg", "autocfg",
] ]
[[package]]
name = "smallvec"
version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0"
[[package]] [[package]]
name = "socket2" name = "socket2"
version = "0.4.7" version = "0.4.7"
@ -522,6 +802,22 @@ dependencies = [
"winapi", "winapi",
] ]
[[package]]
name = "stringprep"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ee348cb74b87454fff4b551cbf727025810a004f88aeacae7f85b87f4e9a1c1"
dependencies = [
"unicode-bidi",
"unicode-normalization",
]
[[package]]
name = "subtle"
version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601"
[[package]] [[package]]
name = "syn" name = "syn"
version = "1.0.105" version = "1.0.105"
@ -562,6 +858,21 @@ dependencies = [
"time-core", "time-core",
] ]
[[package]]
name = "tinyvec"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87cc5ceb3875bb20c2890005a4e226a4651264a5c75edb2421b52861a0a0cb50"
dependencies = [
"tinyvec_macros",
]
[[package]]
name = "tinyvec_macros"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
[[package]] [[package]]
name = "tokio" name = "tokio"
version = "1.23.0" version = "1.23.0"
@ -592,6 +903,30 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "tokio-postgres"
version = "0.7.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "29a12c1b3e0704ae7dfc25562629798b29c72e6b1d0a681b6f29ab4ae5e7f7bf"
dependencies = [
"async-trait",
"byteorder",
"bytes",
"fallible-iterator",
"futures-channel",
"futures-util",
"log",
"parking_lot",
"percent-encoding",
"phf",
"pin-project-lite",
"postgres-protocol",
"postgres-types",
"socket2",
"tokio",
"tokio-util",
]
[[package]] [[package]]
name = "tokio-serde" name = "tokio-serde"
version = "0.8.0" version = "0.8.0"
@ -607,6 +942,17 @@ dependencies = [
"serde_cbor", "serde_cbor",
] ]
[[package]]
name = "tokio-stream"
version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d660770404473ccd7bc9f8b28494a811bc18542b915c0855c51e8f419d5223ce"
dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
]
[[package]] [[package]]
name = "tokio-util" name = "tokio-util"
version = "0.7.4" version = "0.7.4"
@ -641,12 +987,33 @@ dependencies = [
"once_cell", "once_cell",
] ]
[[package]]
name = "typenum"
version = "1.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba"
[[package]]
name = "unicode-bidi"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "099b7128301d285f79ddd55b9a83d5e6b9e97c92e0ea0daebee7263e932de992"
[[package]] [[package]]
name = "unicode-ident" name = "unicode-ident"
version = "1.0.5" version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ceab39d59e4c9499d4e5a8ee0e2735b891bb7308ac83dfb4e80cad195c9f6f3" checksum = "6ceab39d59e4c9499d4e5a8ee0e2735b891bb7308ac83dfb4e80cad195c9f6f3"
[[package]]
name = "unicode-normalization"
version = "0.1.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c5713f0fc4b5db668a2ac63cdb7bb4469d8c9fed047b1d0292cc7b0ce2ba921"
dependencies = [
"tinyvec",
]
[[package]] [[package]]
name = "unsafe-libyaml" name = "unsafe-libyaml"
version = "0.2.4" version = "0.2.4"
@ -663,6 +1030,12 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "version_check"
version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
[[package]] [[package]]
name = "wasi" name = "wasi"
version = "0.11.0+wasi-snapshot-preview1" version = "0.11.0+wasi-snapshot-preview1"

View File

@ -16,3 +16,16 @@ Blastmud consists of the following main components:
# Status # Status
Blastmud is not yet playable, it is under development. Blastmud is not yet playable, it is under development.
# Schema management
We only keep the latest version in version control, and use migra (pip3 install migra) to identify changes between
the production schema and the latest in code.
The latest schema is under `schema`.
Create a user with a secret password, and username `blast`. Create a production database called `blast`.
To get to the latest schema:
* Run `psql <schema/schema.sql` to create the temporary `blast_schemaonly` database.
* Run `migra "postgres:///blast" "postgres:///blast_schemaonly" > /tmp/update.sql`
* Check `/tmp/update.sql` and if it looks good, apply it with `psql -d blast </tmp/update.sql`

View File

@ -7,3 +7,13 @@ edition = "2021"
[dependencies] [dependencies]
blastmud_interfaces = { path = "../blastmud_interfaces" } blastmud_interfaces = { path = "../blastmud_interfaces" }
deadpool-postgres = { version = "0.10.3", features = ["serde"] }
futures = "0.3.25"
log = "0.4.17"
serde = { version = "1.0.150", features = ["derive", "serde_derive"] }
serde_yaml = "0.9.14"
simple_logger = "4.0.0"
tokio = { version = "1.23.0", features = ["signal", "net", "macros", "rt-multi-thread", "rt", "tokio-macros", "time", "sync", "io-util"] }
tokio-postgres = { version = "0.7.7", features = ["with-uuid-1"] }
tokio-serde = { version = "0.8.0", features = ["serde", "serde_cbor", "cbor"] }
tokio-util = { version = "0.7.4", features = ["codec"] }

View File

@ -1,3 +1,37 @@
fn main() { use serde::Deserialize;
println!("Hello, world!"); use std::fs;
use std::error::Error;
use log::{info, LevelFilter};
use simple_logger::SimpleLogger;
mod db;
mod listener;
#[derive(Deserialize, Debug)]
struct Config {
listener: String,
pidfile: String,
database_conn_string: String
}
fn read_latest_config() -> Result<Config, Box<dyn Error>> {
serde_yaml::from_str(&fs::read_to_string("gameserver.conf")?).
map_err(|error| Box::new(error) as Box<dyn Error>)
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
SimpleLogger::new().with_level(LevelFilter::Info).init().unwrap();
let config = read_latest_config()?;
let pool = db::start_pool(&config.database_conn_string)?;
// Test the database connection string works so we quit early if not...
let _ = pool.get().await?.query("SELECT 1", &[]).await?;
info!("Database pool initialised: {:?}", pool.status());
let listener = listener::start_listener(config.listener);
Ok(())
} }

View File

@ -15,5 +15,6 @@ serde_yaml = "0.9.14"
simple_logger = "4.0.0" simple_logger = "4.0.0"
tokio = { version = "1.23.0", features = ["signal", "net", "macros", "rt-multi-thread", "rt", "tokio-macros", "time", "sync", "io-util"] } tokio = { version = "1.23.0", features = ["signal", "net", "macros", "rt-multi-thread", "rt", "tokio-macros", "time", "sync", "io-util"] }
tokio-serde = { version = "0.8.0", features = ["cbor", "serde", "serde_cbor"] } tokio-serde = { version = "0.8.0", features = ["cbor", "serde", "serde_cbor"] }
tokio-stream = "0.1.11"
tokio-util = { version = "0.7.4", features = ["codec"] } tokio-util = { version = "0.7.4", features = ["codec"] }
uuid = { version = "1.2.2", features = ["rng", "serde", "v4"] } uuid = { version = "1.2.2", features = ["rng", "serde", "v4"] }

View File

@ -19,6 +19,7 @@ use tokio_util::codec::length_delimited::LengthDelimitedCodec;
use tokio_serde::formats::Cbor; use tokio_serde::formats::Cbor;
use futures::prelude::*; use futures::prelude::*;
use uuid::Uuid; use uuid::Uuid;
use tokio_stream::wrappers::ReceiverStream;
#[derive(Deserialize, Debug)] #[derive(Deserialize, Debug)]
struct Config { struct Config {
@ -39,7 +40,8 @@ enum ServerTaskCommand {
fn run_server_task<FHandler, HandlerFut>( fn run_server_task<FHandler, HandlerFut>(
unfinished_business: Option<MessageFromListener>, unfinished_business: Option<MessageFromListener>,
mut receiver: mpsc::Receiver<ServerTaskCommand>, listener_id: Uuid,
mut receiver: ReceiverStream<ServerTaskCommand>,
sender: mpsc::Sender<ServerTaskCommand>, sender: mpsc::Sender<ServerTaskCommand>,
server: String, server: String,
message_handler: FHandler message_handler: FHandler
@ -61,17 +63,13 @@ where
Cbor::<MessageToListener, MessageFromListener>::default() Cbor::<MessageToListener, MessageFromListener>::default()
); );
for req in unfinished_business { let mut commands = stream::iter(vec!(
match conn_framed.send(req).await { ServerTaskCommand::Send {
Ok(_) => {} message: MessageFromListener::ListenerPing { uuid: listener_id }
Err(e) => { })
warn!("Can't re-send acknowledgement to {}: {}. Dropping message", server, e); ).chain(
// After a re-failure, we don't retry a further time. stream::iter(unfinished_business.map(|message| ServerTaskCommand::Send { message }))
run_server_task(None, receiver, sender, server, message_handler); ).chain(&mut receiver);
return;
}
}
}
'full_select: loop { 'full_select: loop {
tokio::select!( tokio::select!(
@ -81,6 +79,7 @@ where
warn!("Got read error from {}: {}", server, e); warn!("Got read error from {}: {}", server, e);
run_server_task( run_server_task(
None, None,
listener_id,
receiver, receiver,
sender, sender,
server, server,
@ -92,6 +91,7 @@ where
warn!("Got connection closed from {}", server); warn!("Got connection closed from {}", server);
run_server_task( run_server_task(
None, None,
listener_id,
receiver, receiver,
sender, sender,
server, server,
@ -108,12 +108,13 @@ where
Ok(_) => {} Ok(_) => {}
Err(e) => { Err(e) => {
warn!("Can't send acknowledgement to {}: {}", server, e); warn!("Can't send acknowledgement to {}: {}", server, e);
run_server_task(None, receiver, sender, server, message_handler); run_server_task(None, listener_id, receiver, sender, server,
message_handler);
break 'full_select; break 'full_select;
} }
} }
}, },
Some(req) = receiver.recv() => { Some(req) = commands.next() => {
match req { match req {
ServerTaskCommand::Send { message } => ServerTaskCommand::Send { message } =>
match conn_framed.send(message.clone()).await { match conn_framed.send(message.clone()).await {
@ -127,6 +128,7 @@ where
warn!("Can't read acknowledgement from {}: {}", server, e); warn!("Can't read acknowledgement from {}: {}", server, e);
run_server_task( run_server_task(
Some(message), Some(message),
listener_id,
receiver, receiver,
sender, sender,
server, server,
@ -138,6 +140,7 @@ where
warn!("Got connection closed from {}", server); warn!("Got connection closed from {}", server);
run_server_task( run_server_task(
Some(message), Some(message),
listener_id,
receiver, receiver,
sender, sender,
server, server,
@ -158,6 +161,7 @@ where
warn!("Can't send message to {}: {}", server, e); warn!("Can't send message to {}: {}", server, e);
run_server_task( run_server_task(
Some(message), Some(message),
listener_id,
receiver, receiver,
sender, sender,
server, server,
@ -173,6 +177,7 @@ where
info!("Ending connection to server {} due to reload", server); info!("Ending connection to server {} due to reload", server);
run_server_task( run_server_task(
None, None,
listener_id,
receiver, receiver,
sender, sender,
new_server, new_server,
@ -233,9 +238,12 @@ async fn handle_server_message(session_map: SessionMap, message: MessageToListen
} }
} }
fn start_server_task(server: String, session_map: SessionMap) -> mpsc::Sender<ServerTaskCommand> { fn start_server_task(listener_id: Uuid,
server: String,
session_map: SessionMap) -> mpsc::Sender<ServerTaskCommand> {
let (sender, receiver) = mpsc::channel(20); let (sender, receiver) = mpsc::channel(20);
run_server_task(None, receiver, sender.clone(), server, let receiver_stream = ReceiverStream::new(receiver);
run_server_task(None, listener_id, receiver_stream, sender.clone(), server,
move |msg| handle_server_message(session_map.clone(), move |msg| handle_server_message(session_map.clone(),
msg) ); msg) );
sender sender
@ -318,14 +326,28 @@ async fn handle_client_socket(
active_sessions.lock().await.remove(&session); active_sessions.lock().await.remove(&session);
} }
fn start_pinger(listener: Uuid, server: mpsc::Sender<ServerTaskCommand>) {
task::spawn(async move {
loop {
time::sleep(Duration::from_secs(60)).await;
server.send(ServerTaskCommand::Send {
message: MessageFromListener::ListenerPing { uuid: listener }
}).await.unwrap();
}
});
}
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> { async fn main() -> Result<(), Box<dyn std::error::Error>> {
SimpleLogger::new().init().unwrap(); SimpleLogger::new().init().unwrap();
let listener_id = Uuid::new_v4();
let mut config = read_latest_config()?; let mut config = read_latest_config()?;
let active_sessions: SessionMap = let active_sessions: SessionMap =
Arc::new(Mutex::new(BTreeMap::new())); Arc::new(Mutex::new(BTreeMap::new()));
let server_sender = start_server_task(config.gameserver, active_sessions.clone()); let server_sender = start_server_task(listener_id, config.gameserver, active_sessions.clone());
start_pinger(listener_id, server_sender.clone());
let mut sighups = signal(SignalKind::hangup())?; let mut sighups = signal(SignalKind::hangup())?;