Implement basics of listener

This commit is contained in:
Condorra 2022-12-13 22:44:55 +11:00
parent b4bc83ba02
commit 2881d7a274

View File

@ -9,7 +9,7 @@ use tokio::time::{self, Duration};
use tokio::net::{TcpStream, TcpListener}; use tokio::net::{TcpStream, TcpListener};
use tokio::signal::unix::{signal, SignalKind}; use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::{mpsc, Mutex}; use tokio::sync::{mpsc, Mutex};
use tokio::io::{BufReader}; use tokio::io::{BufReader, AsyncWriteExt};
use log::{warn, info}; use log::{warn, info};
use simple_logger::SimpleLogger; use simple_logger::SimpleLogger;
use std::sync::Arc; use std::sync::Arc;
@ -37,13 +37,17 @@ enum ServerTaskCommand {
Send { message: MessageFromListener } Send { message: MessageFromListener }
} }
fn run_server_task<FHandler : Fn(MessageToListener) -> () + Send + 'static>( fn run_server_task<FHandler, HandlerFut>(
unfinished_business: Option<MessageFromListener>, unfinished_business: Option<MessageFromListener>,
mut receiver: mpsc::Receiver<ServerTaskCommand>, mut receiver: mpsc::Receiver<ServerTaskCommand>,
sender: mpsc::Sender<ServerTaskCommand>, sender: mpsc::Sender<ServerTaskCommand>,
server: String, server: String,
message_handler: FHandler message_handler: FHandler
) { )
where
FHandler: Fn(MessageToListener) -> HandlerFut + Send + 'static,
HandlerFut: Future<Output = ()>
{
task::spawn(async move { task::spawn(async move {
let conn = loop { let conn = loop {
match TcpStream::connect(&server).await { match TcpStream::connect(&server).await {
@ -186,25 +190,60 @@ fn run_server_task<FHandler : Fn(MessageToListener) -> () + Send + 'static>(
} }
enum SessionCommand { enum SessionCommand {
Disconnect Disconnect,
SendString { message : String }
} }
struct SessionRecord { struct SessionRecord {
channel: mpsc::Sender<SessionCommand> channel: mpsc::Sender<SessionCommand>,
disconnect_channel: mpsc::UnboundedSender<()>
} }
type SessionMap = Arc<Mutex<BTreeMap<Uuid, SessionRecord>>>; type SessionMap = Arc<Mutex<BTreeMap<Uuid, SessionRecord>>>;
fn handle_server_message(session_map: SessionMap, message: MessageToListener) { async fn handle_server_message(session_map: SessionMap, message: MessageToListener) {
match message {
MessageToListener::AcknowledgeMessage => {
warn!("Unexpected AcknowledgeMessage from gameserver. This suggests a bug in the gameserver");
}
MessageToListener::DisconnectSession { session } => {
match session_map.lock().await.get(&session) {
// Just silently ignore it if they are disconnected.
None => {}
Some(SessionRecord { channel, disconnect_channel, .. }) => {
match channel.try_send(SessionCommand::Disconnect) {
Err(mpsc::error::TrySendError::Full(_)) => {
disconnect_channel.send(()).unwrap_or(());
}
_ => {}
}
}
}
}
MessageToListener::SendToSession { session, msg } => {
match session_map.lock().await.get(&session) {
// Just silently ignore it if they are disconnected.
None => {}
Some(SessionRecord { channel, .. }) => {
channel.try_send(SessionCommand::SendString { message: msg })
.unwrap_or(());
}
}
}
}
} }
fn start_server_task(server: String, session_map: SessionMap) -> mpsc::Sender<ServerTaskCommand> { fn start_server_task(server: String, session_map: SessionMap) -> mpsc::Sender<ServerTaskCommand> {
let (sender, receiver) = mpsc::channel(20); let (sender, receiver) = mpsc::channel(20);
run_server_task(None, receiver, sender.clone(), server, run_server_task(None, receiver, sender.clone(), server,
move |msg| { handle_server_message(session_map.clone(), msg); }); move |msg| handle_server_message(session_map.clone(),
msg) );
sender sender
} }
const MAX_CAPACITY: usize = 20;
const STOP_READING_CAPACITY: usize = 10;
async fn handle_client_socket( async fn handle_client_socket(
server: mpsc::Sender<ServerTaskCommand>, server: mpsc::Sender<ServerTaskCommand>,
active_sessions: SessionMap, active_sessions: SessionMap,
@ -219,34 +258,58 @@ async fn handle_client_socket(
let session = Uuid::new_v4(); let session = Uuid::new_v4();
info!("Accepted session {} from {}", session, addr); info!("Accepted session {} from {}", session, addr);
let (sender, receiver) = mpsc::channel(20);
active_sessions.lock().await.insert(session, SessionRecord { channel: sender }); let (lsender, mut lreceiver) = mpsc::channel(MAX_CAPACITY);
let (discon_sender, mut discon_receiver) = mpsc::unbounded_channel();
active_sessions.lock().await.insert(
session, SessionRecord {
channel: lsender.clone(),
disconnect_channel: discon_sender.clone()
});
server.send(ServerTaskCommand::Send { message: MessageFromListener::SessionConnected { server.send(ServerTaskCommand::Send { message: MessageFromListener::SessionConnected {
session, source: addr.to_string() session, source: addr.to_string()
}}).await.unwrap(); }}).await.unwrap();
loop { 'client_loop: loop {
match rbuf.try_next().await { tokio::select!(
Err(e) => { Some(()) = discon_receiver.recv() => {
info!("Client connection {} got error {}", session, e); info!("Client connection {} instructed for immediate disconnect", session);
break; break 'client_loop;
} }
Ok(None) => { Some(message) = lreceiver.recv() => {
info!("Client connection {} closed", session); match message {
break; SessionCommand::Disconnect => {
} info!("Client connection {} instructed for disconnect", session);
Ok(Some(msg)) => { break 'client_loop;
server.send(ServerTaskCommand::Send { }
message: MessageFromListener::SessionSentLine { session, msg } SessionCommand::SendString { message } =>
}).await.unwrap(); match wstream.write_all((message + "\r\n").as_bytes()).await {
/* match wstream.write_all((msg + "\r\n").as_bytes()).await { Err(e) => {
info!("Client connection {} got error {}", session, e);
}
Ok(()) => {}
}
}
},
line_read = rbuf.try_next(), if lsender.capacity() > STOP_READING_CAPACITY => {
match line_read {
Err(e) => { Err(e) => {
info!("Client connection {} got error {}", session, e); info!("Client connection {} got error {}", session, e);
break 'client_loop;
} }
Ok(()) => {} Ok(None) => {
} */ info!("Client connection {} closed", session);
break 'client_loop;
}
Ok(Some(msg)) => {
server.send(ServerTaskCommand::Send {
message: MessageFromListener::SessionSentLine { session, msg }
}).await.unwrap();
}
}
} }
} );
} }
server.send(ServerTaskCommand::Send { message: MessageFromListener::SessionDisconnected { server.send(ServerTaskCommand::Send { message: MessageFromListener::SessionDisconnected {
@ -312,3 +375,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
} }
} }
} }
#[cfg(test)]
mod tests {
#[test]
fn doesnt_stop_reading_at_max_capacity() {
use crate::*;
assert!(MAX_CAPACITY > STOP_READING_CAPACITY);
}
}