From b4bc83ba02d99102e842ccd77c2ee34581d52bd7 Mon Sep 17 00:00:00 2001 From: Shagnor Date: Mon, 12 Dec 2022 23:36:07 +1100 Subject: [PATCH] More progress on listener --- Cargo.lock | 4 ++ blastmud_interfaces/src/lib.rs | 10 ++-- blastmud_listener/Cargo.toml | 3 +- blastmud_listener/src/main.rs | 99 ++++++++++++++++++++++++++++++---- 4 files changed, 101 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 31fa49d5..f2201819 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -48,6 +48,7 @@ dependencies = [ "tokio", "tokio-serde", "tokio-util", + "uuid", ] [[package]] @@ -568,7 +569,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eab6d665857cc6ca78d6e80303a02cea7a7851e85dfbd77cbdc09bd129f1ef46" dependencies = [ "autocfg", + "bytes", "libc", + "memchr", "mio", "num_cpus", "pin-project-lite", @@ -656,6 +659,7 @@ version = "1.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "422ee0de9031b5b948b97a8fc04e3aa35230001a722ddd27943e0be31564ce4c" dependencies = [ + "getrandom", "serde", ] diff --git a/blastmud_interfaces/src/lib.rs b/blastmud_interfaces/src/lib.rs index 23bd2d44..631c06de 100644 --- a/blastmud_interfaces/src/lib.rs +++ b/blastmud_interfaces/src/lib.rs @@ -4,15 +4,15 @@ use serde::*; #[derive(Serialize, Deserialize, Clone, Debug)] pub enum MessageFromListener { ListenerPing { uuid: Uuid }, - UserConnected { user: Uuid, source: String }, - UserDisconnected { user: Uuid }, - UserSentLine { user: Uuid, msg: String }, + SessionConnected { session: Uuid, source: String }, + SessionDisconnected { session: Uuid }, + SessionSentLine { session: Uuid, msg: String }, AcknowledgeMessage } #[derive(Serialize, Deserialize, Clone, Debug)] pub enum MessageToListener { - DisconnectUser { user: Uuid }, - SendToUser { user: Uuid, msg: String }, + DisconnectSession { session: Uuid }, + SendToSession { session: Uuid, msg: String }, AcknowledgeMessage } diff --git a/blastmud_listener/Cargo.toml b/blastmud_listener/Cargo.toml index fd6ee112..d622a851 100644 --- a/blastmud_listener/Cargo.toml +++ b/blastmud_listener/Cargo.toml @@ -13,6 +13,7 @@ rand = "0.8.5" serde = { version = "1.0.149", features = ["derive", "serde_derive"] } serde_yaml = "0.9.14" simple_logger = "4.0.0" -tokio = { version = "1.23.0", features = ["signal", "net", "macros", "rt-multi-thread", "rt", "tokio-macros", "time", "sync"] } +tokio = { version = "1.23.0", features = ["signal", "net", "macros", "rt-multi-thread", "rt", "tokio-macros", "time", "sync", "io-util"] } tokio-serde = { version = "0.8.0", features = ["cbor", "serde", "serde_cbor"] } tokio-util = { version = "0.7.4", features = ["codec"] } +uuid = { version = "1.2.2", features = ["rng", "serde", "v4"] } diff --git a/blastmud_listener/src/main.rs b/blastmud_listener/src/main.rs index b777b646..4d50f17d 100644 --- a/blastmud_listener/src/main.rs +++ b/blastmud_listener/src/main.rs @@ -1,20 +1,24 @@ use std::vec::Vec; +use std::collections::BTreeMap; use std::error::Error; +use std::net::SocketAddr; use std::fs; use serde::*; use tokio::task; use tokio::time::{self, Duration}; use tokio::net::{TcpStream, TcpListener}; use tokio::signal::unix::{signal, SignalKind}; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, Mutex}; +use tokio::io::{BufReader}; use log::{warn, info}; use simple_logger::SimpleLogger; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use blastmud_interfaces::*; use tokio_util::codec; use tokio_util::codec::length_delimited::LengthDelimitedCodec; use tokio_serde::formats::Cbor; use futures::prelude::*; +use uuid::Uuid; #[derive(Deserialize, Debug)] struct Config { @@ -33,12 +37,12 @@ enum ServerTaskCommand { Send { message: MessageFromListener } } -fn run_server_task( +fn run_server_task () + Send + 'static>( unfinished_business: Option, mut receiver: mpsc::Receiver, sender: mpsc::Sender, server: String, - message_handler: fn (message: MessageToListener) -> () + message_handler: FHandler ) { task::spawn(async move { let conn = loop { @@ -181,24 +185,92 @@ fn run_server_task( } -fn start_server_task(server: String) -> mpsc::Sender { +enum SessionCommand { + Disconnect +} + +struct SessionRecord { + channel: mpsc::Sender +} + +type SessionMap = Arc>>; + +fn handle_server_message(session_map: SessionMap, message: MessageToListener) { +} + +fn start_server_task(server: String, session_map: SessionMap) -> mpsc::Sender { let (sender, receiver) = mpsc::channel(20); - run_server_task(None, receiver, sender.clone(), server, |_msg| {}); + run_server_task(None, receiver, sender.clone(), server, + move |msg| { handle_server_message(session_map.clone(), msg); }); sender } +async fn handle_client_socket( + server: mpsc::Sender, + active_sessions: SessionMap, + mut stream: TcpStream, + addr: SocketAddr +) { + let (rstream, mut wstream) = stream.split(); + let mut rbuf = codec::FramedRead::new( + BufReader::new(rstream), + codec::LinesCodec::new_with_max_length(512) + ); + let session = Uuid::new_v4(); + info!("Accepted session {} from {}", session, addr); + + let (sender, receiver) = mpsc::channel(20); + active_sessions.lock().await.insert(session, SessionRecord { channel: sender }); + server.send(ServerTaskCommand::Send { message: MessageFromListener::SessionConnected { + session, source: addr.to_string() + }}).await.unwrap(); + + loop { + match rbuf.try_next().await { + Err(e) => { + info!("Client connection {} got error {}", session, e); + break; + } + Ok(None) => { + info!("Client connection {} closed", session); + break; + } + Ok(Some(msg)) => { + server.send(ServerTaskCommand::Send { + message: MessageFromListener::SessionSentLine { session, msg } + }).await.unwrap(); + /* match wstream.write_all((msg + "\r\n").as_bytes()).await { + Err(e) => { + info!("Client connection {} got error {}", session, e); + } + Ok(()) => {} + } */ + } + } + } + + server.send(ServerTaskCommand::Send { message: MessageFromListener::SessionDisconnected { + session + }}).await.unwrap(); + active_sessions.lock().await.remove(&session); +} + #[tokio::main] async fn main() -> Result<(), Box> { SimpleLogger::new().init().unwrap(); let mut config = read_latest_config()?; - let server_sender = start_server_task(config.gameserver); - + let active_sessions: SessionMap = + Arc::new(Mutex::new(BTreeMap::new())); + let server_sender = start_server_task(config.gameserver, active_sessions.clone()); + let mut sighups = signal(SignalKind::hangup())?; loop { let mut listen_handles = Vec::new(); for listener in config.listeners.clone() { + let server_sender_for_listener = server_sender.clone(); + let active_sessions_for_listener = active_sessions.clone(); listen_handles.push(task::spawn(async move { match TcpListener::bind(&listener).await { Err(e) => { warn!("Error listening to {}: {}", &listener, e); } @@ -207,7 +279,16 @@ async fn main() -> Result<(), Box> { match listensock.accept().await { Err(e) => { warn!("Error accepting connection from {}: {}", &listener, e); } - _ => {} + Ok((stream, addr)) => { + let server_sender_for_client = server_sender_for_listener.clone(); + let active_sessions_for_client = active_sessions_for_listener.clone(); + task::spawn(async move { + handle_client_socket(server_sender_for_client, + active_sessions_for_client, + stream, + addr + ).await; + }); } } } }