From 6d05573fad25361f194e3d7be67ed7d7d9cb2451 Mon Sep 17 00:00:00 2001 From: Shagnor Date: Fri, 23 Dec 2022 18:30:38 +1100 Subject: [PATCH] Fix bug in listener, plus get some basic functionality going. --- Cargo.lock | 425 +++++++++++++++++++++++++++ blastmud_game/src/av.rs | 2 - blastmud_game/src/db.rs | 2 +- blastmud_game/src/message_handler.rs | 8 + blastmud_game/src/regular_tasks.rs | 30 ++ blastmud_listener/Cargo.toml | 1 + blastmud_listener/src/main.rs | 146 ++++++++- 7 files changed, 606 insertions(+), 8 deletions(-) create mode 100644 blastmud_game/src/regular_tasks.rs diff --git a/Cargo.lock b/Cargo.lock index 58a5b6c..5060e90 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -95,6 +95,7 @@ dependencies = [ "tokio-stream", "tokio-util", "uuid", + "warp", ] [[package]] @@ -106,6 +107,16 @@ dependencies = [ "generic-array", ] +[[package]] +name = "buf_redux" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b953a6887648bb07a535631f2bc00fbdb2a2216f135552cb3f534ed136b9c07f" +dependencies = [ + "memchr", + "safemem", +] + [[package]] name = "bumpalo" version = "3.11.1" @@ -245,6 +256,30 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" +[[package]] +name = "fastrand" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7a407cfaa3385c4ae6b23e84623d48c2798d06e3e6a1878f7f59f17b3f86499" +dependencies = [ + "instant", +] + +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + +[[package]] +name = "form_urlencoded" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9c384f161156f5260c24a097c56119f9be8c798586aecc13afbcbe7b7e26bf8" +dependencies = [ + "percent-encoding", +] + [[package]] name = "futures" version = "0.3.25" @@ -355,6 +390,25 @@ dependencies = [ "wasi", ] +[[package]] +name = "h2" +version = "0.3.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f9f29bc9dda355256b2916cf526ab02ce0aeaaaf2bad60d65ef3f12f11dd0f4" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http", + "indexmap", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "half" version = "1.8.2" @@ -367,6 +421,31 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +[[package]] +name = "headers" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3e372db8e5c0d213e0cd0b9be18be2aca3d44cf2fe30a9d46a65581cd454584" +dependencies = [ + "base64 0.13.1", + "bitflags", + "bytes", + "headers-core", + "http", + "httpdate", + "mime", + "sha1", +] + +[[package]] +name = "headers-core" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429" +dependencies = [ + "http", +] + [[package]] name = "hermit-abi" version = "0.1.19" @@ -385,6 +464,74 @@ dependencies = [ "digest", ] +[[package]] +name = "http" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75f43d41e26995c17e71ee126451dd3941010b0514a81a9d11f3b341debc2399" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + +[[package]] +name = "http-body" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" +dependencies = [ + "bytes", + "http", + "pin-project-lite", +] + +[[package]] +name = "httparse" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d897f394bad6a705d5f4104762e116a75639e470d80901eed05a860a95cb1904" + +[[package]] +name = "httpdate" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421" + +[[package]] +name = "hyper" +version = "0.14.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "034711faac9d2166cb1baf1a2fb0b60b1f277f8492fd72176c17f3515e1abd3c" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "socket2", + "tokio", + "tower-service", + "tracing", + "want", +] + +[[package]] +name = "idna" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e14ddfc70884202db2244c223200c204c2bda1bc6e0998d11b5e024d657209e6" +dependencies = [ + "unicode-bidi", + "unicode-normalization", +] + [[package]] name = "indexmap" version = "1.9.2" @@ -395,6 +542,15 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "instant" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" +dependencies = [ + "cfg-if", +] + [[package]] name = "itoa" version = "1.0.4" @@ -465,6 +621,22 @@ dependencies = [ "autocfg", ] +[[package]] +name = "mime" +version = "0.3.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" + +[[package]] +name = "mime_guess" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4192263c238a5f0d0c6bfd21f336a313a4ce1c450542449ca191bb657b4642ef" +dependencies = [ + "mime", + "unicase", +] + [[package]] name = "mio" version = "0.8.5" @@ -477,6 +649,24 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "multipart" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00dec633863867f29cb39df64a397cdf4a6354708ddd7759f70c7fb51c5f9182" +dependencies = [ + "buf_redux", + "httparse", + "log", + "mime", + "mime_guess", + "quick-error", + "rand", + "safemem", + "tempfile", + "twoway", +] + [[package]] name = "nix" version = "0.26.1" @@ -670,6 +860,12 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "quick-error" +version = "1.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" + [[package]] name = "quote" version = "1.0.21" @@ -718,6 +914,15 @@ dependencies = [ "bitflags", ] +[[package]] +name = "remove_dir_all" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7" +dependencies = [ + "winapi", +] + [[package]] name = "retain_mut" version = "0.1.9" @@ -748,12 +953,33 @@ dependencies = [ "semver", ] +[[package]] +name = "rustls-pemfile" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5eebeaeb360c87bfb72e84abdb3447159c0eaececf1bef2aecd65a8be949d1c9" +dependencies = [ + "base64 0.13.1", +] + [[package]] name = "ryu" version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4501abdff3ae82a1c1b477a17252eb69cee9e66eb915c1abaa4f44d873df9f09" +[[package]] +name = "safemem" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef703b7cb59335eae2eb93ceb664c0eb7ea6bf567079d843e09420219668e072" + +[[package]] +name = "scoped-tls" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" + [[package]] name = "scopeguard" version = "1.1.0" @@ -796,6 +1022,29 @@ dependencies = [ "syn", ] +[[package]] +name = "serde_json" +version = "1.0.91" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877c235533714907a8c2464236f5c4b2a17262ef1bd71f38f35ea592c8da6883" +dependencies = [ + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa", + "ryu", + "serde", +] + [[package]] name = "serde_yaml" version = "0.9.14" @@ -809,6 +1058,28 @@ dependencies = [ "unsafe-libyaml", ] +[[package]] +name = "sha-1" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f5058ada175748e33390e40e872bd0fe59a19f265d0158daa551c5a88a76009c" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + +[[package]] +name = "sha1" +version = "0.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f04293dc80c3993519f2d7f6f511707ee7094fe0c6d3406feb330cdb3540eba3" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sha2" version = "0.10.6" @@ -912,6 +1183,40 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "tempfile" +version = "3.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5cdb1ef4eaeeaddc8fbd371e5017057064af0911902ef36b39801f67cc6d79e4" +dependencies = [ + "cfg-if", + "fastrand", + "libc", + "redox_syscall", + "remove_dir_all", + "winapi", +] + +[[package]] +name = "thiserror" +version = "1.0.38" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a9cd18aa97d5c45c6603caea1da6628790b37f7a34b6ca89522331c5180fed0" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.38" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fb327af4685e4d03fa8cbcf1716380da910eeb2bb8be417e7f9fd3fb164f36f" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "time" version = "0.3.17" @@ -1036,6 +1341,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tungstenite" +version = "0.17.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f714dd15bead90401d77e04243611caec13726c2408afd5b31901dfcdcb3b181" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite", +] + [[package]] name = "tokio-util" version = "0.7.4" @@ -1050,6 +1367,12 @@ dependencies = [ "tracing", ] +[[package]] +name = "tower-service" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" + [[package]] name = "tracing" version = "0.1.37" @@ -1057,6 +1380,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" dependencies = [ "cfg-if", + "log", "pin-project-lite", "tracing-core", ] @@ -1070,12 +1394,55 @@ dependencies = [ "once_cell", ] +[[package]] +name = "try-lock" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" + +[[package]] +name = "tungstenite" +version = "0.17.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e27992fd6a8c29ee7eef28fc78349aa244134e10ad447ce3b9f0ac0ed0fa4ce0" +dependencies = [ + "base64 0.13.1", + "byteorder", + "bytes", + "http", + "httparse", + "log", + "rand", + "sha-1", + "thiserror", + "url", + "utf-8", +] + +[[package]] +name = "twoway" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59b11b2b5241ba34be09c3cc85a36e56e48f9888862e19cedf23336d35316ed1" +dependencies = [ + "memchr", +] + [[package]] name = "typenum" version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba" +[[package]] +name = "unicase" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50f37be617794602aabbeee0be4f259dc1778fabe05e2d67ee8f79326d5cb4f6" +dependencies = [ + "version_check", +] + [[package]] name = "unicode-bidi" version = "0.3.8" @@ -1109,6 +1476,23 @@ version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" +[[package]] +name = "url" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d68c799ae75762b8c3fe375feb6600ef5602c883c5d21eb51c09f22b83c4643" +dependencies = [ + "form_urlencoded", + "idna", + "percent-encoding", +] + +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "uuid" version = "1.2.2" @@ -1125,6 +1509,47 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "want" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0" +dependencies = [ + "log", + "try-lock", +] + +[[package]] +name = "warp" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed7b8be92646fc3d18b06147664ebc5f48d222686cb11a8755e561a735aacc6d" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "headers", + "http", + "hyper", + "log", + "mime", + "mime_guess", + "multipart", + "percent-encoding", + "pin-project", + "rustls-pemfile", + "scoped-tls", + "serde", + "serde_json", + "serde_urlencoded", + "tokio", + "tokio-stream", + "tokio-tungstenite", + "tokio-util", + "tower-service", + "tracing", +] + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" diff --git a/blastmud_game/src/av.rs b/blastmud_game/src/av.rs index 11d1b6e..241e474 100644 --- a/blastmud_game/src/av.rs +++ b/blastmud_game/src/av.rs @@ -3,7 +3,6 @@ use std::error::Error; use serde::Deserialize; use ring::signature; use base64; -use log::info; use crate::DResult; #[derive(Deserialize)] @@ -34,7 +33,6 @@ pub fn check() -> DResult<()> { let sign_text = format!("cn={};{};serial={}", av.cn, av.assertion, av.serial); let key: signature::UnparsedPublicKey<&[u8]> = signature::UnparsedPublicKey::new(&signature::ECDSA_P256_SHA256_ASN1, &KEY_BYTES); - info!("Checking sign_text: {}", sign_text); key.verify(&sign_text.as_bytes(), &base64::decode(av.sig)?) .map_err(|_| Box::::from("Invalid age-verification.yml signature")) } diff --git a/blastmud_game/src/db.rs b/blastmud_game/src/db.rs index 58b7c38..8861c6c 100644 --- a/blastmud_game/src/db.rs +++ b/blastmud_game/src/db.rs @@ -24,7 +24,7 @@ pub async fn record_listener_ping(listener: Uuid, pool: DBPool) -> DResult<()> { pub async fn get_dead_listeners(pool: DBPool) -> DResult> { Ok(get_conn(pool).await? .query("SELECT listener FROM listeners WHERE last_seen < NOW() - \ - INTERVAL 2 minutes", &[]) + INTERVAL '2 minutes'", &[]) .await?.into_iter().map(|r| r.get(0)).collect()) } diff --git a/blastmud_game/src/message_handler.rs b/blastmud_game/src/message_handler.rs index 8ad9a27..38f34fc 100644 --- a/blastmud_game/src/message_handler.rs +++ b/blastmud_game/src/message_handler.rs @@ -6,6 +6,7 @@ use uuid::Uuid; use tokio::{sync::oneshot, task}; use crate::listener::ListenerSend; use crate::DResult; +use log::info; pub async fn handle(listener: Uuid, msg: MessageFromListener, pool: db::DBPool, listener_map: ListenerMap) -> DResult<()> { @@ -14,20 +15,27 @@ pub async fn handle(listener: Uuid, msg: MessageFromListener, pool: db::DBPool, SessionConnected { session: _, source: _ } => {} SessionDisconnected { session: _ } => {} SessionSentLine { session, msg } => { + info!("Awaiting listener lock"); let lmlock = listener_map.lock().await; let opt_sender = lmlock.get(&listener).map(|v| v.clone()); drop(lmlock); + info!("Listener lock dropped"); match opt_sender { None => {} Some(sender) => { + info!("Spawning message task"); task::spawn(async move { let (tx, rx) = oneshot::channel(); + info!("Sending echo"); sender.send(ListenerSend { message: MessageToListener::SendToSession { session, msg: format!("You hear an echo saying: \x1b[31m{}\x1b[0m\r\n", msg) }, ack_notify: tx }).await.unwrap_or(()); + info!("Awaiting echo ack"); rx.await.unwrap_or(()); + info!("Echo ack received"); }); + info!("Message task spawned"); } } } diff --git a/blastmud_game/src/regular_tasks.rs b/blastmud_game/src/regular_tasks.rs new file mode 100644 index 0000000..8803204 --- /dev/null +++ b/blastmud_game/src/regular_tasks.rs @@ -0,0 +1,30 @@ +use tokio::{task, time}; +use crate::DResult; +use crate::db; +use log::warn; + +async fn cleanup_session_once(pool: db::DBPool) -> DResult<()> { + for listener in db::get_dead_listeners(pool.clone()).await? { + db::cleanup_listener(pool.clone(), listener).await?; + } + Ok(()) +} + +fn start_session_cleanup_task(pool: db::DBPool) -> DResult<()> { + task::spawn(async move { + loop { + match cleanup_session_once(pool.clone()).await { + Ok(()) => {} + Err(e) => { + warn!("Error cleaning up sessions: {}", e); + time::sleep(time::Duration::from_secs(1)).await; + } + } + } + }); + Ok(()) +} + +pub fn start_regular_tasks(pool: db::DBPool) -> DResult<()> { + start_session_cleanup_task(pool) +} diff --git a/blastmud_listener/Cargo.toml b/blastmud_listener/Cargo.toml index 32d50f3..682be26 100644 --- a/blastmud_listener/Cargo.toml +++ b/blastmud_listener/Cargo.toml @@ -18,3 +18,4 @@ tokio-serde = { version = "0.8.0", features = ["cbor", "serde", "serde_cbor"] } tokio-stream = "0.1.11" tokio-util = { version = "0.7.4", features = ["codec"] } uuid = { version = "1.2.2", features = ["rng", "serde", "v4"] } +warp = "0.3.3" diff --git a/blastmud_listener/src/main.rs b/blastmud_listener/src/main.rs index 7c38216..524d8fd 100644 --- a/blastmud_listener/src/main.rs +++ b/blastmud_listener/src/main.rs @@ -6,7 +6,7 @@ use std::fs; use serde::*; use tokio::task; use tokio::time::{self, Duration}; -use tokio::net::{TcpStream, TcpListener}; +use tokio::net::{TcpStream, TcpListener, lookup_host}; use tokio::signal::unix::{signal, SignalKind}; use tokio::sync::{mpsc, Mutex}; use tokio::io::{BufReader, AsyncWriteExt}; @@ -20,16 +20,22 @@ use tokio_serde::formats::Cbor; use futures::prelude::*; use uuid::Uuid; use tokio_stream::wrappers::ReceiverStream; +use warp; +use warp::filters::ws; +use warp::Filter; #[derive(Deserialize, Debug)] struct Config { listeners: Vec, + ws_listener: String, gameserver: String, } -fn read_latest_config() -> Result> { +type DResult = Result>; + +fn read_latest_config() -> DResult { serde_yaml::from_str(&fs::read_to_string("listener.conf")?). - map_err(|error| Box::new(error) as Box) + map_err(|error| Box::new(error) as Box) } #[derive(Debug, Clone)] @@ -159,7 +165,18 @@ where Ok(Some(msg)) => { let mhfut = message_handler(msg); mhfut.await; + + match conn_framed.send(MessageFromListener::AcknowledgeMessage).await { + Ok(_) => {} + Err(e) => { + warn!("Can't send acknowledgement to {}: {}", server, e); + run_server_task(None, listener_id, receiver, sender, server, + message_handler); + break 'full_select; + } + } } + } } } @@ -316,7 +333,7 @@ async fn handle_client_socket( } Ok(Some(msg)) => { server.send(ServerTaskCommand::Send { - message: MessageFromListener::SessionSentLine { session, msg } + message: MessageFromListener::SessionSentLine {session, msg } }).await.unwrap(); } } @@ -341,8 +358,125 @@ fn start_pinger(listener: Uuid, server: mpsc::Sender) { }); } +async fn handle_websocket( + mut ws: ws::WebSocket, + src: String, + active_sessions: SessionMap, + server: mpsc::Sender +) { + let session = Uuid::new_v4(); + info!("Accepted websocket session {} with forwarded-for {}", session, src); + + let (lsender, mut lreceiver) = mpsc::channel(MAX_CAPACITY); + let (discon_sender, mut discon_receiver) = mpsc::unbounded_channel(); + + active_sessions.lock().await.insert( + session, SessionRecord { + channel: lsender.clone(), + disconnect_channel: discon_sender.clone() + }); + server.send(ServerTaskCommand::Send { message: MessageFromListener::SessionConnected { + session, source: src + }}).await.unwrap(); + + 'client_loop: loop { + tokio::select!( + Some(()) = discon_receiver.recv() => { + info!("Client connection {} instructed for immediate disconnect", session); + break 'client_loop; + } + Some(message) = lreceiver.recv() => { + match message { + SessionCommand::Disconnect => { + info!("Client connection {} instructed for disconnect", session); + break 'client_loop; + } + SessionCommand::SendString { message } => + match ws.send(ws::Message::text(message)).await { + Err(e) => { + info!("Client connection {} got error {}", session, e); + } + Ok(()) => {} + } + } + }, + msg_read = ws.try_next(), if lsender.capacity() > STOP_READING_CAPACITY => { + match msg_read { + Err(e) => { + info!("Client connection {} got error {}", session, e); + break 'client_loop; + } + Ok(None) => { + info!("Client connection {} closed", session); + break 'client_loop; + } + Ok(Some(msg)) if msg.is_close() => { + info!("Client connection {} got WS close message", session); + break 'client_loop; + } + Ok(Some(wsmsg)) if wsmsg.is_text() => { + match wsmsg.to_str() { + Err(_) => {} + Ok(msg) => { + server.send(ServerTaskCommand::Send { + message: MessageFromListener::SessionSentLine { + session, + msg: msg.to_owned() + } + }).await.unwrap_or(()); + } + } + } + _ => {} + } + }, + _ = time::sleep(time::Duration::from_secs(60)) => { + match ws.send(ws::Message::ping([])).await { + Err(e) => { + info!("Client connection {} got error {}", session, e); + } + Ok(()) => {} + } + } + ); + } + + server.send(ServerTaskCommand::Send { message: MessageFromListener::SessionDisconnected { + session + }}).await.unwrap(); + active_sessions.lock().await.remove(&session); +} + +async fn upgrade_websocket(src: String, wsreq: ws::Ws, + active_sessions: SessionMap, + server_sender: mpsc::Sender) -> + Result { + Ok( + wsreq.on_upgrade(|wss| handle_websocket( + wss, src, active_sessions, + server_sender)) + ) +} + +async fn start_websocket(bind: String, active_sessions: SessionMap, server_sender: mpsc::Sender) -> DResult<()> { + let sockaddr = lookup_host(bind).await?.next().expect("Can't resolve websocket bind name"); + let routes = + warp::get() + .and(warp::path("wsgame")) + .and(warp::header("X-Forwarded-For")) + .and(ws::ws()) + .and_then(move |src, wsreq| upgrade_websocket(src, wsreq, active_sessions.clone(), server_sender.clone())); + + task::spawn( + warp::serve( + routes + ).run(sockaddr) + ); + Ok(()) +} + #[tokio::main] -async fn main() -> Result<(), Box> { +async fn main() -> Result<(), Box> { SimpleLogger::new().with_level(LevelFilter::Info).init().unwrap(); let listener_id = Uuid::new_v4(); @@ -352,6 +486,8 @@ async fn main() -> Result<(), Box> { let server_sender = start_server_task(listener_id, config.gameserver, active_sessions.clone()); start_pinger(listener_id, server_sender.clone()); + // Note: for now, this cannot be reconfigured without a complete restart. + start_websocket(config.ws_listener, active_sessions.clone(), server_sender.clone()).await?; let mut sighups = signal(SignalKind::hangup())?;