Implemented poll Waker.

This commit is contained in:
Robert Vojta 2019-12-10 20:41:41 +01:00 committed by Timon
parent b4241e0107
commit 8fb9059853
13 changed files with 303 additions and 145 deletions

View File

@ -47,7 +47,7 @@ version = "0.3.8"
features = ["winuser"]
[target.'cfg(windows)'.dependencies]
crossterm_winapi = "0.5.0"
crossterm_winapi = "0.5.1"
#
# UNIX dependencies

View File

@ -49,7 +49,7 @@ fn print_events() -> Result<()> {
fn main() -> Result<()> {
println!("{}", HELP);
enable_raw_mode();
enable_raw_mode()?;
let mut stdout = stdout();
execute!(stdout, EnableMouseCapture)?;

View File

@ -1,9 +1,13 @@
use std::{collections::vec_deque::VecDeque, time::Duration};
use std::{collections::vec_deque::VecDeque, io, time::Duration};
use crate::ErrorKind;
#[cfg(unix)]
use super::source::unix::UnixInternalEventSource;
#[cfg(windows)]
use super::source::windows::WindowsEventSource;
#[cfg(feature = "event-stream")]
use super::sys::Waker;
use super::{filter::Filter, source::EventSource, timeout::PollTimeout, InternalEvent, Result};
/// Can be used to read `InternalEvent`s.
@ -31,11 +35,10 @@ impl Default for InternalEventReader {
}
impl InternalEventReader {
/// Returns a `Waker` allowing to wake/force the `poll` method to return `Ok(false)`.
#[cfg(feature = "event-stream")]
pub(crate) fn wake(&self) {
if let Some(source) = self.source.as_ref() {
source.wake();
}
pub(crate) fn waker(&self) -> Waker {
self.source.as_ref().expect("reader source not set").waker()
}
pub(crate) fn poll<F>(&mut self, timeout: Option<Duration>, filter: &F) -> Result<bool>
@ -62,9 +65,9 @@ impl InternalEventReader {
let poll_timeout = PollTimeout::new(timeout);
loop {
let maybe_event = match event_source.try_read(timeout)? {
None => None,
Some(event) => {
let maybe_event = match event_source.try_read(timeout) {
Ok(None) => None,
Ok(Some(event)) => {
if filter.eval(&event) {
Some(event)
} else {
@ -72,6 +75,14 @@ impl InternalEventReader {
None
}
}
Err(ErrorKind::IoError(e)) => {
if e.kind() == io::ErrorKind::Interrupted {
return Ok(false);
}
return Err(ErrorKind::IoError(e));
}
Err(e) => return Err(e),
};
if poll_timeout.elapsed() || maybe_event.is_some() {
@ -442,6 +453,9 @@ mod tests {
Ok(None)
}
fn wake(&self) {}
#[cfg(feature = "event-stream")]
fn waker(&self) -> super::super::sys::Waker {
unimplemented!();
}
}
}

View File

@ -1,25 +1,27 @@
use std::time::Duration;
#[cfg(feature = "event-stream")]
use super::sys::Waker;
use super::InternalEvent;
#[cfg(unix)]
pub mod unix;
pub(crate) mod unix;
#[cfg(windows)]
pub mod windows;
pub(crate) mod windows;
/// An interface for trying to read an `InternalEvent` within an optional `Duration`.
pub(crate) trait EventSource: Sync + Send {
/// Tries to read an `InternalEvent` within the given duration.
///
/// This function takes in an optional duration.
/// * `None`: blocks indefinitely until an event is able to be read.
/// * `Some(duration)`: blocks for the given duration.
/// # Arguments
///
/// Returns:
/// `Ok(Some(event))`: in case an event is ready.
/// `Ok(None)`: in case an event is not ready.
/// * `timeout` - `None` block indefinitely until an event is available, `Some(duration)` blocks
/// for the given timeout
///
/// Returns `Ok(None)` if there's no event available and timeout expires.
fn try_read(&mut self, timeout: Option<Duration>) -> crate::Result<Option<InternalEvent>>;
/// Forces the `try_read` method to return `Ok(None)` immediately.
fn wake(&self);
/// Returns a `Waker` allowing to wake/force the `try_read` method to return `Ok(None)`.
#[cfg(feature = "event-stream")]
fn waker(&self) -> Waker;
}

View File

@ -1,11 +1,11 @@
use std::collections::VecDeque;
use std::{io, time::Duration};
use mio::{unix::EventedFd, Events, Poll, PollOpt, Ready, Token};
use signal_hook::iterator::Signals;
use std::{collections::VecDeque, time::Duration};
use crate::Result;
#[cfg(feature = "event-stream")]
use super::super::sys::Waker;
use super::super::{
source::EventSource,
sys::unix::{parse_event, tty_fd, FileDesc},
@ -16,6 +16,7 @@ use super::super::{
// Tokens to identify file descriptor
const TTY_TOKEN: Token = Token(0);
const SIGNAL_TOKEN: Token = Token(1);
#[cfg(feature = "event-stream")]
const WAKE_TOKEN: Token = Token(2);
// I (@zrzka) wasn't able to read more than 1_022 bytes when testing
@ -23,22 +24,6 @@ const WAKE_TOKEN: Token = Token(2);
// is enough.
const TTY_BUFFER_SIZE: usize = 1_204;
/// Creates a new pipe and returns `(read, write)` file descriptors.
fn pipe() -> Result<(FileDesc, FileDesc)> {
let (read_fd, write_fd) = unsafe {
let mut pipe_fds: [libc::c_int; 2] = [0; 2];
if libc::pipe(pipe_fds.as_mut_ptr()) == -1 {
return Err(io::Error::last_os_error().into());
}
(pipe_fds[0], pipe_fds[1])
};
let read_fd = FileDesc::new(read_fd, true);
let write_fd = FileDesc::new(write_fd, true);
Ok((read_fd, write_fd))
}
pub(crate) struct UnixInternalEventSource {
poll: Poll,
events: Events,
@ -46,8 +31,8 @@ pub(crate) struct UnixInternalEventSource {
tty_buffer: [u8; TTY_BUFFER_SIZE],
tty_fd: FileDesc,
signals: Signals,
wake_read_fd: FileDesc,
wake_write_fd: FileDesc,
#[cfg(feature = "event-stream")]
waker: Waker,
}
impl UnixInternalEventSource {
@ -81,15 +66,10 @@ impl UnixInternalEventSource {
let signals = Signals::new(&[signal_hook::SIGWINCH])?;
poll.register(&signals, SIGNAL_TOKEN, Ready::readable(), PollOpt::level())?;
let (wake_read_fd, wake_write_fd) = pipe()?;
let wake_read_raw_fd = wake_read_fd.raw_fd();
let wake_read_ev = EventedFd(&wake_read_raw_fd);
poll.register(
&wake_read_ev,
WAKE_TOKEN,
Ready::readable(),
PollOpt::level(),
)?;
#[cfg(feature = "event-stream")]
let waker = Waker::new()?;
#[cfg(feature = "event-stream")]
poll.register(&waker, WAKE_TOKEN, Ready::readable(), PollOpt::level())?;
Ok(UnixInternalEventSource {
poll,
@ -98,8 +78,8 @@ impl UnixInternalEventSource {
tty_buffer: [0u8; TTY_BUFFER_SIZE],
tty_fd: input_fd,
signals,
wake_read_fd,
wake_write_fd,
#[cfg(feature = "event-stream")]
waker,
})
}
}
@ -164,13 +144,14 @@ impl EventSource for UnixInternalEventSource {
};
}
}
#[cfg(feature = "event-stream")]
WAKE_TOKEN => {
// Something happened on the self pipe. Try to read single byte
// (see wake() fn) and ignore result. If we can't read the byte,
// mio Poll::poll will fire another event with WAKE_TOKEN.
let mut buf = [0u8; 1];
let _ = self.wake_read_fd.read(&mut buf, 1);
return Ok(None);
let _ = self.waker.reset();
return Err(std::io::Error::new(
std::io::ErrorKind::Interrupted,
"Poll operation was woken up by `Waker::wake`",
)
.into());
}
_ => unreachable!("Synchronize Evented handle registration & token handling"),
}
@ -183,12 +164,9 @@ impl EventSource for UnixInternalEventSource {
}
}
fn wake(&self) {
// DO NOT write more than 1 byte. See try_read & WAKE_TOKEN
// handling - it reads just 1 byte. If you write more than
// 1 byte, lets say N, then the try_read will be woken up
// N times.
let _ = self.wake_write_fd.write(&[0x57]);
#[cfg(feature = "event-stream")]
fn waker(&self) -> Waker {
self.waker.clone()
}
}

View File

@ -4,6 +4,8 @@ use crossterm_winapi::{Console, Handle, InputEventType, KeyEventRecord, MouseEve
use crate::event::{sys::windows::WinApiPoll, Event};
#[cfg(feature = "event-stream")]
use super::super::sys::Waker;
use super::super::{
source::EventSource,
sys::windows::{handle_key_event, handle_mouse_event},
@ -64,7 +66,8 @@ impl EventSource for WindowsEventSource {
}
}
fn wake(&self) {
let _ = self.poll.cancel();
#[cfg(feature = "event-stream")]
fn waker(&self) -> Waker {
self.poll.waker()
}
}

View File

@ -16,7 +16,8 @@ use futures::{
use crate::Result;
use super::{
filter::EventFilter, poll_internal, read_internal, Event, InternalEvent, INTERNAL_EVENT_READER,
filter::EventFilter, poll_internal, read_internal, sys::Waker, Event, InternalEvent,
INTERNAL_EVENT_READER,
};
/// A stream of `Result<Event>`.
@ -30,22 +31,47 @@ use super::{
///
/// Check the [examples](https://github.com/crossterm-rs/crossterm/tree/master/examples) folder to see how to use
/// it (`event-stream-*`).
#[derive(Default)]
pub struct EventStream {
wake_thread_spawned: Arc<AtomicBool>,
wake_thread_should_shutdown: Arc<AtomicBool>,
poll_internal_waker: Waker,
stream_wake_thread_spawned: Arc<AtomicBool>,
stream_wake_thread_should_shutdown: Arc<AtomicBool>,
}
impl Default for EventStream {
fn default() -> Self {
EventStream {
poll_internal_waker: INTERNAL_EVENT_READER.write().waker(),
stream_wake_thread_spawned: Arc::new(AtomicBool::new(false)),
stream_wake_thread_should_shutdown: Arc::new(AtomicBool::new(false)),
}
}
}
impl EventStream {
/// Constructs a new instance of `EventStream`.
pub fn new() -> EventStream {
EventStream {
wake_thread_spawned: Arc::new(AtomicBool::new(false)),
wake_thread_should_shutdown: Arc::new(AtomicBool::new(false)),
}
EventStream::default()
}
}
// Note to future me
//
// We need two wakers in order to implement EventStream correctly.
//
// 1. futures::Stream waker
//
// Stream::poll_next can return Poll::Pending which means that there's no
// event available. We are going to spawn a thread with the
// poll_internal(None, &EventFilter) call. This call blocks until an
// event is available and then we have to wake up the executor with notification
// that the task can be resumed.
//
// 2. poll_internal waker
//
// There's no event available, Poll::Pending was returned, stream waker thread
// is up and sitting in the poll_internal. User wants to drop the EventStream.
// We have to wake up the poll_internal (force it to return Ok(false)) and quit
// the thread before we drop.
impl Stream for EventStream {
type Item = Result<Event>;
@ -59,14 +85,15 @@ impl Stream for EventStream {
},
Ok(false) => {
if !self
.wake_thread_spawned
.stream_wake_thread_spawned
.compare_and_swap(false, true, Ordering::SeqCst)
{
let waker = cx.waker().clone();
let wake_thread_spawned = self.wake_thread_spawned.clone();
let wake_thread_should_shutdown = self.wake_thread_should_shutdown.clone();
let stream_waker = cx.waker().clone();
let stream_wake_thread_spawned = self.stream_wake_thread_spawned.clone();
let stream_wake_thread_should_shutdown =
self.stream_wake_thread_should_shutdown.clone();
wake_thread_should_shutdown.store(false, Ordering::SeqCst);
stream_wake_thread_should_shutdown.store(false, Ordering::SeqCst);
thread::spawn(move || {
loop {
@ -74,12 +101,12 @@ impl Stream for EventStream {
break;
}
if wake_thread_should_shutdown.load(Ordering::SeqCst) {
if stream_wake_thread_should_shutdown.load(Ordering::SeqCst) {
break;
}
}
wake_thread_spawned.store(false, Ordering::SeqCst);
waker.wake();
stream_wake_thread_spawned.store(false, Ordering::SeqCst);
stream_waker.wake();
});
}
Poll::Pending
@ -92,8 +119,8 @@ impl Stream for EventStream {
impl Drop for EventStream {
fn drop(&mut self) {
self.wake_thread_should_shutdown
self.stream_wake_thread_should_shutdown
.store(true, Ordering::SeqCst);
INTERNAL_EVENT_READER.read().wake();
let _ = self.poll_internal_waker.wake();
}
}

View File

@ -1,4 +1,9 @@
#[cfg(all(unix, feature = "event-stream"))]
pub(crate) use unix::Waker;
#[cfg(all(windows, feature = "event-stream"))]
pub(crate) use windows::Waker;
#[cfg(unix)]
pub mod unix;
pub(crate) mod unix;
#[cfg(windows)]
pub mod windows;
pub(crate) mod windows;

View File

@ -3,7 +3,10 @@ use std::{
os::unix::io::{IntoRawFd, RawFd},
};
use libc::{c_int, c_void, size_t, ssize_t};
use libc::size_t;
#[cfg(feature = "event-stream")]
pub(crate) use waker::Waker;
use crate::{
event::{Event, KeyCode, KeyEvent, KeyModifiers, MouseButton, MouseEvent},
@ -12,22 +15,8 @@ use crate::{
use super::super::InternalEvent;
// libstd::sys::unix::fd.rs
fn max_len() -> usize {
// The maximum read limit on most posix-like systems is `SSIZE_MAX`,
// with the man page quoting that if the count of bytes to read is
// greater than `SSIZE_MAX` the result is "unspecified".
//
// On macOS, however, apparently the 64-bit libc is either buggy or
// intentionally showing odd behavior by rejecting any read with a size
// larger than or equal to INT_MAX. To handle both of these the read
// size is capped on both platforms.
if cfg!(target_os = "macos") {
<c_int>::max_value() as usize - 1
} else {
<ssize_t>::max_value() as usize
}
}
#[cfg(feature = "event-stream")]
mod waker;
/// A file descriptor wrapper.
///
@ -65,24 +54,6 @@ impl FileDesc {
}
}
pub fn write(&self, buf: &[u8]) -> io::Result<usize> {
// libstd::sys::unix::fd.rs
let ret = unsafe {
libc::write(
self.fd,
buf.as_ptr() as *const c_void,
std::cmp::min(buf.len(), max_len()) as size_t,
) as c_int
};
if ret == -1 {
return Err(io::Error::last_os_error());
}
Ok(ret as usize)
}
/// Returns the underlying file descriptor.
pub fn raw_fd(&self) -> RawFd {
self.fd

100
src/event/sys/unix/waker.rs Normal file
View File

@ -0,0 +1,100 @@
// TODO Replace with `mio::Waker` when the 0.7 is released (not available in 0.6).
use std::sync::{Arc, Mutex};
use mio::{Evented, Poll, PollOpt, Ready, Registration, SetReadiness, Token};
use crate::Result;
struct WakerInner {
registration: Registration,
set_readiness: SetReadiness,
}
impl WakerInner {
fn new() -> Self {
let (registration, set_readiness) = Registration::new2();
Self {
registration,
set_readiness,
}
}
fn wake(&self) -> Result<()> {
self.set_readiness.set_readiness(Ready::readable())?;
Ok(())
}
fn reset(&self) -> Result<()> {
self.set_readiness.set_readiness(Ready::empty())?;
Ok(())
}
}
/// Allows to wake up the `mio::Poll::poll()` method.
#[derive(Clone)]
pub(crate) struct Waker {
inner: Arc<Mutex<WakerInner>>,
}
impl Waker {
/// Creates a new waker.
///
/// `Waker` implements the `mio::Evented` trait and you have to register
/// it in order to use it.
pub(crate) fn new() -> Result<Self> {
Ok(Self {
inner: Arc::new(Mutex::new(WakerInner::new())),
})
}
/// Wakes the `mio::Poll.poll()` method.
///
/// Readiness is set to `Ready::readable()`.
pub(crate) fn wake(&self) -> Result<()> {
self.inner.lock().unwrap().wake()
}
/// Resets the state so the same waker can be reused.
///
/// Readiness is set back to `Ready::empty()`.
pub(crate) fn reset(&self) -> Result<()> {
self.inner.lock().unwrap().reset()
}
}
impl Evented for Waker {
fn register(
&self,
poll: &Poll,
token: Token,
interest: Ready,
opts: PollOpt,
) -> ::std::io::Result<()> {
self.inner
.lock()
.unwrap()
.registration
.register(poll, token, interest, opts)
}
fn reregister(
&self,
poll: &Poll,
token: Token,
interest: Ready,
opts: PollOpt,
) -> ::std::io::Result<()> {
self.inner
.lock()
.unwrap()
.registration
.reregister(poll, token, interest, opts)
}
#[allow(deprecated)]
fn deregister(&self, poll: &Poll) -> ::std::io::Result<()> {
self.inner.lock().unwrap().registration.deregister(poll)
}
}

View File

@ -1,10 +1,9 @@
//! This is a WINDOWS specific implementation for input related action.
use std::{io, io::ErrorKind, sync::Mutex, time::Duration};
use std::{io, sync::Mutex, time::Duration};
use crossterm_winapi::{
ConsoleMode, ControlKeyState, EventFlags, Handle, KeyEventRecord, MouseEvent, ScreenBuffer,
Semaphore,
};
use winapi::{
shared::winerror::WAIT_TIMEOUT,
@ -23,12 +22,17 @@ use winapi::{
};
use lazy_static::lazy_static;
#[cfg(feature = "event-stream")]
pub(crate) use waker::Waker;
use crate::{
event::{Event, KeyCode, KeyEvent, KeyModifiers, MouseButton},
Result,
};
#[cfg(feature = "event-stream")]
mod waker;
const ENABLE_MOUSE_MODE: u32 = 0x0010 | 0x0080 | 0x0008;
lazy_static! {
@ -241,12 +245,21 @@ fn parse_mouse_event_record(event: &MouseEvent) -> Result<Option<crate::event::M
}
pub(crate) struct WinApiPoll {
semaphore: Option<Semaphore>,
#[cfg(feature = "event-stream")]
waker: Waker,
}
impl WinApiPoll {
#[cfg(not(feature = "event-stream"))]
pub(crate) fn new() -> Result<WinApiPoll> {
Ok(WinApiPoll { semaphore: None })
Ok(WinApiPoll {})
}
#[cfg(feature = "event-stream")]
pub(crate) fn new() -> Result<WinApiPoll> {
Ok(WinApiPoll {
waker: Waker::new()?,
})
}
}
@ -258,46 +271,48 @@ impl WinApiPoll {
INFINITE
};
let semaphore = Semaphore::new()?;
let console_handle = Handle::current_in_handle()?;
let handles = &[*console_handle, semaphore.handle()];
self.semaphore = Some(semaphore);
#[cfg(feature = "event-stream")]
let semaphore = self.waker.semaphore();
#[cfg(feature = "event-stream")]
let handles = &[*console_handle, **semaphore.handle()];
#[cfg(not(feature = "event-stream"))]
let handles = &[*console_handle];
let output =
unsafe { WaitForMultipleObjects(handles.len() as u32, handles.as_ptr(), 0, dw_millis) };
let result = match output {
match output {
output if output == WAIT_OBJECT_0 => {
// input handle triggered
Ok(Some(true))
}
#[cfg(feature = "event-stream")]
output if output == WAIT_OBJECT_0 + 1 => {
// semaphore handle triggered
Ok(None)
let _ = self.waker.reset();
Err(io::Error::new(
io::ErrorKind::Interrupted,
"Poll operation was woken up by `Waker::wake`",
)
.into())
}
WAIT_TIMEOUT | WAIT_ABANDONED_0 => {
// timeout elapsed
Ok(None)
}
WAIT_FAILED => return Err(io::Error::last_os_error().into()),
WAIT_FAILED => Err(io::Error::last_os_error().into()),
_ => Err(io::Error::new(
ErrorKind::Other,
io::ErrorKind::Other,
"WaitForMultipleObjects returned unexpected result.",
)
.into()),
};
self.semaphore = None;
result
}
}
pub fn cancel(&self) -> Result<()> {
if let Some(semaphore) = &self.semaphore {
semaphore.release()?
}
Ok(())
#[cfg(feature = "event-stream")]
pub fn waker(&self) -> Waker {
self.waker.clone()
}
}

View File

@ -0,0 +1,42 @@
use std::sync::{Arc, Mutex};
use crossterm_winapi::Semaphore;
use crate::Result;
/// Allows to wake up the `WinApiPoll::poll()` method.
#[derive(Clone)]
pub(crate) struct Waker {
inner: Arc<Mutex<Semaphore>>,
}
impl Waker {
/// Creates a new waker.
///
/// `Waker` is based on the `Semaphore`. You have to use the semaphore
/// handle along with the `WaitForMultipleObjects`.
pub(crate) fn new() -> Result<Self> {
let inner = Semaphore::new()?;
Ok(Self {
inner: Arc::new(Mutex::new(inner)),
})
}
/// Wakes the `WaitForMultipleObjects`.
pub(crate) fn wake(&self) -> Result<()> {
self.inner.lock().unwrap().release()?;
Ok(())
}
/// Replaces the current semaphore with a new one allowing us to reuse the same `Waker`.
pub(crate) fn reset(&self) -> Result<()> {
*self.inner.lock().unwrap() = Semaphore::new()?;
Ok(())
}
/// Returns the semaphore associated with the waker.
pub(crate) fn semaphore(&self) -> Semaphore {
self.inner.lock().unwrap().clone()
}
}

View File

@ -1,6 +1,7 @@
use super::sys::windows::set_virtual_terminal_processing;
use lazy_static::lazy_static;
use super::sys::windows::set_virtual_terminal_processing;
lazy_static! {
static ref SUPPORTS_ANSI_ESCAPE_CODES: bool = {
// Some terminals on windows like GitBash can't use WinaApi calls directly