worldwideportal/src/logging.rs

545 lines
16 KiB
Rust

use std::{
collections::BTreeMap,
mem::replace,
ops::{Deref, DerefMut},
sync::Arc,
};
use indexed_db_futures::{
idb_object_store::IdbObjectStoreParameters,
request::{IdbOpenDbRequestLike, OpenDbRequest},
IdbDatabase, IdbKeyPath, IdbQuerySource, IdbVersionChangeEvent,
};
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use wasm_bindgen::{JsCast, JsValue};
use wasm_bindgen_futures::{
js_sys::{Array, Uint8Array},
spawn_local,
};
use web_sys::{
console, js_sys::Date, Blob, BlobPropertyBag, DomException, HtmlAnchorElement, IdbKeyRange,
IdbTransactionMode, Url,
};
use crate::{echo_to_term_frame, GlobalMemoCell, TermFrame};
#[derive(Clone, Debug)]
pub enum QueuedAction {
LogEvent(LogEvent),
ReportOnLogStreams(TermFrame),
DeleteLogs(DeleteLogs),
DownloadLogs(DownloadLogs),
}
#[derive(Clone, Debug)]
pub struct DeleteLogs {
pub reply_to: TermFrame,
pub stream: String,
pub min_date: String,
pub max_date: String,
}
#[derive(Clone, Debug)]
pub struct DownloadLogs {
pub reply_to: TermFrame,
pub stream: String,
pub min_date: String,
pub max_date: String,
}
#[derive(Clone, Debug)]
pub struct LogEvent {
pub log_stream: String,
pub timestamp: Date,
pub message: String,
}
#[derive(Serialize, Deserialize)]
pub struct StoredLogEvent {
pub stream: String,
pub timestamp: String,
pub message: String,
}
impl From<&LogEvent> for StoredLogEvent {
fn from(value: &LogEvent) -> Self {
Self {
stream: value.log_stream.clone(),
timestamp: value
.timestamp
.to_iso_string()
.as_string()
.expect("timestamp to_iso_string wasn't string"),
message: value.message.clone(),
}
}
}
pub enum LoggingEngine {
Uninitialised,
Initialising { backlog: Vec<QueuedAction> },
Unavailable,
Ready { db: Arc<IdbDatabase> },
}
impl Default for LoggingEngine {
fn default() -> Self {
Self::Uninitialised
}
}
async fn async_init_logging() -> Result<LoggingEngine, DomException> {
let mut db_req: OpenDbRequest = IdbDatabase::open_u32("logging", 1)?;
db_req.set_on_upgrade_needed(Some(|evt: &IdbVersionChangeEvent| -> Result<(), JsValue> {
if evt
.db()
.object_store_names()
.find(|n| n == "logs")
.is_none()
{
let store = evt.db().create_object_store_with_params(
"logs",
<IdbObjectStoreParameters as Default>::default().auto_increment(true),
)?;
store.create_index(
"by_stream_timestamp",
&IdbKeyPath::str_sequence(&["stream", "timestamp"]),
)?;
}
Ok(())
}));
let db: IdbDatabase = db_req.await?;
Ok(LoggingEngine::Ready { db: db.into() })
}
fn logging_broken_message(global: &GlobalMemoCell, frame: &TermFrame) {
let _ = echo_to_term_frame(global, frame, "Couldn't enable logging. This sometimes happens if your browser doesn't enable indexed storage, or is in a mode (e.g. private browsing) where such storage is not permitted.\r\n");
}
fn init_logging(global: &GlobalMemoCell) {
let global: GlobalMemoCell = global.clone();
spawn_local(async move {
match async_init_logging().await {
Ok(new_engine) => {
let old_engine = replace(global.log_engine.borrow_mut().deref_mut(), new_engine);
if let LoggingEngine::Initialising { backlog } = old_engine {
for action in backlog.into_iter() {
match action {
QueuedAction::LogEvent(ev) => queue_immediate_log(&global, ev),
QueuedAction::ReportOnLogStreams(frame) => {
immediate_report_log_frame(&global, frame);
}
QueuedAction::DeleteLogs(act) => {
immediate_delete_logs(&global, act);
}
QueuedAction::DownloadLogs(act) => {
immediate_download_logs(&global, act);
}
}
}
}
}
Err(_) => {
if let LoggingEngine::Initialising { .. } = global.log_engine.borrow().deref() {
logging_broken_message(&global, &TermFrame(1));
}
}
}
})
}
fn queue_immediate_log_refutable(db: &IdbDatabase, event: &LogEvent) -> Result<(), String> {
let trans = db
.transaction_on_one_with_mode("logs", IdbTransactionMode::Readwrite)
.map_err(|e| e.message())?;
let store = trans.object_store("logs").map_err(|e| e.message())?;
store
.put_val(
&serde_wasm_bindgen::to_value::<StoredLogEvent>(&event.into())
.map_err(|_| "Can't serialise event")?,
)
.map_err(|e| e.message())?;
Ok(())
}
fn queue_immediate_log(global: &GlobalMemoCell, event: LogEvent) {
if let LoggingEngine::Ready { db } = global.log_engine.borrow().deref() {
match queue_immediate_log_refutable(db, &event) {
Ok(()) => {}
Err(e) => {
console::log_2(
&JsValue::from_str("Error writing event to IndexedDb"),
&JsValue::from(e),
);
}
}
}
}
async fn immediate_report_log_frame_refutable(
global: GlobalMemoCell,
db: &IdbDatabase,
frame: TermFrame,
) -> Result<(), DomException> {
let trans = db.transaction_on_one("logs")?;
let store = trans.object_store("logs")?;
let curs = store.open_cursor()?.await?;
let curs = match curs {
None => {
let _ = echo_to_term_frame(&global, &frame, "No logs are currently stored.\r\n");
return Ok(());
}
Some(curs) => curs,
};
let mut event_stats = BTreeMap::<String, u64>::new();
loop {
if let Ok(ev) = serde_wasm_bindgen::from_value::<StoredLogEvent>(curs.value()) {
event_stats
.entry(ev.stream)
.and_modify(|v| *v += 1)
.or_insert(1);
};
if !curs.continue_cursor()?.await? {
break;
}
}
let msg = event_stats
.iter()
.map(|(k, v)| format!("Log stream {} has {} log records.\r\n", k, v))
.join("");
let _ = echo_to_term_frame(&global, &frame, &msg);
Ok(())
}
async fn immediate_report_log_frame_async(
global: GlobalMemoCell,
db: Arc<IdbDatabase>,
frame: TermFrame,
) {
match immediate_report_log_frame_refutable(global, &db, frame).await {
Ok(()) => {}
Err(e) => {
console::log_2(
&JsValue::from_str("Error reading log stats from IndexedDb"),
&JsValue::from(e),
);
}
}
}
fn immediate_report_log_frame(global: &GlobalMemoCell, frame: TermFrame) {
let log_engine = global.log_engine.borrow();
if let LoggingEngine::Ready { db } = log_engine.deref() {
spawn_local(immediate_report_log_frame_async(
global.clone(),
db.clone(),
frame,
));
}
}
async fn immediate_delete_logs_refutable(
global: GlobalMemoCell,
db: &IdbDatabase,
act: DeleteLogs,
) -> Result<(), DomException> {
let trans = db.transaction_on_one_with_mode("logs", IdbTransactionMode::Readwrite)?;
let store = trans.object_store("logs")?;
let idx = store.index("by_stream_timestamp")?;
let range = IdbKeyRange::bound(
&JsValue::from(
[
JsValue::from_str(&act.stream),
JsValue::from_str(&act.min_date),
]
.iter()
.collect::<Array>(),
),
&JsValue::from(
[
JsValue::from_str(&act.stream),
JsValue::from_str(&act.max_date),
]
.iter()
.collect::<Array>(),
),
)?;
let cur = match idx.open_key_cursor_with_range(&range)?.await? {
None => {
let _ = echo_to_term_frame(
&global,
&act.reply_to,
"No logs matched; no logs deleted.\r\n",
);
return Ok(());
}
Some(cur) => cur,
};
let mut records: u64 = 0;
loop {
if let Some(pk) = cur.primary_key() {
store.delete(&pk)?;
records += 1;
}
if !cur.continue_cursor()?.await? {
break;
}
}
let _ = echo_to_term_frame(
&global,
&act.reply_to,
&format!("{} logs matched and were deleted.\r\n", records),
);
Ok(())
}
async fn immediate_delete_logs_async(
global: GlobalMemoCell,
db: Arc<IdbDatabase>,
act: DeleteLogs,
) {
match immediate_delete_logs_refutable(global, &db, act).await {
Ok(()) => {}
Err(e) => {
console::log_2(
&JsValue::from_str("Error deleting logs from IndexedDb"),
&JsValue::from(e),
);
}
}
}
fn immediate_delete_logs(global: &GlobalMemoCell, act: DeleteLogs) {
let log_engine = global.log_engine.borrow();
if let LoggingEngine::Ready { db } = log_engine.deref() {
spawn_local(immediate_delete_logs_async(
global.clone(),
db.clone(),
act.clone(),
));
}
}
async fn immediate_download_logs_refutable(
global: GlobalMemoCell,
db: &IdbDatabase,
act: DownloadLogs,
) -> Result<(), DomException> {
let trans = db.transaction_on_one_with_mode("logs", IdbTransactionMode::Readwrite)?;
let store = trans.object_store("logs")?;
let idx = store.index("by_stream_timestamp")?;
let range = IdbKeyRange::bound(
&JsValue::from(
[
JsValue::from_str(&act.stream),
JsValue::from_str(&act.min_date),
]
.iter()
.collect::<Array>(),
),
&JsValue::from(
[
JsValue::from_str(&act.stream),
JsValue::from_str(&act.max_date),
]
.iter()
.collect::<Array>(),
),
)?;
let cur = match idx.open_key_cursor_with_range(&range)?.await? {
None => {
let _ = echo_to_term_frame(
&global,
&act.reply_to,
"No logs matched; no logs downloaded.\r\n",
);
return Ok(());
}
Some(cur) => cur,
};
let mut buf: String = String::new();
loop {
if let Some(k) = cur.primary_key() {
if let Some(Ok(rec)) = store
.get(&k)?
.await?
.map(serde_wasm_bindgen::from_value::<StoredLogEvent>)
{
buf.push_str(&rec.message);
}
}
if !cur.continue_cursor()?.await? {
break;
}
}
let buf_bytes = buf.as_bytes();
let buf_array = Uint8Array::new_with_length(buf_bytes.len() as u32);
buf_array.copy_from(buf_bytes);
let mut blobprops: BlobPropertyBag = Default::default();
blobprops.type_("text/plain");
let blob = Blob::new_with_u8_array_sequence_and_options(
&[&buf_array].into_iter().collect::<Array>(),
&blobprops,
)?;
console::log_2(&buf_array, &blob);
let url = Url::create_object_url_with_blob(&blob)?;
if let Some((doc, body)) = web_sys::window()
.and_then(|w| w.document())
.and_then(|d| d.body().map(|b| (d, b)))
{
let el_anchor = JsCast::unchecked_into::<HtmlAnchorElement>(doc.create_element("a")?);
body.append_child(&el_anchor)?;
el_anchor.set_href(&url);
el_anchor.set_download(&format!("{}-logs.txt", act.stream));
el_anchor.click();
body.remove_child(&el_anchor)?;
}
Url::revoke_object_url(&url)?;
Ok(())
}
async fn immediate_download_logs_async(
global: GlobalMemoCell,
db: Arc<IdbDatabase>,
act: DownloadLogs,
) {
match immediate_download_logs_refutable(global, &db, act).await {
Ok(()) => {}
Err(e) => {
console::log_2(
&JsValue::from_str("Error downloading logs from IndexedDb"),
&JsValue::from(e),
);
}
}
}
fn immediate_download_logs(global: &GlobalMemoCell, act: DownloadLogs) {
let log_engine = global.log_engine.borrow();
if let LoggingEngine::Ready { db } = log_engine.deref() {
spawn_local(immediate_download_logs_async(
global.clone(),
db.clone(),
act.clone(),
));
}
}
pub fn log(global: &GlobalMemoCell, event: &LogEvent) {
let mut engine_borrow = global.log_engine.borrow_mut();
match engine_borrow.deref_mut() {
LoggingEngine::Uninitialised => {
*engine_borrow = LoggingEngine::Initialising {
backlog: vec![QueuedAction::LogEvent(event.clone())],
};
drop(engine_borrow);
init_logging(global);
}
LoggingEngine::Initialising { ref mut backlog } => {
backlog.push(QueuedAction::LogEvent(event.clone()));
}
LoggingEngine::Unavailable => {
// Assume the user has already been informed it failed, so do nothing.
}
LoggingEngine::Ready { .. } => {
drop(engine_borrow);
queue_immediate_log(global, event.clone());
}
}
}
pub fn start_listing_logs(global: &GlobalMemoCell, send_to: &TermFrame) {
let mut engine_borrow = global.log_engine.borrow_mut();
match engine_borrow.deref_mut() {
LoggingEngine::Uninitialised => {
*engine_borrow = LoggingEngine::Initialising {
backlog: vec![QueuedAction::ReportOnLogStreams(send_to.clone())],
};
drop(engine_borrow);
init_logging(global);
}
LoggingEngine::Initialising { ref mut backlog } => {
backlog.push(QueuedAction::ReportOnLogStreams(send_to.clone()));
}
LoggingEngine::Unavailable => {
// Assume the user has already been informed it failed, so do nothing.
}
LoggingEngine::Ready { .. } => {
drop(engine_borrow);
immediate_report_log_frame(global, send_to.clone());
}
}
}
pub fn start_deleting_logs(
global: &GlobalMemoCell,
reply_to: &TermFrame,
stream: &str,
min_date: &str,
max_date: &str,
) {
let mut engine_borrow = global.log_engine.borrow_mut();
let act = DeleteLogs {
reply_to: reply_to.clone(),
stream: stream.to_owned(),
min_date: min_date.to_owned(),
max_date: max_date.to_owned(),
};
match engine_borrow.deref_mut() {
LoggingEngine::Uninitialised => {
*engine_borrow = LoggingEngine::Initialising {
backlog: vec![QueuedAction::DeleteLogs(act)],
};
drop(engine_borrow);
init_logging(global);
}
LoggingEngine::Initialising { ref mut backlog } => {
backlog.push(QueuedAction::DeleteLogs(act));
}
LoggingEngine::Unavailable => {
// Assume the user has already been informed it failed, so do nothing.
}
LoggingEngine::Ready { .. } => {
drop(engine_borrow);
immediate_delete_logs(global, act);
}
}
}
pub fn start_downloading_logs(
global: &GlobalMemoCell,
reply_to: &TermFrame,
stream: &str,
min_date: &str,
max_date: &str,
) {
let mut engine_borrow = global.log_engine.borrow_mut();
let act = DownloadLogs {
reply_to: reply_to.clone(),
stream: stream.to_owned(),
min_date: min_date.to_owned(),
max_date: max_date.to_owned(),
};
match engine_borrow.deref_mut() {
LoggingEngine::Uninitialised => {
*engine_borrow = LoggingEngine::Initialising {
backlog: vec![QueuedAction::DownloadLogs(act)],
};
drop(engine_borrow);
init_logging(global);
}
LoggingEngine::Initialising { ref mut backlog } => {
backlog.push(QueuedAction::DownloadLogs(act));
}
LoggingEngine::Unavailable => {
// Assume the user has already been informed it failed, so do nothing.
}
LoggingEngine::Ready { .. } => {
drop(engine_borrow);
immediate_download_logs(global, act);
}
}
}