diff --git a/src/event/stream.rs b/src/event/stream.rs index 38faae1..6e51c6e 100644 --- a/src/event/stream.rs +++ b/src/event/stream.rs @@ -2,6 +2,7 @@ use std::{ pin::Pin, sync::{ atomic::{AtomicBool, Ordering}, + mpsc::{self, SyncSender}, Arc, }, thread, @@ -34,16 +35,37 @@ use super::{ #[derive(Debug)] pub struct EventStream { poll_internal_waker: Waker, - stream_wake_thread_spawned: Arc, - stream_wake_thread_should_shutdown: Arc, + stream_wake_task_executed: Arc, + stream_wake_task_should_shutdown: Arc, + task_sender: SyncSender, } impl Default for EventStream { fn default() -> Self { + let (task_sender, receiver) = mpsc::sync_channel::(1); + + thread::spawn(move || { + while let Ok(task) = receiver.recv() { + loop { + if let Ok(true) = poll_internal(None, &EventFilter) { + break; + } + + if task.stream_wake_task_should_shutdown.load(Ordering::SeqCst) { + break; + } + } + task.stream_wake_task_executed + .store(false, Ordering::SeqCst); + task.stream_waker.wake(); + } + }); + EventStream { poll_internal_waker: INTERNAL_EVENT_READER.write().waker(), - stream_wake_thread_spawned: Arc::new(AtomicBool::new(false)), - stream_wake_thread_should_shutdown: Arc::new(AtomicBool::new(false)), + stream_wake_task_executed: Arc::new(AtomicBool::new(false)), + stream_wake_task_should_shutdown: Arc::new(AtomicBool::new(false)), + task_sender, } } } @@ -55,6 +77,12 @@ impl EventStream { } } +struct Task { + stream_waker: std::task::Waker, + stream_wake_task_executed: Arc, + stream_wake_task_should_shutdown: Arc, +} + // Note to future me // // We need two wakers in order to implement EventStream correctly. @@ -86,28 +114,20 @@ impl Stream for EventStream { }, Ok(false) => { if !self - .stream_wake_thread_spawned + .stream_wake_task_executed .compare_and_swap(false, true, Ordering::SeqCst) { let stream_waker = cx.waker().clone(); - let stream_wake_thread_spawned = self.stream_wake_thread_spawned.clone(); - let stream_wake_thread_should_shutdown = - self.stream_wake_thread_should_shutdown.clone(); + let stream_wake_task_executed = self.stream_wake_task_executed.clone(); + let stream_wake_task_should_shutdown = + self.stream_wake_task_should_shutdown.clone(); - stream_wake_thread_should_shutdown.store(false, Ordering::SeqCst); + stream_wake_task_should_shutdown.store(false, Ordering::SeqCst); - thread::spawn(move || { - loop { - if let Ok(true) = poll_internal(None, &EventFilter) { - break; - } - - if stream_wake_thread_should_shutdown.load(Ordering::SeqCst) { - break; - } - } - stream_wake_thread_spawned.store(false, Ordering::SeqCst); - stream_waker.wake(); + let _ = self.task_sender.send(Task { + stream_waker, + stream_wake_task_executed, + stream_wake_task_should_shutdown, }); } Poll::Pending @@ -120,7 +140,7 @@ impl Stream for EventStream { impl Drop for EventStream { fn drop(&mut self) { - self.stream_wake_thread_should_shutdown + self.stream_wake_task_should_shutdown .store(true, Ordering::SeqCst); let _ = self.poll_internal_waker.wake(); }