Fix bug in listener, plus get some basic functionality going.

This commit is contained in:
Condorra 2022-12-23 18:30:38 +11:00
parent b090d701aa
commit 6d05573fad
7 changed files with 606 additions and 8 deletions

425
Cargo.lock generated
View File

@ -95,6 +95,7 @@ dependencies = [
"tokio-stream", "tokio-stream",
"tokio-util", "tokio-util",
"uuid", "uuid",
"warp",
] ]
[[package]] [[package]]
@ -106,6 +107,16 @@ dependencies = [
"generic-array", "generic-array",
] ]
[[package]]
name = "buf_redux"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b953a6887648bb07a535631f2bc00fbdb2a2216f135552cb3f534ed136b9c07f"
dependencies = [
"memchr",
"safemem",
]
[[package]] [[package]]
name = "bumpalo" name = "bumpalo"
version = "3.11.1" version = "3.11.1"
@ -245,6 +256,30 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7"
[[package]]
name = "fastrand"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a7a407cfaa3385c4ae6b23e84623d48c2798d06e3e6a1878f7f59f17b3f86499"
dependencies = [
"instant",
]
[[package]]
name = "fnv"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]]
name = "form_urlencoded"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9c384f161156f5260c24a097c56119f9be8c798586aecc13afbcbe7b7e26bf8"
dependencies = [
"percent-encoding",
]
[[package]] [[package]]
name = "futures" name = "futures"
version = "0.3.25" version = "0.3.25"
@ -355,6 +390,25 @@ dependencies = [
"wasi", "wasi",
] ]
[[package]]
name = "h2"
version = "0.3.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f9f29bc9dda355256b2916cf526ab02ce0aeaaaf2bad60d65ef3f12f11dd0f4"
dependencies = [
"bytes",
"fnv",
"futures-core",
"futures-sink",
"futures-util",
"http",
"indexmap",
"slab",
"tokio",
"tokio-util",
"tracing",
]
[[package]] [[package]]
name = "half" name = "half"
version = "1.8.2" version = "1.8.2"
@ -367,6 +421,31 @@ version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
[[package]]
name = "headers"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3e372db8e5c0d213e0cd0b9be18be2aca3d44cf2fe30a9d46a65581cd454584"
dependencies = [
"base64 0.13.1",
"bitflags",
"bytes",
"headers-core",
"http",
"httpdate",
"mime",
"sha1",
]
[[package]]
name = "headers-core"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429"
dependencies = [
"http",
]
[[package]] [[package]]
name = "hermit-abi" name = "hermit-abi"
version = "0.1.19" version = "0.1.19"
@ -385,6 +464,74 @@ dependencies = [
"digest", "digest",
] ]
[[package]]
name = "http"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75f43d41e26995c17e71ee126451dd3941010b0514a81a9d11f3b341debc2399"
dependencies = [
"bytes",
"fnv",
"itoa",
]
[[package]]
name = "http-body"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1"
dependencies = [
"bytes",
"http",
"pin-project-lite",
]
[[package]]
name = "httparse"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d897f394bad6a705d5f4104762e116a75639e470d80901eed05a860a95cb1904"
[[package]]
name = "httpdate"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421"
[[package]]
name = "hyper"
version = "0.14.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "034711faac9d2166cb1baf1a2fb0b60b1f277f8492fd72176c17f3515e1abd3c"
dependencies = [
"bytes",
"futures-channel",
"futures-core",
"futures-util",
"h2",
"http",
"http-body",
"httparse",
"httpdate",
"itoa",
"pin-project-lite",
"socket2",
"tokio",
"tower-service",
"tracing",
"want",
]
[[package]]
name = "idna"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e14ddfc70884202db2244c223200c204c2bda1bc6e0998d11b5e024d657209e6"
dependencies = [
"unicode-bidi",
"unicode-normalization",
]
[[package]] [[package]]
name = "indexmap" name = "indexmap"
version = "1.9.2" version = "1.9.2"
@ -395,6 +542,15 @@ dependencies = [
"hashbrown", "hashbrown",
] ]
[[package]]
name = "instant"
version = "0.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c"
dependencies = [
"cfg-if",
]
[[package]] [[package]]
name = "itoa" name = "itoa"
version = "1.0.4" version = "1.0.4"
@ -465,6 +621,22 @@ dependencies = [
"autocfg", "autocfg",
] ]
[[package]]
name = "mime"
version = "0.3.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d"
[[package]]
name = "mime_guess"
version = "2.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4192263c238a5f0d0c6bfd21f336a313a4ce1c450542449ca191bb657b4642ef"
dependencies = [
"mime",
"unicase",
]
[[package]] [[package]]
name = "mio" name = "mio"
version = "0.8.5" version = "0.8.5"
@ -477,6 +649,24 @@ dependencies = [
"windows-sys", "windows-sys",
] ]
[[package]]
name = "multipart"
version = "0.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00dec633863867f29cb39df64a397cdf4a6354708ddd7759f70c7fb51c5f9182"
dependencies = [
"buf_redux",
"httparse",
"log",
"mime",
"mime_guess",
"quick-error",
"rand",
"safemem",
"tempfile",
"twoway",
]
[[package]] [[package]]
name = "nix" name = "nix"
version = "0.26.1" version = "0.26.1"
@ -670,6 +860,12 @@ dependencies = [
"unicode-ident", "unicode-ident",
] ]
[[package]]
name = "quick-error"
version = "1.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0"
[[package]] [[package]]
name = "quote" name = "quote"
version = "1.0.21" version = "1.0.21"
@ -718,6 +914,15 @@ dependencies = [
"bitflags", "bitflags",
] ]
[[package]]
name = "remove_dir_all"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7"
dependencies = [
"winapi",
]
[[package]] [[package]]
name = "retain_mut" name = "retain_mut"
version = "0.1.9" version = "0.1.9"
@ -748,12 +953,33 @@ dependencies = [
"semver", "semver",
] ]
[[package]]
name = "rustls-pemfile"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5eebeaeb360c87bfb72e84abdb3447159c0eaececf1bef2aecd65a8be949d1c9"
dependencies = [
"base64 0.13.1",
]
[[package]] [[package]]
name = "ryu" name = "ryu"
version = "1.0.11" version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4501abdff3ae82a1c1b477a17252eb69cee9e66eb915c1abaa4f44d873df9f09" checksum = "4501abdff3ae82a1c1b477a17252eb69cee9e66eb915c1abaa4f44d873df9f09"
[[package]]
name = "safemem"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ef703b7cb59335eae2eb93ceb664c0eb7ea6bf567079d843e09420219668e072"
[[package]]
name = "scoped-tls"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294"
[[package]] [[package]]
name = "scopeguard" name = "scopeguard"
version = "1.1.0" version = "1.1.0"
@ -796,6 +1022,29 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "serde_json"
version = "1.0.91"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "877c235533714907a8c2464236f5c4b2a17262ef1bd71f38f35ea592c8da6883"
dependencies = [
"itoa",
"ryu",
"serde",
]
[[package]]
name = "serde_urlencoded"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd"
dependencies = [
"form_urlencoded",
"itoa",
"ryu",
"serde",
]
[[package]] [[package]]
name = "serde_yaml" name = "serde_yaml"
version = "0.9.14" version = "0.9.14"
@ -809,6 +1058,28 @@ dependencies = [
"unsafe-libyaml", "unsafe-libyaml",
] ]
[[package]]
name = "sha-1"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f5058ada175748e33390e40e872bd0fe59a19f265d0158daa551c5a88a76009c"
dependencies = [
"cfg-if",
"cpufeatures",
"digest",
]
[[package]]
name = "sha1"
version = "0.10.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f04293dc80c3993519f2d7f6f511707ee7094fe0c6d3406feb330cdb3540eba3"
dependencies = [
"cfg-if",
"cpufeatures",
"digest",
]
[[package]] [[package]]
name = "sha2" name = "sha2"
version = "0.10.6" version = "0.10.6"
@ -912,6 +1183,40 @@ dependencies = [
"unicode-ident", "unicode-ident",
] ]
[[package]]
name = "tempfile"
version = "3.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5cdb1ef4eaeeaddc8fbd371e5017057064af0911902ef36b39801f67cc6d79e4"
dependencies = [
"cfg-if",
"fastrand",
"libc",
"redox_syscall",
"remove_dir_all",
"winapi",
]
[[package]]
name = "thiserror"
version = "1.0.38"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a9cd18aa97d5c45c6603caea1da6628790b37f7a34b6ca89522331c5180fed0"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.38"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fb327af4685e4d03fa8cbcf1716380da910eeb2bb8be417e7f9fd3fb164f36f"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "time" name = "time"
version = "0.3.17" version = "0.3.17"
@ -1036,6 +1341,18 @@ dependencies = [
"tokio", "tokio",
] ]
[[package]]
name = "tokio-tungstenite"
version = "0.17.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f714dd15bead90401d77e04243611caec13726c2408afd5b31901dfcdcb3b181"
dependencies = [
"futures-util",
"log",
"tokio",
"tungstenite",
]
[[package]] [[package]]
name = "tokio-util" name = "tokio-util"
version = "0.7.4" version = "0.7.4"
@ -1050,6 +1367,12 @@ dependencies = [
"tracing", "tracing",
] ]
[[package]]
name = "tower-service"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52"
[[package]] [[package]]
name = "tracing" name = "tracing"
version = "0.1.37" version = "0.1.37"
@ -1057,6 +1380,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"log",
"pin-project-lite", "pin-project-lite",
"tracing-core", "tracing-core",
] ]
@ -1070,12 +1394,55 @@ dependencies = [
"once_cell", "once_cell",
] ]
[[package]]
name = "try-lock"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642"
[[package]]
name = "tungstenite"
version = "0.17.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e27992fd6a8c29ee7eef28fc78349aa244134e10ad447ce3b9f0ac0ed0fa4ce0"
dependencies = [
"base64 0.13.1",
"byteorder",
"bytes",
"http",
"httparse",
"log",
"rand",
"sha-1",
"thiserror",
"url",
"utf-8",
]
[[package]]
name = "twoway"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59b11b2b5241ba34be09c3cc85a36e56e48f9888862e19cedf23336d35316ed1"
dependencies = [
"memchr",
]
[[package]] [[package]]
name = "typenum" name = "typenum"
version = "1.16.0" version = "1.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba" checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba"
[[package]]
name = "unicase"
version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50f37be617794602aabbeee0be4f259dc1778fabe05e2d67ee8f79326d5cb4f6"
dependencies = [
"version_check",
]
[[package]] [[package]]
name = "unicode-bidi" name = "unicode-bidi"
version = "0.3.8" version = "0.3.8"
@ -1109,6 +1476,23 @@ version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
[[package]]
name = "url"
version = "2.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d68c799ae75762b8c3fe375feb6600ef5602c883c5d21eb51c09f22b83c4643"
dependencies = [
"form_urlencoded",
"idna",
"percent-encoding",
]
[[package]]
name = "utf-8"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9"
[[package]] [[package]]
name = "uuid" name = "uuid"
version = "1.2.2" version = "1.2.2"
@ -1125,6 +1509,47 @@ version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
[[package]]
name = "want"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0"
dependencies = [
"log",
"try-lock",
]
[[package]]
name = "warp"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed7b8be92646fc3d18b06147664ebc5f48d222686cb11a8755e561a735aacc6d"
dependencies = [
"bytes",
"futures-channel",
"futures-util",
"headers",
"http",
"hyper",
"log",
"mime",
"mime_guess",
"multipart",
"percent-encoding",
"pin-project",
"rustls-pemfile",
"scoped-tls",
"serde",
"serde_json",
"serde_urlencoded",
"tokio",
"tokio-stream",
"tokio-tungstenite",
"tokio-util",
"tower-service",
"tracing",
]
[[package]] [[package]]
name = "wasi" name = "wasi"
version = "0.11.0+wasi-snapshot-preview1" version = "0.11.0+wasi-snapshot-preview1"

View File

@ -3,7 +3,6 @@ use std::error::Error;
use serde::Deserialize; use serde::Deserialize;
use ring::signature; use ring::signature;
use base64; use base64;
use log::info;
use crate::DResult; use crate::DResult;
#[derive(Deserialize)] #[derive(Deserialize)]
@ -34,7 +33,6 @@ pub fn check() -> DResult<()> {
let sign_text = format!("cn={};{};serial={}", av.cn, av.assertion, av.serial); let sign_text = format!("cn={};{};serial={}", av.cn, av.assertion, av.serial);
let key: signature::UnparsedPublicKey<&[u8]> = let key: signature::UnparsedPublicKey<&[u8]> =
signature::UnparsedPublicKey::new(&signature::ECDSA_P256_SHA256_ASN1, &KEY_BYTES); signature::UnparsedPublicKey::new(&signature::ECDSA_P256_SHA256_ASN1, &KEY_BYTES);
info!("Checking sign_text: {}", sign_text);
key.verify(&sign_text.as_bytes(), &base64::decode(av.sig)?) key.verify(&sign_text.as_bytes(), &base64::decode(av.sig)?)
.map_err(|_| Box::<dyn Error + Send + Sync>::from("Invalid age-verification.yml signature")) .map_err(|_| Box::<dyn Error + Send + Sync>::from("Invalid age-verification.yml signature"))
} }

View File

@ -24,7 +24,7 @@ pub async fn record_listener_ping(listener: Uuid, pool: DBPool) -> DResult<()> {
pub async fn get_dead_listeners(pool: DBPool) -> DResult<Vec<Uuid>> { pub async fn get_dead_listeners(pool: DBPool) -> DResult<Vec<Uuid>> {
Ok(get_conn(pool).await? Ok(get_conn(pool).await?
.query("SELECT listener FROM listeners WHERE last_seen < NOW() - \ .query("SELECT listener FROM listeners WHERE last_seen < NOW() - \
INTERVAL 2 minutes", &[]) INTERVAL '2 minutes'", &[])
.await?.into_iter().map(|r| r.get(0)).collect()) .await?.into_iter().map(|r| r.get(0)).collect())
} }

View File

@ -6,6 +6,7 @@ use uuid::Uuid;
use tokio::{sync::oneshot, task}; use tokio::{sync::oneshot, task};
use crate::listener::ListenerSend; use crate::listener::ListenerSend;
use crate::DResult; use crate::DResult;
use log::info;
pub async fn handle(listener: Uuid, msg: MessageFromListener, pool: db::DBPool, listener_map: ListenerMap) pub async fn handle(listener: Uuid, msg: MessageFromListener, pool: db::DBPool, listener_map: ListenerMap)
-> DResult<()> { -> DResult<()> {
@ -14,20 +15,27 @@ pub async fn handle(listener: Uuid, msg: MessageFromListener, pool: db::DBPool,
SessionConnected { session: _, source: _ } => {} SessionConnected { session: _, source: _ } => {}
SessionDisconnected { session: _ } => {} SessionDisconnected { session: _ } => {}
SessionSentLine { session, msg } => { SessionSentLine { session, msg } => {
info!("Awaiting listener lock");
let lmlock = listener_map.lock().await; let lmlock = listener_map.lock().await;
let opt_sender = lmlock.get(&listener).map(|v| v.clone()); let opt_sender = lmlock.get(&listener).map(|v| v.clone());
drop(lmlock); drop(lmlock);
info!("Listener lock dropped");
match opt_sender { match opt_sender {
None => {} None => {}
Some(sender) => { Some(sender) => {
info!("Spawning message task");
task::spawn(async move { task::spawn(async move {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
info!("Sending echo");
sender.send(ListenerSend { message: MessageToListener::SendToSession { sender.send(ListenerSend { message: MessageToListener::SendToSession {
session, session,
msg: format!("You hear an echo saying: \x1b[31m{}\x1b[0m\r\n", msg) }, msg: format!("You hear an echo saying: \x1b[31m{}\x1b[0m\r\n", msg) },
ack_notify: tx }).await.unwrap_or(()); ack_notify: tx }).await.unwrap_or(());
info!("Awaiting echo ack");
rx.await.unwrap_or(()); rx.await.unwrap_or(());
info!("Echo ack received");
}); });
info!("Message task spawned");
} }
} }
} }

View File

@ -0,0 +1,30 @@
use tokio::{task, time};
use crate::DResult;
use crate::db;
use log::warn;
async fn cleanup_session_once(pool: db::DBPool) -> DResult<()> {
for listener in db::get_dead_listeners(pool.clone()).await? {
db::cleanup_listener(pool.clone(), listener).await?;
}
Ok(())
}
fn start_session_cleanup_task(pool: db::DBPool) -> DResult<()> {
task::spawn(async move {
loop {
match cleanup_session_once(pool.clone()).await {
Ok(()) => {}
Err(e) => {
warn!("Error cleaning up sessions: {}", e);
time::sleep(time::Duration::from_secs(1)).await;
}
}
}
});
Ok(())
}
pub fn start_regular_tasks(pool: db::DBPool) -> DResult<()> {
start_session_cleanup_task(pool)
}

View File

@ -18,3 +18,4 @@ tokio-serde = { version = "0.8.0", features = ["cbor", "serde", "serde_cbor"] }
tokio-stream = "0.1.11" tokio-stream = "0.1.11"
tokio-util = { version = "0.7.4", features = ["codec"] } tokio-util = { version = "0.7.4", features = ["codec"] }
uuid = { version = "1.2.2", features = ["rng", "serde", "v4"] } uuid = { version = "1.2.2", features = ["rng", "serde", "v4"] }
warp = "0.3.3"

View File

@ -6,7 +6,7 @@ use std::fs;
use serde::*; use serde::*;
use tokio::task; use tokio::task;
use tokio::time::{self, Duration}; use tokio::time::{self, Duration};
use tokio::net::{TcpStream, TcpListener}; use tokio::net::{TcpStream, TcpListener, lookup_host};
use tokio::signal::unix::{signal, SignalKind}; use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::{mpsc, Mutex}; use tokio::sync::{mpsc, Mutex};
use tokio::io::{BufReader, AsyncWriteExt}; use tokio::io::{BufReader, AsyncWriteExt};
@ -20,16 +20,22 @@ use tokio_serde::formats::Cbor;
use futures::prelude::*; use futures::prelude::*;
use uuid::Uuid; use uuid::Uuid;
use tokio_stream::wrappers::ReceiverStream; use tokio_stream::wrappers::ReceiverStream;
use warp;
use warp::filters::ws;
use warp::Filter;
#[derive(Deserialize, Debug)] #[derive(Deserialize, Debug)]
struct Config { struct Config {
listeners: Vec<String>, listeners: Vec<String>,
ws_listener: String,
gameserver: String, gameserver: String,
} }
fn read_latest_config() -> Result<Config, Box<dyn std::error::Error>> { type DResult<A> = Result<A, Box<dyn Error + Send + Sync>>;
fn read_latest_config() -> DResult<Config> {
serde_yaml::from_str(&fs::read_to_string("listener.conf")?). serde_yaml::from_str(&fs::read_to_string("listener.conf")?).
map_err(|error| Box::new(error) as Box<dyn Error>) map_err(|error| Box::new(error) as Box<dyn Error + Send + Sync>)
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@ -159,9 +165,20 @@ where
Ok(Some(msg)) => { Ok(Some(msg)) => {
let mhfut = message_handler(msg); let mhfut = message_handler(msg);
mhfut.await; mhfut.await;
match conn_framed.send(MessageFromListener::AcknowledgeMessage).await {
Ok(_) => {}
Err(e) => {
warn!("Can't send acknowledgement to {}: {}", server, e);
run_server_task(None, listener_id, receiver, sender, server,
message_handler);
break 'full_select;
} }
} }
} }
}
}
} }
Err(e) => { Err(e) => {
warn!("Can't send message to {}: {}", server, e); warn!("Can't send message to {}: {}", server, e);
@ -316,7 +333,7 @@ async fn handle_client_socket(
} }
Ok(Some(msg)) => { Ok(Some(msg)) => {
server.send(ServerTaskCommand::Send { server.send(ServerTaskCommand::Send {
message: MessageFromListener::SessionSentLine { session, msg } message: MessageFromListener::SessionSentLine {session, msg }
}).await.unwrap(); }).await.unwrap();
} }
} }
@ -341,8 +358,125 @@ fn start_pinger(listener: Uuid, server: mpsc::Sender<ServerTaskCommand>) {
}); });
} }
async fn handle_websocket(
mut ws: ws::WebSocket,
src: String,
active_sessions: SessionMap,
server: mpsc::Sender<ServerTaskCommand>
) {
let session = Uuid::new_v4();
info!("Accepted websocket session {} with forwarded-for {}", session, src);
let (lsender, mut lreceiver) = mpsc::channel(MAX_CAPACITY);
let (discon_sender, mut discon_receiver) = mpsc::unbounded_channel();
active_sessions.lock().await.insert(
session, SessionRecord {
channel: lsender.clone(),
disconnect_channel: discon_sender.clone()
});
server.send(ServerTaskCommand::Send { message: MessageFromListener::SessionConnected {
session, source: src
}}).await.unwrap();
'client_loop: loop {
tokio::select!(
Some(()) = discon_receiver.recv() => {
info!("Client connection {} instructed for immediate disconnect", session);
break 'client_loop;
}
Some(message) = lreceiver.recv() => {
match message {
SessionCommand::Disconnect => {
info!("Client connection {} instructed for disconnect", session);
break 'client_loop;
}
SessionCommand::SendString { message } =>
match ws.send(ws::Message::text(message)).await {
Err(e) => {
info!("Client connection {} got error {}", session, e);
}
Ok(()) => {}
}
}
},
msg_read = ws.try_next(), if lsender.capacity() > STOP_READING_CAPACITY => {
match msg_read {
Err(e) => {
info!("Client connection {} got error {}", session, e);
break 'client_loop;
}
Ok(None) => {
info!("Client connection {} closed", session);
break 'client_loop;
}
Ok(Some(msg)) if msg.is_close() => {
info!("Client connection {} got WS close message", session);
break 'client_loop;
}
Ok(Some(wsmsg)) if wsmsg.is_text() => {
match wsmsg.to_str() {
Err(_) => {}
Ok(msg) => {
server.send(ServerTaskCommand::Send {
message: MessageFromListener::SessionSentLine {
session,
msg: msg.to_owned()
}
}).await.unwrap_or(());
}
}
}
_ => {}
}
},
_ = time::sleep(time::Duration::from_secs(60)) => {
match ws.send(ws::Message::ping([])).await {
Err(e) => {
info!("Client connection {} got error {}", session, e);
}
Ok(()) => {}
}
}
);
}
server.send(ServerTaskCommand::Send { message: MessageFromListener::SessionDisconnected {
session
}}).await.unwrap();
active_sessions.lock().await.remove(&session);
}
async fn upgrade_websocket(src: String, wsreq: ws::Ws,
active_sessions: SessionMap,
server_sender: mpsc::Sender<ServerTaskCommand>) ->
Result<impl warp::Reply, warp::Rejection> {
Ok(
wsreq.on_upgrade(|wss| handle_websocket(
wss, src, active_sessions,
server_sender))
)
}
async fn start_websocket(bind: String, active_sessions: SessionMap, server_sender: mpsc::Sender<ServerTaskCommand>) -> DResult<()> {
let sockaddr = lookup_host(bind).await?.next().expect("Can't resolve websocket bind name");
let routes =
warp::get()
.and(warp::path("wsgame"))
.and(warp::header("X-Forwarded-For"))
.and(ws::ws())
.and_then(move |src, wsreq| upgrade_websocket(src, wsreq, active_sessions.clone(), server_sender.clone()));
task::spawn(
warp::serve(
routes
).run(sockaddr)
);
Ok(())
}
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> { async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
SimpleLogger::new().with_level(LevelFilter::Info).init().unwrap(); SimpleLogger::new().with_level(LevelFilter::Info).init().unwrap();
let listener_id = Uuid::new_v4(); let listener_id = Uuid::new_v4();
@ -352,6 +486,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let server_sender = start_server_task(listener_id, config.gameserver, active_sessions.clone()); let server_sender = start_server_task(listener_id, config.gameserver, active_sessions.clone());
start_pinger(listener_id, server_sender.clone()); start_pinger(listener_id, server_sender.clone());
// Note: for now, this cannot be reconfigured without a complete restart.
start_websocket(config.ws_listener, active_sessions.clone(), server_sender.clone()).await?;
let mut sighups = signal(SignalKind::hangup())?; let mut sighups = signal(SignalKind::hangup())?;