More progress on listener

This commit is contained in:
Condorra 2022-12-12 23:36:07 +11:00
parent 1348525283
commit b4bc83ba02
4 changed files with 101 additions and 15 deletions

4
Cargo.lock generated
View File

@ -48,6 +48,7 @@ dependencies = [
"tokio",
"tokio-serde",
"tokio-util",
"uuid",
]
[[package]]
@ -568,7 +569,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eab6d665857cc6ca78d6e80303a02cea7a7851e85dfbd77cbdc09bd129f1ef46"
dependencies = [
"autocfg",
"bytes",
"libc",
"memchr",
"mio",
"num_cpus",
"pin-project-lite",
@ -656,6 +659,7 @@ version = "1.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "422ee0de9031b5b948b97a8fc04e3aa35230001a722ddd27943e0be31564ce4c"
dependencies = [
"getrandom",
"serde",
]

View File

@ -4,15 +4,15 @@ use serde::*;
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum MessageFromListener {
ListenerPing { uuid: Uuid },
UserConnected { user: Uuid, source: String },
UserDisconnected { user: Uuid },
UserSentLine { user: Uuid, msg: String },
SessionConnected { session: Uuid, source: String },
SessionDisconnected { session: Uuid },
SessionSentLine { session: Uuid, msg: String },
AcknowledgeMessage
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum MessageToListener {
DisconnectUser { user: Uuid },
SendToUser { user: Uuid, msg: String },
DisconnectSession { session: Uuid },
SendToSession { session: Uuid, msg: String },
AcknowledgeMessage
}

View File

@ -13,6 +13,7 @@ rand = "0.8.5"
serde = { version = "1.0.149", features = ["derive", "serde_derive"] }
serde_yaml = "0.9.14"
simple_logger = "4.0.0"
tokio = { version = "1.23.0", features = ["signal", "net", "macros", "rt-multi-thread", "rt", "tokio-macros", "time", "sync"] }
tokio = { version = "1.23.0", features = ["signal", "net", "macros", "rt-multi-thread", "rt", "tokio-macros", "time", "sync", "io-util"] }
tokio-serde = { version = "0.8.0", features = ["cbor", "serde", "serde_cbor"] }
tokio-util = { version = "0.7.4", features = ["codec"] }
uuid = { version = "1.2.2", features = ["rng", "serde", "v4"] }

View File

@ -1,20 +1,24 @@
use std::vec::Vec;
use std::collections::BTreeMap;
use std::error::Error;
use std::net::SocketAddr;
use std::fs;
use serde::*;
use tokio::task;
use tokio::time::{self, Duration};
use tokio::net::{TcpStream, TcpListener};
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::mpsc;
use tokio::sync::{mpsc, Mutex};
use tokio::io::{BufReader};
use log::{warn, info};
use simple_logger::SimpleLogger;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use blastmud_interfaces::*;
use tokio_util::codec;
use tokio_util::codec::length_delimited::LengthDelimitedCodec;
use tokio_serde::formats::Cbor;
use futures::prelude::*;
use uuid::Uuid;
#[derive(Deserialize, Debug)]
struct Config {
@ -33,12 +37,12 @@ enum ServerTaskCommand {
Send { message: MessageFromListener }
}
fn run_server_task(
fn run_server_task<FHandler : Fn(MessageToListener) -> () + Send + 'static>(
unfinished_business: Option<MessageFromListener>,
mut receiver: mpsc::Receiver<ServerTaskCommand>,
sender: mpsc::Sender<ServerTaskCommand>,
server: String,
message_handler: fn (message: MessageToListener) -> ()
message_handler: FHandler
) {
task::spawn(async move {
let conn = loop {
@ -181,24 +185,92 @@ fn run_server_task(
}
fn start_server_task(server: String) -> mpsc::Sender<ServerTaskCommand> {
enum SessionCommand {
Disconnect
}
struct SessionRecord {
channel: mpsc::Sender<SessionCommand>
}
type SessionMap = Arc<Mutex<BTreeMap<Uuid, SessionRecord>>>;
fn handle_server_message(session_map: SessionMap, message: MessageToListener) {
}
fn start_server_task(server: String, session_map: SessionMap) -> mpsc::Sender<ServerTaskCommand> {
let (sender, receiver) = mpsc::channel(20);
run_server_task(None, receiver, sender.clone(), server, |_msg| {});
run_server_task(None, receiver, sender.clone(), server,
move |msg| { handle_server_message(session_map.clone(), msg); });
sender
}
async fn handle_client_socket(
server: mpsc::Sender<ServerTaskCommand>,
active_sessions: SessionMap,
mut stream: TcpStream,
addr: SocketAddr
) {
let (rstream, mut wstream) = stream.split();
let mut rbuf = codec::FramedRead::new(
BufReader::new(rstream),
codec::LinesCodec::new_with_max_length(512)
);
let session = Uuid::new_v4();
info!("Accepted session {} from {}", session, addr);
let (sender, receiver) = mpsc::channel(20);
active_sessions.lock().await.insert(session, SessionRecord { channel: sender });
server.send(ServerTaskCommand::Send { message: MessageFromListener::SessionConnected {
session, source: addr.to_string()
}}).await.unwrap();
loop {
match rbuf.try_next().await {
Err(e) => {
info!("Client connection {} got error {}", session, e);
break;
}
Ok(None) => {
info!("Client connection {} closed", session);
break;
}
Ok(Some(msg)) => {
server.send(ServerTaskCommand::Send {
message: MessageFromListener::SessionSentLine { session, msg }
}).await.unwrap();
/* match wstream.write_all((msg + "\r\n").as_bytes()).await {
Err(e) => {
info!("Client connection {} got error {}", session, e);
}
Ok(()) => {}
} */
}
}
}
server.send(ServerTaskCommand::Send { message: MessageFromListener::SessionDisconnected {
session
}}).await.unwrap();
active_sessions.lock().await.remove(&session);
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
SimpleLogger::new().init().unwrap();
let mut config = read_latest_config()?;
let server_sender = start_server_task(config.gameserver);
let active_sessions: SessionMap =
Arc::new(Mutex::new(BTreeMap::new()));
let server_sender = start_server_task(config.gameserver, active_sessions.clone());
let mut sighups = signal(SignalKind::hangup())?;
loop {
let mut listen_handles = Vec::new();
for listener in config.listeners.clone() {
let server_sender_for_listener = server_sender.clone();
let active_sessions_for_listener = active_sessions.clone();
listen_handles.push(task::spawn(async move {
match TcpListener::bind(&listener).await {
Err(e) => { warn!("Error listening to {}: {}", &listener, e); }
@ -207,7 +279,16 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
match listensock.accept().await {
Err(e) => { warn!("Error accepting connection from {}: {}",
&listener, e); }
_ => {}
Ok((stream, addr)) => {
let server_sender_for_client = server_sender_for_listener.clone();
let active_sessions_for_client = active_sessions_for_listener.clone();
task::spawn(async move {
handle_client_socket(server_sender_for_client,
active_sessions_for_client,
stream,
addr
).await;
}); }
}
}
}