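// minicrossterm/examples/program_examples/logging.rs
//
// Example program: several threads push log entries onto a shared queue while
// one consumer thread writes them out through crossterm's `Screen`.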

extern crate crossterm;
use crossterm::Screen;
use std::sync::Mutex;
use std::collections::VecDeque;
use std::sync::Arc;
use std::io::Write;
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread::{JoinHandle, self};
/// A FIFO queue that can be cloned and shared safely between threads.
///
/// Every clone refers to the same underlying `VecDeque`, guarded by a mutex.
#[derive(Clone)]
struct WorkQueue<T: Send + Clone> {
    inner: Arc<Mutex<VecDeque<T>>>,
}

impl<T: Send + Clone> WorkQueue<T> {
    /// Creates an empty queue.
    fn new() -> Self {
        let pending = VecDeque::new();
        Self {
            inner: Arc::new(Mutex::new(pending)),
        }
    }

    /// Removes and returns the oldest item, or `None` when the queue is empty.
    ///
    /// # Panics
    ///
    /// Panics if the queue's mutex is poisoned.
    fn get_work(&self) -> Option<T> {
        match self.inner.lock() {
            Ok(mut pending) => pending.pop_front(),
            Err(_) => panic!("WorkQueue::get_work() tried to lock a poisoned mutex"),
        }
    }

    /// Appends `work` to the back of the queue and returns the new queue length.
    ///
    /// # Panics
    ///
    /// Panics if the queue's mutex is poisoned.
    fn add_work(&self, work: T) -> usize {
        let mut pending = match self.inner.lock() {
            Ok(guard) => guard,
            Err(_) => panic!("WorkQueue::add_work() tried to lock a poisoned mutex"),
        };
        pending.push_back(work);
        pending.len()
    }
}
/// Writing half of a boolean flag shared between threads.
#[derive(Clone)]
struct SyncFlagTx {
    inner: Arc<Mutex<bool>>,
}

impl SyncFlagTx {
    /// Stores `state` in the shared flag.
    ///
    /// Returns `Err(())` when the flag's mutex cannot be locked (it is poisoned).
    pub fn set(&mut self, state: bool) -> Result<(), ()> {
        match self.inner.lock() {
            Ok(mut flag) => {
                *flag = state;
                Ok(())
            }
            Err(_) => Err(()),
        }
    }
}
/// Reading half of a boolean flag shared between threads.
#[derive(Clone)]
struct SyncFlagRx {
    inner: Arc<Mutex<bool>>,
}

impl SyncFlagRx {
    /// Reads the current value of the shared flag.
    ///
    /// Returns `Err(())` when the flag's mutex cannot be locked (it is poisoned).
    pub fn get(&self) -> Result<bool, ()> {
        self.inner.lock().map(|flag| *flag).map_err(|_| ())
    }
}
/// Builds a writer/reader pair that share one boolean, initialised to `initial_state`.
///
/// A value stored through the returned `SyncFlagTx` (or any clone of it) is what
/// the returned `SyncFlagRx` (or any clone of it) reads back.
fn new_sync_flag(initial_state: bool) -> (SyncFlagTx, SyncFlagRx) {
    let shared = Arc::new(Mutex::new(initial_state));
    (
        SyncFlagTx { inner: Arc::clone(&shared) },
        SyncFlagRx { inner: shared },
    )
}
fn main()
{
let (results_tx, results_rx): (Sender<String>, Receiver<String>) = mpsc::channel();
let (mut more_jobs_tx, more_jobs_rx) = new_sync_flag(true);
// queue with all log entry's.
let queue = WorkQueue::new();
// queue x logs with different threads.
let thread_handles = log_with_different_threads(more_jobs_tx.clone(), queue.clone());
// a thread that will log all logs in the queue.
handle_incoming_logs(more_jobs_rx.clone(), queue.clone());
for handle in thread_handles
{
handle.join();
}
}
/// Writes queued log entries to the screen until the producers are done.
///
/// Spawns one consumer thread and blocks the caller until that thread exits.
/// The consumer keeps polling `queue` while `more_jobs_rx` reads `true`. Once
/// the flag reads `false` it continues until the queue is empty, so entries
/// that were still pending when the flag was cleared are not lost.
///
/// * `more_jobs_rx` - flag telling the consumer whether more work is expected.
/// * `queue` - queue the log entries are taken from.
fn handle_incoming_logs(more_jobs_rx: SyncFlagRx, queue: WorkQueue<String>) {
    let consumer = thread::spawn(move || {
        let mut screen: Screen = Screen::new();

        loop {
            // Read the flag *before* polling the queue: if it is already
            // `false` and the queue then turns out to be empty, nothing more
            // can arrive and the loop may stop.
            let more_expected = more_jobs_rx
                .get()
                .expect("the more-jobs flag mutex is poisoned");

            match queue.get_work() {
                Some(mut log) => {
                    log.push('\n');
                    // Write the entry and flush right away.
                    // NOTE(review): the results of these two calls are discarded,
                    // as before; handle them if they report I/O errors.
                    screen.stdout.write_string(log);
                    screen.stdout.flush();
                }
                None if !more_expected => break,
                None => {}
            }

            // Give the producer threads a chance to run between polls.
            thread::yield_now();
        }
    });

    if consumer.join().is_err() {
        eprintln!("the log consumer thread panicked");
    }
}
/// Starts the producer threads that continuously push log entries onto `queue`.
///
/// Four threads are spawned (numbered 1 through 4). Each one enqueues 9999
/// entries of the form `"Log {entry} from thread {thread} "` and sets the
/// more-jobs flag to `true` after every entry.
///
/// * `more_jobs_tx` - writer for the flag that tells the consumer work is pending.
/// * `queue` - queue the log entries are pushed onto.
///
/// Returns the join handles of the spawned threads.
fn log_with_different_threads(more_jobs_tx: SyncFlagTx, queue: WorkQueue<String>) -> Vec<JoinHandle<()>> {
    let threads: Vec<JoinHandle<()>> = (1..5)
        .map(|thread_num| {
            // Each thread gets its own handle to the shared flag and queue.
            let mut more_jobs = more_jobs_tx.clone();
            let thread_queue = queue.clone();

            thread::spawn(move || {
                for log_entry_count in 1..10000 {
                    thread_queue.add_work(format!("Log {} from thread {} ", log_entry_count, thread_num));
                    // `set` only fails when the flag's mutex is poisoned; the
                    // entry is already queued, so that failure is ignored here.
                    let _ = more_jobs.set(true);
                }
            })
        })
        .collect();

    println!("All logging threads started");
    threads
}