Slow down when urges are high.

Also fix a few issues with concurrency errors from the DB.
This commit is contained in:
Condorra 2023-09-14 22:52:24 +10:00
parent daa847b448
commit 2beaf0d2af
12 changed files with 245 additions and 125 deletions

View File

@ -21,6 +21,7 @@ use std::collections::BTreeSet;
use std::error::Error;
use std::str::FromStr;
use std::sync::Arc;
use tokio_postgres::error::{DbError, SqlState};
use tokio_postgres::types::ToSql;
use tokio_postgres::NoTls;
use tokio_postgres::{config::Config as PgConfig, row::Row};
@ -1810,3 +1811,18 @@ impl DBTrans {
.ok_or("Transaction already closed".into())
}
}
pub fn is_concurrency_error(e: &(dyn Error + 'static)) -> bool {
match e.source() {
None => {}
Some(e_src) => {
if is_concurrency_error(e_src) {
return true;
}
}
}
match e.downcast_ref::<DbError>() {
None => false,
Some(dbe) => dbe.code() == &SqlState::T_R_SERIALIZATION_FAILURE,
}
}

View File

@ -1,40 +1,41 @@
use tokio::{task, time};
use tokio::net::{TcpSocket, TcpStream, lookup_host};
use log::{info, warn};
use tokio_util::codec;
use tokio_util::codec::length_delimited::LengthDelimitedCodec;
use tokio_serde::formats::Cbor;
use crate::DResult;
use blastmud_interfaces::*;
use futures::prelude::*;
use tokio::sync::{Mutex, mpsc, oneshot};
use log::{info, warn};
use std::collections::BTreeMap;
use std::net::SocketAddr;
use std::sync::Arc;
use uuid::Uuid;
use std::collections::BTreeMap;
use crate::DResult;
use std::time::Instant;
use tokio::net::{lookup_host, TcpSocket, TcpStream};
use tokio::sync::{mpsc, oneshot, Mutex};
use tokio::{task, time};
use tokio_serde::formats::Cbor;
use tokio_util::codec;
use tokio_util::codec::length_delimited::LengthDelimitedCodec;
use uuid::Uuid;
#[derive(Debug)]
pub struct ListenerSend {
pub message: MessageToListener,
pub ack_notify: oneshot::Sender<()>
pub ack_notify: oneshot::Sender<()>,
}
pub type ListenerMap = Arc<Mutex<BTreeMap<Uuid, mpsc::Sender<ListenerSend>>>>;
async fn handle_from_listener<FHandler, HandlerFut>(
conn: TcpStream,
message_handler: FHandler,
listener_map: ListenerMap)
where
listener_map: ListenerMap,
) where
FHandler: Fn(Uuid, MessageFromListener) -> HandlerFut + Send + 'static,
HandlerFut: Future<Output = DResult<()>> + Send + 'static {
HandlerFut: Future<Output = DResult<()>> + Send + 'static,
{
let mut conn_framed = tokio_serde::Framed::new(
codec::Framed::new(conn, LengthDelimitedCodec::new()),
Cbor::<MessageFromListener, MessageToListener>::default()
Cbor::<MessageFromListener, MessageToListener>::default(),
);
let listener_id = match conn_framed.try_next().await {
Ok(Some(ref msg@MessageFromListener::ListenerPing { uuid })) => {
Ok(Some(ref msg @ MessageFromListener::ListenerPing { uuid })) => {
let handle_fut = message_handler(uuid.clone(), msg.clone());
match handle_fut.await {
Ok(_) => {}
@ -43,7 +44,7 @@ where
}
};
uuid
},
}
Ok(Some(msg)) => {
warn!("Got non-ping first message from listener: {:?}", msg);
return;
@ -53,15 +54,24 @@ where
return;
}
Err(e) => {
warn!("Lost listener connection to error {} before first message", e);
warn!(
"Lost listener connection to error {} before first message",
e
);
return;
}
};
match conn_framed.send(MessageToListener::AcknowledgeMessage).await {
match conn_framed
.send(MessageToListener::AcknowledgeMessage)
.await
{
Ok(_) => {}
Err(e) => {
warn!("Got error sending listener acknowledge for initial ping: {}", e);
warn!(
"Got error sending listener acknowledge for initial ping: {}",
e
);
return;
}
}
@ -69,7 +79,7 @@ where
let connected_at = Instant::now();
let (sender, mut receiver) = mpsc::channel(1);
listener_map.lock().await.insert(listener_id, sender);
'listener_loop: loop {
tokio::select!(
req = conn_framed.try_next() => {
@ -178,7 +188,7 @@ where
}
// We delay to avoid wasting resources if we do end up in a loop.
time::sleep(time::Duration::from_secs(1)).await;
time::sleep(time::Duration::from_secs(1)).await;
listener_map.lock().await.remove(&listener_id);
}
@ -189,23 +199,26 @@ pub fn make_listener_map() -> ListenerMap {
pub async fn start_listener<FHandler, HandlerFut>(
bind_to: String,
listener_map: ListenerMap,
handle_message: FHandler
handle_message: FHandler,
) -> DResult<()>
where
FHandler: Fn(Uuid, MessageFromListener) -> HandlerFut + Send + Clone + 'static,
HandlerFut: Future<Output = DResult<()>> + Send + 'static
HandlerFut: Future<Output = DResult<()>> + Send + 'static,
{
info!("Starting listener on {}", bind_to);
let addr = lookup_host(bind_to).await?.next().expect("listener address didn't resolve");
let addr = lookup_host(bind_to)
.await?
.next()
.expect("listener address didn't resolve");
let socket = match addr {
SocketAddr::V4 {..} => TcpSocket::new_v4()?,
SocketAddr::V6 {..} => TcpSocket::new_v6()?
SocketAddr::V4 { .. } => TcpSocket::new_v4()?,
SocketAddr::V6 { .. } => TcpSocket::new_v6()?,
};
socket.set_reuseaddr(true)?;
socket.set_reuseport(true)?;
socket.bind(addr)?;
let listener = socket.listen(5)?;
let listener_map_for_task = listener_map.clone();
task::spawn(async move {
loop {
@ -215,11 +228,15 @@ where
}
Ok((socket, _)) => {
info!("Accepted new inbound connection from listener");
task::spawn(handle_from_listener(socket, handle_message.clone(), listener_map_for_task.clone()));
task::spawn(handle_from_listener(
socket,
handle_message.clone(),
listener_map_for_task.clone(),
));
}
}
}
});
Ok(())
}

View File

@ -1,39 +1,42 @@
use serde::Deserialize;
use std::fs;
use std::error::Error;
use log::{info, error, LevelFilter};
use simple_logger::SimpleLogger;
use tokio::signal::unix::{signal, SignalKind};
use db::DBPool;
use log::{error, info, LevelFilter};
use serde::Deserialize;
use simple_logger::SimpleLogger;
use std::error::Error;
use std::fs;
use tokio::signal::unix::{signal, SignalKind};
mod av;
mod db;
mod language;
mod listener;
mod message_handler;
mod version_cutover;
mod av;
mod regular_tasks;
mod models;
mod static_content;
mod language;
mod regular_tasks;
mod services;
mod static_content;
mod version_cutover;
pub type DResult<T> = Result<T, Box<dyn Error + Send + Sync>>;
pub type DResult<T> = Result<T, Box<dyn Error + Send + Sync + 'static>>;
#[derive(Deserialize, Debug)]
struct Config {
listener: String,
pidfile: String,
database_conn_string: String
database_conn_string: String,
}
fn read_latest_config() -> DResult<Config> {
serde_yaml::from_str(&fs::read_to_string("gameserver.conf")?).
map_err(|error| Box::new(error) as Box<dyn Error + Send + Sync>)
serde_yaml::from_str(&fs::read_to_string("gameserver.conf")?)
.map_err(|error| Box::new(error) as Box<dyn Error + Send + Sync>)
}
#[tokio::main(worker_threads = 2)]
async fn main() -> DResult<()> {
SimpleLogger::new().with_level(LevelFilter::Info).init().unwrap();
SimpleLogger::new()
.with_level(LevelFilter::Info)
.init()
.unwrap();
av::check().or_else(|e| -> Result<(), Box<dyn Error + Send + Sync>> {
error!("Couldn't verify age-verification.yml - this is not a complete game. Check README.md: {}", e);
@ -50,19 +53,20 @@ async fn main() -> DResult<()> {
let listener_map = listener::make_listener_map();
let mh_pool = pool.clone();
listener::start_listener(config.listener, listener_map.clone(),
move |listener_id, msg| {
message_handler::handle(listener_id, msg, mh_pool.clone())
}
).await?;
listener::start_listener(
config.listener,
listener_map.clone(),
move |listener_id, msg| message_handler::handle(listener_id, msg, mh_pool.clone()),
)
.await?;
static_content::refresh_static_content(&pool).await?;
version_cutover::replace_old_gameserver(&config.pidfile)?;
regular_tasks::start_regular_tasks(&pool, listener_map)?;
let mut sigusr1 = signal(SignalKind::user_defined1())?;
sigusr1.recv().await;
Ok(())
}

View File

@ -1,16 +1,16 @@
use blastmud_interfaces::*;
use crate::db;
use MessageFromListener::*;
use uuid::Uuid;
use crate::DResult;
use blastmud_interfaces::*;
use uuid::Uuid;
use MessageFromListener::*;
mod new_session;
pub mod user_commands;
#[derive(Clone,Debug)]
#[derive(Clone, Debug)]
pub struct ListenerSession {
pub listener: Uuid,
pub session: Uuid
pub session: Uuid,
}
#[cfg(test)]
@ -19,21 +19,22 @@ impl Default for ListenerSession {
use uuid::uuid;
ListenerSession {
listener: uuid!("6f9c9b61-9228-4427-abd7-c4aef127a862"),
session: uuid!("668efb68-79d3-4004-9d6a-1e5757792e1a")
session: uuid!("668efb68-79d3-4004-9d6a-1e5757792e1a"),
}
}
}
pub async fn handle(listener: Uuid, msg: MessageFromListener, pool: db::DBPool)
-> DResult<()> {
pub async fn handle(listener: Uuid, msg: MessageFromListener, pool: db::DBPool) -> DResult<()> {
match msg {
ListenerPing { .. } => { pool.record_listener_ping(listener).await?; }
ListenerPing { .. } => {
pool.record_listener_ping(listener).await?;
}
SessionConnected { session, source } => {
new_session::handle(
&ListenerSession { listener, session }, source, &pool).await?;
new_session::handle(&ListenerSession { listener, session }, source, &pool).await?;
}
SessionDisconnected { session } => {
pool.end_session(ListenerSession { listener, session }).await?;
pool.end_session(ListenerSession { listener, session })
.await?;
}
SessionSentLine { session, msg } => {
user_commands::handle(&ListenerSession { listener, session }, &msg, &pool).await?;

View File

@ -1,7 +1,7 @@
use super::ListenerSession;
#[double]
use crate::db::DBTrans;
use crate::db::{DBPool, ItemSearchParams};
use crate::db::{is_concurrency_error, DBPool, ItemSearchParams};
use crate::models::user::UserFlag;
use crate::models::{item::Item, session::Session, user::User};
use crate::DResult;
@ -272,10 +272,13 @@ fn resolve_handler(ctx: &VerbContext, cmd: &str) -> Option<&'static UserVerbRef>
result
}
#[cfg(not(test))]
pub async fn handle(session: &ListenerSession, msg: &str, pool: &DBPool) -> DResult<()> {
pub async fn handle_in_trans(
session: &ListenerSession,
msg: &str,
pool: &DBPool,
trans: DBTrans,
) -> DResult<()> {
let (cmd, params) = parsing::parse_command_name(msg);
let trans = pool.start_transaction().await?;
let (mut session_dat, mut user_dat) = match trans.get_session_user_model(session).await? {
None => {
// If the session has been cleaned up from the database, there is
@ -320,6 +323,24 @@ pub async fn handle(session: &ListenerSession, msg: &str, pool: &DBPool) -> DRes
Err(SystemError(e)) => Err(e)?,
},
}
Ok(())
}
#[cfg(not(test))]
pub async fn handle(session: &ListenerSession, msg: &str, pool: &DBPool) -> DResult<()> {
loop {
let trans = pool.start_transaction().await?;
match handle_in_trans(session, msg, pool, trans).await {
Ok(_) => break,
Err(e) => {
if is_concurrency_error(e.as_ref()) {
continue;
} else {
return Err(e);
}
}
}
}
pool.bump_session_time(&session).await?;
Ok(())
}

View File

@ -691,7 +691,65 @@ impl QueueCommandHandler for QueueHandler {
move_to_where(&use_location, direction, ctx).await?;
stand_if_needed(&ctx.trans, &mut ctx.item).await?;
propagate_move_to_followers(&ctx.trans, &mut ctx.item, &direction, &source).await?;
Ok(time::Duration::from_secs(1))
let mut move_factor: u64 = 1;
let mut slow_factors: Vec<String> = vec![];
if let Some(urges) = ctx.item.urges.as_ref() {
if urges.hunger.value > 9500 {
slow_factors.push("you're starving".to_owned());
move_factor *= 8;
} else if urges.hunger.value > 8000 {
slow_factors.push("you're very hungry".to_owned());
move_factor *= 4;
} else if urges.hunger.value > 5000 {
slow_factors.push("you're hungry".to_owned());
move_factor *= 2;
}
if urges.thirst.value > 9500 {
slow_factors.push("your throat is parched with thirst".to_owned());
move_factor *= 8;
} else if urges.thirst.value > 8000 {
slow_factors.push("you're very thirsty".to_owned());
move_factor *= 4;
} else if urges.thirst.value > 5000 {
slow_factors.push("you're thirsty".to_owned());
move_factor *= 2;
}
if urges.stress.value > 9500 {
slow_factors.push("you're exhausted".to_owned());
move_factor *= 8;
} else if urges.stress.value > 8000 {
slow_factors.push("you're very stressed and tired".to_owned());
move_factor *= 4;
} else if urges.stress.value > 5000 {
slow_factors.push("you're stressed and tired".to_owned());
move_factor *= 2;
}
}
if slow_factors.len() > 0 {
if let Some((sess, _)) = ctx.get_session().await? {
ctx.trans
.queue_for_session(
&sess,
Some(&format!(
"You move slowly because {}.\n",
language::join_words(
&slow_factors
.iter()
.map(|f| f.as_str())
.collect::<Vec<&str>>()
)
)),
)
.await?
}
}
if move_factor > 16 {
move_factor = 16;
}
Ok(time::Duration::from_secs(move_factor))
}
#[allow(unreachable_patterns)]

View File

@ -1,7 +1,7 @@
pub mod session;
pub mod user;
pub mod item;
pub mod task;
pub mod consent;
pub mod corp;
pub mod item;
pub mod journal;
pub mod session;
pub mod task;
pub mod user;

View File

@ -1,6 +1,6 @@
use serde::{Serialize, Deserialize};
use chrono::{DateTime, Utc};
use itertools::Itertools;
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
pub enum ConsentType {
@ -8,7 +8,7 @@ pub enum ConsentType {
Medicine,
Gifts,
Visit,
Sex
Sex,
}
impl ConsentType {
@ -20,7 +20,7 @@ impl ConsentType {
"gifts" => Some(Gifts),
"visit" => Some(Visit),
"sex" => Some(Sex),
_ => None
_ => None,
}
}
@ -38,8 +38,8 @@ impl ConsentType {
#[derive(Serialize, Deserialize, PartialEq, Clone, Debug)]
pub enum ConsentStatus {
PendingAdd, // Added but awaiting other party to ratify by giving matching consent.
Active, // Consent in force, no delete pending.
PendingAdd, // Added but awaiting other party to ratify by giving matching consent.
Active, // Consent in force, no delete pending.
PendingDelete, // Pending cancellation but other party has to also disallow to ratify.
}
@ -57,7 +57,7 @@ impl Default for FightConsent {
status: ConsentStatus::PendingAdd,
pending_change: None,
allow_pick: false,
freely_revoke: false
freely_revoke: false,
}
}
}
@ -76,31 +76,30 @@ impl Default for Consent {
Self {
fight_consent: None,
expires: None,
only_in: vec!(),
only_in: vec![],
allow_private: false,
until_death: false
until_death: false,
}
}
}
impl Consent {
pub fn to_str(&self) -> String {
let mut details = vec!();
let mut details = vec![];
if let Some(ref fc) = self.fight_consent {
match fc.status {
ConsentStatus::PendingAdd => {
details.push("pending acceptance".to_owned());
},
}
ConsentStatus::PendingDelete => {
details.push("pending agreement to delete".to_owned());
},
}
_ => {}
}
match fc.pending_change.as_ref() {
None => {},
None => {}
Some(new_self) => {
details.push(format!("pending amendment to [{}]",
&new_self.to_str()));
details.push(format!("pending amendment to [{}]", &new_self.to_str()));
}
}
if fc.allow_pick {
@ -122,10 +121,12 @@ impl Consent {
details.push(format!("in {}", in_place))
}
if let Some(exp) = self.expires {
details.push(format!("valid for {}",
humantime::format_duration(std::time::Duration::from_secs(
(exp - Utc::now()).num_seconds() as u64
))));
details.push(format!(
"valid for {}",
humantime::format_duration(std::time::Duration::from_secs(
(exp - Utc::now()).num_seconds() as u64
))
));
}
return details.into_iter().join(", ");
}

View File

@ -1,5 +1,5 @@
use serde::{Serialize, Deserialize};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, PartialEq, Eq, Ord, PartialOrd, Clone)]
pub enum CorpPermission {
@ -22,7 +22,7 @@ impl CorpPermission {
"war" => Some(War),
"config" | "configure" => Some(Configure),
"finance" | "finances" => Some(Finance),
_ => None
_ => None,
}
}
pub fn display(&self) -> &'static str {
@ -39,7 +39,6 @@ impl CorpPermission {
}
}
#[derive(Serialize, Deserialize, PartialEq, Eq, Ord, PartialOrd, Clone)]
pub enum CorpCommType {
Chat,
@ -60,7 +59,7 @@ impl CorpCommType {
"reward" => Some(Reward),
"death" => Some(Death),
"consent" => Some(Consent),
_ => None
_ => None,
}
}
pub fn display(&self) -> &'static str {
@ -95,7 +94,7 @@ impl Default for Corp {
Self {
name: "Unset".to_owned(),
allow_combat_required: false,
member_permissions: vec!(),
member_permissions: vec![],
founded: Utc::now(),
}
}
@ -119,18 +118,11 @@ impl Default for CorpMembership {
CorpMembership {
invited_at: None,
joined_at: None,
permissions: vec!(),
permissions: vec![],
allow_combat: false,
job_title: "Employee".to_owned(),
priority: 100,
comms_on: vec!(
Chat,
Notice,
Connect,
Reward,
Death,
Consent,
),
comms_on: vec![Chat, Notice, Connect, Reward, Death, Consent],
}
}
}

View File

@ -1,15 +1,14 @@
use serde::{Deserialize, Serialize};
use std::collections::BTreeSet;
use serde::{Serialize, Deserialize};
#[derive(Serialize, Deserialize, Clone, Debug, Ord, PartialOrd, Eq, PartialEq)]
pub enum JournalType {
SlayedMeanDog,
Died
Died,
}
#[derive(Serialize, Deserialize, Clone, Debug, Ord, PartialOrd, Eq, PartialEq)]
pub enum JournalInProgress {
}
pub enum JournalInProgress {}
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(default)]
@ -22,7 +21,7 @@ impl Default for JournalState {
fn default() -> Self {
Self {
completed_journals: BTreeSet::new(),
in_progress_journals: vec!(),
in_progress_journals: vec![],
}
}
}

View File

@ -3,7 +3,7 @@ use crate::db::DBTrans;
#[cfg(not(test))]
use crate::models::task::{TaskParse, TaskRecurrence};
use crate::{
db,
db::{self, is_concurrency_error},
listener::{ListenerMap, ListenerSend},
message_handler::user_commands::{delete, drop, hire, open, rent},
models::task::Task,
@ -168,6 +168,9 @@ async fn process_tasks_once(pool: db::DBPool) -> DResult<()> {
};
match handler.do_task(&mut ctx).await {
Err(e) => {
if is_concurrency_error(e.as_ref()) {
continue;
}
task.meta.consecutive_failure_count += 1;
warn!("Error handling event of type {} code {} (consecutive count: {}): {:?}",
&task.details.name(), &task.meta.task_code,
@ -265,7 +268,7 @@ async fn process_tasks_once(pool: db::DBPool) -> DResult<()> {
#[cfg(test)]
async fn process_tasks_once(_pool: db::DBPool) -> DResult<()> {
task_handler_registry();
unimplemented!();
Ok(())
}
fn start_task_runner(pool: db::DBPool) {

View File

@ -1,28 +1,35 @@
use crate::DResult;
use log::info;
use nix::{
sys::signal::{kill, Signal},
unistd::{getpid, Pid},
};
use std::error::Error;
use std::fs::{read_to_string, write};
use std::path::Path;
use std::error::Error;
use log::info;
use nix::{sys::signal::{kill, Signal}, unistd::{Pid, getpid}};
use crate::DResult;
pub fn replace_old_gameserver(pidfile: &str) -> DResult<()> {
match read_to_string(pidfile) {
Err(e) =>
Err(e) => {
if e.kind() == std::io::ErrorKind::NotFound {
info!("pidfile not found, assuming not already running");
Ok(())
} else {
} else {
info!("Error reading pidfile (other than NotFound): {}", e);
Err(Box::new(e) as Box::<dyn Error + Send + Sync>)
Err(Box::new(e) as Box<dyn Error + Send + Sync>)
}
}
Ok(f) => {
let pid: Pid = Pid::from_raw(f.parse().map_err(|e| Box::new(e) as Box::<dyn Error + Send + Sync>)?);
let pid: Pid = Pid::from_raw(
f.parse()
.map_err(|e| Box::new(e) as Box<dyn Error + Send + Sync>)?,
);
if pid == getpid() {
info!("Pid in pidfile is me - ignoring");
return Ok(());
}
}
match read_to_string(format!("/proc/{}/cmdline", pid)) {
Ok(content) =>
Ok(content) => {
if content.contains("blastmud_game") {
info!("pid in pidfile references blastmud_game; starting cutover");
kill(pid, Signal::SIGUSR1)
@ -31,6 +38,7 @@ pub fn replace_old_gameserver(pidfile: &str) -> DResult<()> {
info!("Pid in pidfile is for process not including blastmud_game - ignoring pidfile");
Ok(())
}
}
Err(_) => {
info!("Pid in pidfile is gone - ignoring pidfile");
Ok(())
@ -40,5 +48,5 @@ pub fn replace_old_gameserver(pidfile: &str) -> DResult<()> {
}?;
info!("Writing new pidfile");
write(Path::new(pidfile), format!("{}", std::process::id()))
.map_err(|e| Box::new(e) as Box::<dyn Error + Send + Sync>)
.map_err(|e| Box::new(e) as Box<dyn Error + Send + Sync>)
}