use std::{ collections::BTreeMap, mem::replace, ops::{Deref, DerefMut}, sync::Arc, }; use indexed_db_futures::{ idb_object_store::IdbObjectStoreParameters, request::{IdbOpenDbRequestLike, OpenDbRequest}, IdbDatabase, IdbKeyPath, IdbQuerySource, IdbVersionChangeEvent, }; use itertools::Itertools; use serde::{Deserialize, Serialize}; use wasm_bindgen::{JsCast, JsValue}; use wasm_bindgen_futures::{ js_sys::{Array, Uint8Array}, spawn_local, }; use web_sys::{ console, js_sys::Date, Blob, BlobPropertyBag, DomException, HtmlAnchorElement, IdbKeyRange, IdbTransactionMode, Url, }; use crate::{echo_to_term_frame, GlobalMemoCell, TermFrame}; #[derive(Clone, Debug)] pub enum QueuedAction { LogEvent(LogEvent), ReportOnLogStreams(TermFrame), DeleteLogs(DeleteLogs), DownloadLogs(DownloadLogs), } #[derive(Clone, Debug)] pub struct DeleteLogs { pub reply_to: TermFrame, pub stream: String, pub min_date: String, pub max_date: String, } #[derive(Clone, Debug)] pub struct DownloadLogs { pub reply_to: TermFrame, pub stream: String, pub min_date: String, pub max_date: String, } #[derive(Clone, Debug)] pub struct LogEvent { pub log_stream: String, pub timestamp: Date, pub message: String, } #[derive(Serialize, Deserialize)] pub struct StoredLogEvent { pub stream: String, pub timestamp: String, pub message: String, } impl From<&LogEvent> for StoredLogEvent { fn from(value: &LogEvent) -> Self { Self { stream: value.log_stream.clone(), timestamp: value .timestamp .to_iso_string() .as_string() .expect("timestamp to_iso_string wasn't string"), message: value.message.clone(), } } } pub enum LoggingEngine { Uninitialised, Initialising { backlog: Vec }, Unavailable, Ready { db: Arc }, } impl Default for LoggingEngine { fn default() -> Self { Self::Uninitialised } } async fn async_init_logging() -> Result { let mut db_req: OpenDbRequest = IdbDatabase::open_u32("logging", 1)?; db_req.set_on_upgrade_needed(Some(|evt: &IdbVersionChangeEvent| -> Result<(), JsValue> { if evt .db() .object_store_names() .find(|n| n == "logs") .is_none() { let store = evt.db().create_object_store_with_params( "logs", ::default().auto_increment(true), )?; store.create_index( "by_stream_timestamp", &IdbKeyPath::str_sequence(&["stream", "timestamp"]), )?; } Ok(()) })); let db: IdbDatabase = db_req.await?; Ok(LoggingEngine::Ready { db: db.into() }) } fn logging_broken_message(global: &GlobalMemoCell, frame: &TermFrame) { let _ = echo_to_term_frame(global, frame, "Couldn't enable logging. This sometimes happens if your browser doesn't enable indexed storage, or is in a mode (e.g. private browsing) where such storage is not permitted.\r\n"); } fn init_logging(global: &GlobalMemoCell) { let global: GlobalMemoCell = global.clone(); spawn_local(async move { match async_init_logging().await { Ok(new_engine) => { let old_engine = replace(global.log_engine.borrow_mut().deref_mut(), new_engine); if let LoggingEngine::Initialising { backlog } = old_engine { for action in backlog.into_iter() { match action { QueuedAction::LogEvent(ev) => queue_immediate_log(&global, ev), QueuedAction::ReportOnLogStreams(frame) => { immediate_report_log_frame(&global, frame); } QueuedAction::DeleteLogs(act) => { immediate_delete_logs(&global, act); } QueuedAction::DownloadLogs(act) => { immediate_download_logs(&global, act); } } } } } Err(_) => { if let LoggingEngine::Initialising { .. } = global.log_engine.borrow().deref() { logging_broken_message(&global, &TermFrame(1)); } } } }) } fn queue_immediate_log_refutable(db: &IdbDatabase, event: &LogEvent) -> Result<(), String> { let trans = db .transaction_on_one_with_mode("logs", IdbTransactionMode::Readwrite) .map_err(|e| e.message())?; let store = trans.object_store("logs").map_err(|e| e.message())?; store .put_val( &serde_wasm_bindgen::to_value::(&event.into()) .map_err(|_| "Can't serialise event")?, ) .map_err(|e| e.message())?; Ok(()) } fn queue_immediate_log(global: &GlobalMemoCell, event: LogEvent) { if let LoggingEngine::Ready { db } = global.log_engine.borrow().deref() { match queue_immediate_log_refutable(db, &event) { Ok(()) => {} Err(e) => { console::log_2( &JsValue::from_str("Error writing event to IndexedDb"), &JsValue::from(e), ); } } } } async fn immediate_report_log_frame_refutable( global: GlobalMemoCell, db: &IdbDatabase, frame: TermFrame, ) -> Result<(), DomException> { let trans = db.transaction_on_one("logs")?; let store = trans.object_store("logs")?; let curs = store.open_cursor()?.await?; let curs = match curs { None => { let _ = echo_to_term_frame(&global, &frame, "No logs are currently stored.\r\n"); return Ok(()); } Some(curs) => curs, }; let mut event_stats = BTreeMap::::new(); loop { if let Ok(ev) = serde_wasm_bindgen::from_value::(curs.value()) { event_stats .entry(ev.stream) .and_modify(|v| *v += 1) .or_insert(1); }; if !curs.continue_cursor()?.await? { break; } } let msg = event_stats .iter() .map(|(k, v)| format!("Log stream {} has {} log records.\r\n", k, v)) .join(""); let _ = echo_to_term_frame(&global, &frame, &msg); Ok(()) } async fn immediate_report_log_frame_async( global: GlobalMemoCell, db: Arc, frame: TermFrame, ) { match immediate_report_log_frame_refutable(global, &db, frame).await { Ok(()) => {} Err(e) => { console::log_2( &JsValue::from_str("Error reading log stats from IndexedDb"), &JsValue::from(e), ); } } } fn immediate_report_log_frame(global: &GlobalMemoCell, frame: TermFrame) { let log_engine = global.log_engine.borrow(); if let LoggingEngine::Ready { db } = log_engine.deref() { spawn_local(immediate_report_log_frame_async( global.clone(), db.clone(), frame, )); } } async fn immediate_delete_logs_refutable( global: GlobalMemoCell, db: &IdbDatabase, act: DeleteLogs, ) -> Result<(), DomException> { let trans = db.transaction_on_one_with_mode("logs", IdbTransactionMode::Readwrite)?; let store = trans.object_store("logs")?; let idx = store.index("by_stream_timestamp")?; let range = IdbKeyRange::bound( &JsValue::from( [ JsValue::from_str(&act.stream), JsValue::from_str(&act.min_date), ] .iter() .collect::(), ), &JsValue::from( [ JsValue::from_str(&act.stream), JsValue::from_str(&act.max_date), ] .iter() .collect::(), ), )?; let cur = match idx.open_key_cursor_with_range(&range)?.await? { None => { let _ = echo_to_term_frame( &global, &act.reply_to, "No logs matched; no logs deleted.\r\n", ); return Ok(()); } Some(cur) => cur, }; let mut records: u64 = 0; loop { if let Some(pk) = cur.primary_key() { store.delete(&pk)?; records += 1; } if !cur.continue_cursor()?.await? { break; } } let _ = echo_to_term_frame( &global, &act.reply_to, &format!("{} logs matched and were deleted.\r\n", records), ); Ok(()) } async fn immediate_delete_logs_async( global: GlobalMemoCell, db: Arc, act: DeleteLogs, ) { match immediate_delete_logs_refutable(global, &db, act).await { Ok(()) => {} Err(e) => { console::log_2( &JsValue::from_str("Error deleting logs from IndexedDb"), &JsValue::from(e), ); } } } fn immediate_delete_logs(global: &GlobalMemoCell, act: DeleteLogs) { let log_engine = global.log_engine.borrow(); if let LoggingEngine::Ready { db } = log_engine.deref() { spawn_local(immediate_delete_logs_async( global.clone(), db.clone(), act.clone(), )); } } async fn immediate_download_logs_refutable( global: GlobalMemoCell, db: &IdbDatabase, act: DownloadLogs, ) -> Result<(), DomException> { let trans = db.transaction_on_one_with_mode("logs", IdbTransactionMode::Readwrite)?; let store = trans.object_store("logs")?; let idx = store.index("by_stream_timestamp")?; let range = IdbKeyRange::bound( &JsValue::from( [ JsValue::from_str(&act.stream), JsValue::from_str(&act.min_date), ] .iter() .collect::(), ), &JsValue::from( [ JsValue::from_str(&act.stream), JsValue::from_str(&act.max_date), ] .iter() .collect::(), ), )?; let cur = match idx.open_key_cursor_with_range(&range)?.await? { None => { let _ = echo_to_term_frame( &global, &act.reply_to, "No logs matched; no logs downloaded.\r\n", ); return Ok(()); } Some(cur) => cur, }; let mut buf: String = String::new(); loop { if let Some(k) = cur.primary_key() { if let Some(Ok(rec)) = store .get(&k)? .await? .map(serde_wasm_bindgen::from_value::) { buf.push_str(&rec.message); } } if !cur.continue_cursor()?.await? { break; } } let buf_bytes = buf.as_bytes(); let buf_array = Uint8Array::new_with_length(buf_bytes.len() as u32); buf_array.copy_from(buf_bytes); let mut blobprops: BlobPropertyBag = Default::default(); blobprops.type_("text/plain"); let blob = Blob::new_with_u8_array_sequence_and_options( &[&buf_array].into_iter().collect::(), &blobprops, )?; console::log_2(&buf_array, &blob); let url = Url::create_object_url_with_blob(&blob)?; if let Some((doc, body)) = web_sys::window() .and_then(|w| w.document()) .and_then(|d| d.body().map(|b| (d, b))) { let el_anchor = JsCast::unchecked_into::(doc.create_element("a")?); body.append_child(&el_anchor)?; el_anchor.set_href(&url); el_anchor.set_download(&format!("{}-logs.txt", act.stream)); el_anchor.click(); body.remove_child(&el_anchor)?; } // Url::revoke_object_url(&url)?; Ok(()) } async fn immediate_download_logs_async( global: GlobalMemoCell, db: Arc, act: DownloadLogs, ) { match immediate_download_logs_refutable(global, &db, act).await { Ok(()) => {} Err(e) => { console::log_2( &JsValue::from_str("Error downloading logs from IndexedDb"), &JsValue::from(e), ); } } } fn immediate_download_logs(global: &GlobalMemoCell, act: DownloadLogs) { let log_engine = global.log_engine.borrow(); if let LoggingEngine::Ready { db } = log_engine.deref() { spawn_local(immediate_download_logs_async( global.clone(), db.clone(), act.clone(), )); } } pub fn log(global: &GlobalMemoCell, event: &LogEvent) { let mut engine_borrow = global.log_engine.borrow_mut(); match engine_borrow.deref_mut() { LoggingEngine::Uninitialised => { *engine_borrow = LoggingEngine::Initialising { backlog: vec![QueuedAction::LogEvent(event.clone())], }; drop(engine_borrow); init_logging(global); } LoggingEngine::Initialising { ref mut backlog } => { backlog.push(QueuedAction::LogEvent(event.clone())); } LoggingEngine::Unavailable => { // Assume the user has already been informed it failed, so do nothing. } LoggingEngine::Ready { .. } => { drop(engine_borrow); queue_immediate_log(global, event.clone()); } } } pub fn start_listing_logs(global: &GlobalMemoCell, send_to: &TermFrame) { let mut engine_borrow = global.log_engine.borrow_mut(); match engine_borrow.deref_mut() { LoggingEngine::Uninitialised => { *engine_borrow = LoggingEngine::Initialising { backlog: vec![QueuedAction::ReportOnLogStreams(send_to.clone())], }; drop(engine_borrow); init_logging(global); } LoggingEngine::Initialising { ref mut backlog } => { backlog.push(QueuedAction::ReportOnLogStreams(send_to.clone())); } LoggingEngine::Unavailable => { // Assume the user has already been informed it failed, so do nothing. } LoggingEngine::Ready { .. } => { drop(engine_borrow); immediate_report_log_frame(global, send_to.clone()); } } } pub fn start_deleting_logs( global: &GlobalMemoCell, reply_to: &TermFrame, stream: &str, min_date: &str, max_date: &str, ) { let mut engine_borrow = global.log_engine.borrow_mut(); let act = DeleteLogs { reply_to: reply_to.clone(), stream: stream.to_owned(), min_date: min_date.to_owned(), max_date: max_date.to_owned(), }; match engine_borrow.deref_mut() { LoggingEngine::Uninitialised => { *engine_borrow = LoggingEngine::Initialising { backlog: vec![QueuedAction::DeleteLogs(act)], }; drop(engine_borrow); init_logging(global); } LoggingEngine::Initialising { ref mut backlog } => { backlog.push(QueuedAction::DeleteLogs(act)); } LoggingEngine::Unavailable => { // Assume the user has already been informed it failed, so do nothing. } LoggingEngine::Ready { .. } => { drop(engine_borrow); immediate_delete_logs(global, act); } } } pub fn start_downloading_logs( global: &GlobalMemoCell, reply_to: &TermFrame, stream: &str, min_date: &str, max_date: &str, ) { let mut engine_borrow = global.log_engine.borrow_mut(); let act = DownloadLogs { reply_to: reply_to.clone(), stream: stream.to_owned(), min_date: min_date.to_owned(), max_date: max_date.to_owned(), }; match engine_borrow.deref_mut() { LoggingEngine::Uninitialised => { *engine_borrow = LoggingEngine::Initialising { backlog: vec![QueuedAction::DownloadLogs(act)], }; drop(engine_borrow); init_logging(global); } LoggingEngine::Initialising { ref mut backlog } => { backlog.push(QueuedAction::DownloadLogs(act)); } LoggingEngine::Unavailable => { // Assume the user has already been informed it failed, so do nothing. } LoggingEngine::Ready { .. } => { drop(engine_borrow); immediate_download_logs(global, act); } } }