Use a single thread for event stream (#503)

This commit is contained in:
quininer 2020-11-03 17:39:47 +08:00 committed by GitHub
parent f85105c284
commit 4eee9187d8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -2,6 +2,7 @@ use std::{
pin::Pin, pin::Pin,
sync::{ sync::{
atomic::{AtomicBool, Ordering}, atomic::{AtomicBool, Ordering},
mpsc::{self, SyncSender},
Arc, Arc,
}, },
thread, thread,
@ -34,16 +35,37 @@ use super::{
#[derive(Debug)] #[derive(Debug)]
pub struct EventStream { pub struct EventStream {
poll_internal_waker: Waker, poll_internal_waker: Waker,
stream_wake_thread_spawned: Arc<AtomicBool>, stream_wake_task_executed: Arc<AtomicBool>,
stream_wake_thread_should_shutdown: Arc<AtomicBool>, stream_wake_task_should_shutdown: Arc<AtomicBool>,
task_sender: SyncSender<Task>,
} }
impl Default for EventStream { impl Default for EventStream {
fn default() -> Self { fn default() -> Self {
let (task_sender, receiver) = mpsc::sync_channel::<Task>(1);
thread::spawn(move || {
while let Ok(task) = receiver.recv() {
loop {
if let Ok(true) = poll_internal(None, &EventFilter) {
break;
}
if task.stream_wake_task_should_shutdown.load(Ordering::SeqCst) {
break;
}
}
task.stream_wake_task_executed
.store(false, Ordering::SeqCst);
task.stream_waker.wake();
}
});
EventStream { EventStream {
poll_internal_waker: INTERNAL_EVENT_READER.write().waker(), poll_internal_waker: INTERNAL_EVENT_READER.write().waker(),
stream_wake_thread_spawned: Arc::new(AtomicBool::new(false)), stream_wake_task_executed: Arc::new(AtomicBool::new(false)),
stream_wake_thread_should_shutdown: Arc::new(AtomicBool::new(false)), stream_wake_task_should_shutdown: Arc::new(AtomicBool::new(false)),
task_sender,
} }
} }
} }
@ -55,6 +77,12 @@ impl EventStream {
} }
} }
struct Task {
stream_waker: std::task::Waker,
stream_wake_task_executed: Arc<AtomicBool>,
stream_wake_task_should_shutdown: Arc<AtomicBool>,
}
// Note to future me // Note to future me
// //
// We need two wakers in order to implement EventStream correctly. // We need two wakers in order to implement EventStream correctly.
@ -86,28 +114,20 @@ impl Stream for EventStream {
}, },
Ok(false) => { Ok(false) => {
if !self if !self
.stream_wake_thread_spawned .stream_wake_task_executed
.compare_and_swap(false, true, Ordering::SeqCst) .compare_and_swap(false, true, Ordering::SeqCst)
{ {
let stream_waker = cx.waker().clone(); let stream_waker = cx.waker().clone();
let stream_wake_thread_spawned = self.stream_wake_thread_spawned.clone(); let stream_wake_task_executed = self.stream_wake_task_executed.clone();
let stream_wake_thread_should_shutdown = let stream_wake_task_should_shutdown =
self.stream_wake_thread_should_shutdown.clone(); self.stream_wake_task_should_shutdown.clone();
stream_wake_thread_should_shutdown.store(false, Ordering::SeqCst); stream_wake_task_should_shutdown.store(false, Ordering::SeqCst);
thread::spawn(move || { let _ = self.task_sender.send(Task {
loop { stream_waker,
if let Ok(true) = poll_internal(None, &EventFilter) { stream_wake_task_executed,
break; stream_wake_task_should_shutdown,
}
if stream_wake_thread_should_shutdown.load(Ordering::SeqCst) {
break;
}
}
stream_wake_thread_spawned.store(false, Ordering::SeqCst);
stream_waker.wake();
}); });
} }
Poll::Pending Poll::Pending
@ -120,7 +140,7 @@ impl Stream for EventStream {
impl Drop for EventStream { impl Drop for EventStream {
fn drop(&mut self) { fn drop(&mut self) {
self.stream_wake_thread_should_shutdown self.stream_wake_task_should_shutdown
.store(true, Ordering::SeqCst); .store(true, Ordering::SeqCst);
let _ = self.poll_internal_waker.wake(); let _ = self.poll_internal_waker.wake();
} }