Replace mio polling with filedescriptor's poll() (#735)
This commit is contained in:
parent
3fe13e18d8
commit
05229b71f9
@ -29,6 +29,7 @@ all-features = true
|
||||
default = ["bracketed-paste"]
|
||||
bracketed-paste = []
|
||||
event-stream = ["futures-core"]
|
||||
use-dev-tty = ["filedescriptor"]
|
||||
|
||||
#
|
||||
# Shared dependencies
|
||||
@ -56,8 +57,9 @@ crossterm_winapi = "0.9"
|
||||
#
|
||||
[target.'cfg(unix)'.dependencies]
|
||||
libc = "0.2"
|
||||
mio = { version = "0.8", features = ["os-poll"] }
|
||||
signal-hook = { version = "0.3.13" }
|
||||
filedescriptor = { version = "0.8", optional = true }
|
||||
mio = { version = "0.8", features = ["os-poll"] }
|
||||
signal-hook-mio = { version = "0.2.3", features = ["support-v0_8"] }
|
||||
|
||||
#
|
||||
|
@ -1,241 +1,11 @@
|
||||
use std::{collections::VecDeque, io, time::Duration};
|
||||
#[cfg(feature = "use-dev-tty")]
|
||||
pub(crate) mod tty;
|
||||
|
||||
use mio::{unix::SourceFd, Events, Interest, Poll, Token};
|
||||
use signal_hook_mio::v0_8::Signals;
|
||||
#[cfg(not(feature = "use-dev-tty"))]
|
||||
pub(crate) mod mio;
|
||||
|
||||
use crate::Result;
|
||||
#[cfg(feature = "use-dev-tty")]
|
||||
pub(crate) use self::tty::UnixInternalEventSource;
|
||||
|
||||
#[cfg(feature = "event-stream")]
|
||||
use super::super::sys::Waker;
|
||||
use super::super::{
|
||||
source::EventSource,
|
||||
sys::unix::{
|
||||
file_descriptor::{tty_fd, FileDesc},
|
||||
parse::parse_event,
|
||||
},
|
||||
timeout::PollTimeout,
|
||||
Event, InternalEvent,
|
||||
};
|
||||
|
||||
// Tokens to identify file descriptor
|
||||
const TTY_TOKEN: Token = Token(0);
|
||||
const SIGNAL_TOKEN: Token = Token(1);
|
||||
#[cfg(feature = "event-stream")]
|
||||
const WAKE_TOKEN: Token = Token(2);
|
||||
|
||||
// I (@zrzka) wasn't able to read more than 1_022 bytes when testing
|
||||
// reading on macOS/Linux -> we don't need bigger buffer and 1k of bytes
|
||||
// is enough.
|
||||
const TTY_BUFFER_SIZE: usize = 1_024;
|
||||
|
||||
pub(crate) struct UnixInternalEventSource {
|
||||
poll: Poll,
|
||||
events: Events,
|
||||
parser: Parser,
|
||||
tty_buffer: [u8; TTY_BUFFER_SIZE],
|
||||
tty_fd: FileDesc,
|
||||
signals: Signals,
|
||||
#[cfg(feature = "event-stream")]
|
||||
waker: Waker,
|
||||
}
|
||||
|
||||
impl UnixInternalEventSource {
|
||||
pub fn new() -> Result<Self> {
|
||||
UnixInternalEventSource::from_file_descriptor(tty_fd()?)
|
||||
}
|
||||
|
||||
pub(crate) fn from_file_descriptor(input_fd: FileDesc) -> Result<Self> {
|
||||
let poll = Poll::new()?;
|
||||
let registry = poll.registry();
|
||||
|
||||
let tty_raw_fd = input_fd.raw_fd();
|
||||
let mut tty_ev = SourceFd(&tty_raw_fd);
|
||||
registry.register(&mut tty_ev, TTY_TOKEN, Interest::READABLE)?;
|
||||
|
||||
let mut signals = Signals::new(&[signal_hook::consts::SIGWINCH])?;
|
||||
registry.register(&mut signals, SIGNAL_TOKEN, Interest::READABLE)?;
|
||||
|
||||
#[cfg(feature = "event-stream")]
|
||||
let waker = Waker::new(registry, WAKE_TOKEN)?;
|
||||
|
||||
Ok(UnixInternalEventSource {
|
||||
poll,
|
||||
events: Events::with_capacity(3),
|
||||
parser: Parser::default(),
|
||||
tty_buffer: [0u8; TTY_BUFFER_SIZE],
|
||||
tty_fd: input_fd,
|
||||
signals,
|
||||
#[cfg(feature = "event-stream")]
|
||||
waker,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl EventSource for UnixInternalEventSource {
|
||||
fn try_read(&mut self, timeout: Option<Duration>) -> Result<Option<InternalEvent>> {
|
||||
if let Some(event) = self.parser.next() {
|
||||
return Ok(Some(event));
|
||||
}
|
||||
|
||||
let timeout = PollTimeout::new(timeout);
|
||||
|
||||
loop {
|
||||
if let Err(e) = self.poll.poll(&mut self.events, timeout.leftover()) {
|
||||
// Mio will throw an interrupted error in case of cursor position retrieval. We need to retry until it succeeds.
|
||||
// Previous versions of Mio (< 0.7) would automatically retry the poll call if it was interrupted (if EINTR was returned).
|
||||
// https://docs.rs/mio/0.7.0/mio/struct.Poll.html#notes
|
||||
if e.kind() == io::ErrorKind::Interrupted {
|
||||
continue;
|
||||
} else {
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
|
||||
if self.events.is_empty() {
|
||||
// No readiness events = timeout
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
for token in self.events.iter().map(|x| x.token()) {
|
||||
match token {
|
||||
TTY_TOKEN => {
|
||||
loop {
|
||||
match self.tty_fd.read(&mut self.tty_buffer, TTY_BUFFER_SIZE) {
|
||||
Ok(read_count) => {
|
||||
if read_count > 0 {
|
||||
self.parser.advance(
|
||||
&self.tty_buffer[..read_count],
|
||||
read_count == TTY_BUFFER_SIZE,
|
||||
);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
// No more data to read at the moment. We will receive another event
|
||||
if e.kind() == io::ErrorKind::WouldBlock {
|
||||
break;
|
||||
}
|
||||
// once more data is available to read.
|
||||
else if e.kind() == io::ErrorKind::Interrupted {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(event) = self.parser.next() {
|
||||
return Ok(Some(event));
|
||||
}
|
||||
}
|
||||
}
|
||||
SIGNAL_TOKEN => {
|
||||
for signal in self.signals.pending() {
|
||||
match signal {
|
||||
signal_hook::consts::SIGWINCH => {
|
||||
// TODO Should we remove tput?
|
||||
//
|
||||
// This can take a really long time, because terminal::size can
|
||||
// launch new process (tput) and then it parses its output. It's
|
||||
// not a really long time from the absolute time point of view, but
|
||||
// it's a really long time from the mio, async-std/tokio executor, ...
|
||||
// point of view.
|
||||
let new_size = crate::terminal::size()?;
|
||||
return Ok(Some(InternalEvent::Event(Event::Resize(
|
||||
new_size.0, new_size.1,
|
||||
))));
|
||||
}
|
||||
_ => unreachable!("Synchronize signal registration & handling"),
|
||||
};
|
||||
}
|
||||
}
|
||||
#[cfg(feature = "event-stream")]
|
||||
WAKE_TOKEN => {
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::Interrupted,
|
||||
"Poll operation was woken up by `Waker::wake`",
|
||||
));
|
||||
}
|
||||
_ => unreachable!("Synchronize Evented handle registration & token handling"),
|
||||
}
|
||||
}
|
||||
|
||||
// Processing above can take some time, check if timeout expired
|
||||
if timeout.elapsed() {
|
||||
return Ok(None);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "event-stream")]
|
||||
fn waker(&self) -> Waker {
|
||||
self.waker.clone()
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// Following `Parser` structure exists for two reasons:
|
||||
//
|
||||
// * mimic anes Parser interface
|
||||
// * move the advancing, parsing, ... stuff out of the `try_read` method
|
||||
//
|
||||
#[derive(Debug)]
|
||||
struct Parser {
|
||||
buffer: Vec<u8>,
|
||||
internal_events: VecDeque<InternalEvent>,
|
||||
}
|
||||
|
||||
impl Default for Parser {
|
||||
fn default() -> Self {
|
||||
Parser {
|
||||
// This buffer is used for -> 1 <- ANSI escape sequence. Are we
|
||||
// aware of any ANSI escape sequence that is bigger? Can we make
|
||||
// it smaller?
|
||||
//
|
||||
// Probably not worth spending more time on this as "there's a plan"
|
||||
// to use the anes crate parser.
|
||||
buffer: Vec::with_capacity(256),
|
||||
// TTY_BUFFER_SIZE is 1_024 bytes. How many ANSI escape sequences can
|
||||
// fit? What is an average sequence length? Let's guess here
|
||||
// and say that the average ANSI escape sequence length is 8 bytes. Thus
|
||||
// the buffer size should be 1024/8=128 to avoid additional allocations
|
||||
// when processing large amounts of data.
|
||||
//
|
||||
// There's no need to make it bigger, because when you look at the `try_read`
|
||||
// method implementation, all events are consumed before the next TTY_BUFFER
|
||||
// is processed -> events pushed.
|
||||
internal_events: VecDeque::with_capacity(128),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Parser {
|
||||
fn advance(&mut self, buffer: &[u8], more: bool) {
|
||||
for (idx, byte) in buffer.iter().enumerate() {
|
||||
let more = idx + 1 < buffer.len() || more;
|
||||
|
||||
self.buffer.push(*byte);
|
||||
|
||||
match parse_event(&self.buffer, more) {
|
||||
Ok(Some(ie)) => {
|
||||
self.internal_events.push_back(ie);
|
||||
self.buffer.clear();
|
||||
}
|
||||
Ok(None) => {
|
||||
// Event can't be parsed, because we don't have enough bytes for
|
||||
// the current sequence. Keep the buffer and process next bytes.
|
||||
}
|
||||
Err(_) => {
|
||||
// Event can't be parsed (not enough parameters, parameter is not a number, ...).
|
||||
// Clear the buffer and continue with another sequence.
|
||||
self.buffer.clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Iterator for Parser {
|
||||
type Item = InternalEvent;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
self.internal_events.pop_front()
|
||||
}
|
||||
}
|
||||
#[cfg(not(feature = "use-dev-tty"))]
|
||||
pub(crate) use self::mio::UnixInternalEventSource;
|
241
src/event/source/unix/mio.rs
Normal file
241
src/event/source/unix/mio.rs
Normal file
@ -0,0 +1,241 @@
|
||||
use std::{collections::VecDeque, io, time::Duration};
|
||||
|
||||
use mio::{unix::SourceFd, Events, Interest, Poll, Token};
|
||||
use signal_hook_mio::v0_8::Signals;
|
||||
|
||||
use crate::Result;
|
||||
|
||||
#[cfg(feature = "event-stream")]
|
||||
use crate::event::sys::Waker;
|
||||
use crate::event::{
|
||||
source::EventSource,
|
||||
sys::unix::{
|
||||
file_descriptor::{tty_fd, FileDesc},
|
||||
parse::parse_event,
|
||||
},
|
||||
timeout::PollTimeout,
|
||||
Event, InternalEvent,
|
||||
};
|
||||
|
||||
// Tokens to identify file descriptor
|
||||
const TTY_TOKEN: Token = Token(0);
|
||||
const SIGNAL_TOKEN: Token = Token(1);
|
||||
#[cfg(feature = "event-stream")]
|
||||
const WAKE_TOKEN: Token = Token(2);
|
||||
|
||||
// I (@zrzka) wasn't able to read more than 1_022 bytes when testing
|
||||
// reading on macOS/Linux -> we don't need bigger buffer and 1k of bytes
|
||||
// is enough.
|
||||
const TTY_BUFFER_SIZE: usize = 1_024;
|
||||
|
||||
pub(crate) struct UnixInternalEventSource {
|
||||
poll: Poll,
|
||||
events: Events,
|
||||
parser: Parser,
|
||||
tty_buffer: [u8; TTY_BUFFER_SIZE],
|
||||
tty_fd: FileDesc,
|
||||
signals: Signals,
|
||||
#[cfg(feature = "event-stream")]
|
||||
waker: Waker,
|
||||
}
|
||||
|
||||
impl UnixInternalEventSource {
|
||||
pub fn new() -> Result<Self> {
|
||||
UnixInternalEventSource::from_file_descriptor(tty_fd()?)
|
||||
}
|
||||
|
||||
pub(crate) fn from_file_descriptor(input_fd: FileDesc) -> Result<Self> {
|
||||
let poll = Poll::new()?;
|
||||
let registry = poll.registry();
|
||||
|
||||
let tty_raw_fd = input_fd.raw_fd();
|
||||
let mut tty_ev = SourceFd(&tty_raw_fd);
|
||||
registry.register(&mut tty_ev, TTY_TOKEN, Interest::READABLE)?;
|
||||
|
||||
let mut signals = Signals::new([signal_hook::consts::SIGWINCH])?;
|
||||
registry.register(&mut signals, SIGNAL_TOKEN, Interest::READABLE)?;
|
||||
|
||||
#[cfg(feature = "event-stream")]
|
||||
let waker = Waker::new(registry, WAKE_TOKEN)?;
|
||||
|
||||
Ok(UnixInternalEventSource {
|
||||
poll,
|
||||
events: Events::with_capacity(3),
|
||||
parser: Parser::default(),
|
||||
tty_buffer: [0u8; TTY_BUFFER_SIZE],
|
||||
tty_fd: input_fd,
|
||||
signals,
|
||||
#[cfg(feature = "event-stream")]
|
||||
waker,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl EventSource for UnixInternalEventSource {
|
||||
fn try_read(&mut self, timeout: Option<Duration>) -> Result<Option<InternalEvent>> {
|
||||
if let Some(event) = self.parser.next() {
|
||||
return Ok(Some(event));
|
||||
}
|
||||
|
||||
let timeout = PollTimeout::new(timeout);
|
||||
|
||||
loop {
|
||||
if let Err(e) = self.poll.poll(&mut self.events, timeout.leftover()) {
|
||||
// Mio will throw an interrupted error in case of cursor position retrieval. We need to retry until it succeeds.
|
||||
// Previous versions of Mio (< 0.7) would automatically retry the poll call if it was interrupted (if EINTR was returned).
|
||||
// https://docs.rs/mio/0.7.0/mio/struct.Poll.html#notes
|
||||
if e.kind() == io::ErrorKind::Interrupted {
|
||||
continue;
|
||||
} else {
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
|
||||
if self.events.is_empty() {
|
||||
// No readiness events = timeout
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
for token in self.events.iter().map(|x| x.token()) {
|
||||
match token {
|
||||
TTY_TOKEN => {
|
||||
loop {
|
||||
match self.tty_fd.read(&mut self.tty_buffer, TTY_BUFFER_SIZE) {
|
||||
Ok(read_count) => {
|
||||
if read_count > 0 {
|
||||
self.parser.advance(
|
||||
&self.tty_buffer[..read_count],
|
||||
read_count == TTY_BUFFER_SIZE,
|
||||
);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
// No more data to read at the moment. We will receive another event
|
||||
if e.kind() == io::ErrorKind::WouldBlock {
|
||||
break;
|
||||
}
|
||||
// once more data is available to read.
|
||||
else if e.kind() == io::ErrorKind::Interrupted {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(event) = self.parser.next() {
|
||||
return Ok(Some(event));
|
||||
}
|
||||
}
|
||||
}
|
||||
SIGNAL_TOKEN => {
|
||||
for signal in self.signals.pending() {
|
||||
match signal {
|
||||
signal_hook::consts::SIGWINCH => {
|
||||
// TODO Should we remove tput?
|
||||
//
|
||||
// This can take a really long time, because terminal::size can
|
||||
// launch new process (tput) and then it parses its output. It's
|
||||
// not a really long time from the absolute time point of view, but
|
||||
// it's a really long time from the mio, async-std/tokio executor, ...
|
||||
// point of view.
|
||||
let new_size = crate::terminal::size()?;
|
||||
return Ok(Some(InternalEvent::Event(Event::Resize(
|
||||
new_size.0, new_size.1,
|
||||
))));
|
||||
}
|
||||
_ => unreachable!("Synchronize signal registration & handling"),
|
||||
};
|
||||
}
|
||||
}
|
||||
#[cfg(feature = "event-stream")]
|
||||
WAKE_TOKEN => {
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::Interrupted,
|
||||
"Poll operation was woken up by `Waker::wake`",
|
||||
));
|
||||
}
|
||||
_ => unreachable!("Synchronize Evented handle registration & token handling"),
|
||||
}
|
||||
}
|
||||
|
||||
// Processing above can take some time, check if timeout expired
|
||||
if timeout.elapsed() {
|
||||
return Ok(None);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "event-stream")]
|
||||
fn waker(&self) -> Waker {
|
||||
self.waker.clone()
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// Following `Parser` structure exists for two reasons:
|
||||
//
|
||||
// * mimic anes Parser interface
|
||||
// * move the advancing, parsing, ... stuff out of the `try_read` method
|
||||
//
|
||||
#[derive(Debug)]
|
||||
struct Parser {
|
||||
buffer: Vec<u8>,
|
||||
internal_events: VecDeque<InternalEvent>,
|
||||
}
|
||||
|
||||
impl Default for Parser {
|
||||
fn default() -> Self {
|
||||
Parser {
|
||||
// This buffer is used for -> 1 <- ANSI escape sequence. Are we
|
||||
// aware of any ANSI escape sequence that is bigger? Can we make
|
||||
// it smaller?
|
||||
//
|
||||
// Probably not worth spending more time on this as "there's a plan"
|
||||
// to use the anes crate parser.
|
||||
buffer: Vec::with_capacity(256),
|
||||
// TTY_BUFFER_SIZE is 1_024 bytes. How many ANSI escape sequences can
|
||||
// fit? What is an average sequence length? Let's guess here
|
||||
// and say that the average ANSI escape sequence length is 8 bytes. Thus
|
||||
// the buffer size should be 1024/8=128 to avoid additional allocations
|
||||
// when processing large amounts of data.
|
||||
//
|
||||
// There's no need to make it bigger, because when you look at the `try_read`
|
||||
// method implementation, all events are consumed before the next TTY_BUFFER
|
||||
// is processed -> events pushed.
|
||||
internal_events: VecDeque::with_capacity(128),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Parser {
|
||||
fn advance(&mut self, buffer: &[u8], more: bool) {
|
||||
for (idx, byte) in buffer.iter().enumerate() {
|
||||
let more = idx + 1 < buffer.len() || more;
|
||||
|
||||
self.buffer.push(*byte);
|
||||
|
||||
match parse_event(&self.buffer, more) {
|
||||
Ok(Some(ie)) => {
|
||||
self.internal_events.push_back(ie);
|
||||
self.buffer.clear();
|
||||
}
|
||||
Ok(None) => {
|
||||
// Event can't be parsed, because we don't have enough bytes for
|
||||
// the current sequence. Keep the buffer and process next bytes.
|
||||
}
|
||||
Err(_) => {
|
||||
// Event can't be parsed (not enough parameters, parameter is not a number, ...).
|
||||
// Clear the buffer and continue with another sequence.
|
||||
self.buffer.clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Iterator for Parser {
|
||||
type Item = InternalEvent;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
self.internal_events.pop_front()
|
||||
}
|
||||
}
|
264
src/event/source/unix/tty.rs
Normal file
264
src/event/source/unix/tty.rs
Normal file
@ -0,0 +1,264 @@
|
||||
use std::os::unix::prelude::AsRawFd;
|
||||
use std::{collections::VecDeque, io, os::unix::net::UnixStream, time::Duration};
|
||||
|
||||
use signal_hook::low_level::pipe;
|
||||
|
||||
use crate::event::timeout::PollTimeout;
|
||||
use crate::event::Event;
|
||||
use crate::Result;
|
||||
|
||||
use filedescriptor::{poll, pollfd, POLLIN};
|
||||
|
||||
#[cfg(feature = "event-stream")]
|
||||
use crate::event::sys::Waker;
|
||||
|
||||
use crate::{
|
||||
event::{
|
||||
sys::unix::{
|
||||
file_descriptor::{tty_fd, FileDesc},
|
||||
parse::parse_event,
|
||||
},
|
||||
InternalEvent,
|
||||
source::EventSource,
|
||||
},
|
||||
};
|
||||
|
||||
/// Holds a prototypical Waker and a receiver we can wait on when doing select().
|
||||
#[cfg(feature = "event-stream")]
|
||||
struct WakePipe {
|
||||
receiver: UnixStream,
|
||||
waker: Waker,
|
||||
}
|
||||
|
||||
#[cfg(feature = "event-stream")]
|
||||
impl WakePipe {
|
||||
fn new() -> Result<Self> {
|
||||
let (receiver, sender) = nonblocking_unix_pair()?;
|
||||
Ok(WakePipe {
|
||||
receiver,
|
||||
waker: Waker::new(sender),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// I (@zrzka) wasn't able to read more than 1_022 bytes when testing
|
||||
// reading on macOS/Linux -> we don't need bigger buffer and 1k of bytes
|
||||
// is enough.
|
||||
const TTY_BUFFER_SIZE: usize = 1_024;
|
||||
|
||||
pub(crate) struct UnixInternalEventSource {
|
||||
parser: Parser,
|
||||
tty_buffer: [u8; TTY_BUFFER_SIZE],
|
||||
tty: FileDesc,
|
||||
winch_signal_receiver: UnixStream,
|
||||
#[cfg(feature = "event-stream")]
|
||||
wake_pipe: WakePipe,
|
||||
}
|
||||
|
||||
fn nonblocking_unix_pair() -> Result<(UnixStream, UnixStream)> {
|
||||
let (receiver, sender) = UnixStream::pair()?;
|
||||
receiver.set_nonblocking(true)?;
|
||||
sender.set_nonblocking(true)?;
|
||||
Ok((receiver, sender))
|
||||
}
|
||||
|
||||
impl UnixInternalEventSource {
|
||||
pub fn new() -> Result<Self> {
|
||||
UnixInternalEventSource::from_file_descriptor(tty_fd()?)
|
||||
}
|
||||
|
||||
pub(crate) fn from_file_descriptor(input_fd: FileDesc) -> Result<Self> {
|
||||
Ok(UnixInternalEventSource {
|
||||
parser: Parser::default(),
|
||||
tty_buffer: [0u8; TTY_BUFFER_SIZE],
|
||||
tty: input_fd,
|
||||
winch_signal_receiver: {
|
||||
let (receiver, sender) = nonblocking_unix_pair()?;
|
||||
// Unregistering is unnecessary because EventSource is a singleton
|
||||
pipe::register(libc::SIGWINCH, sender)?;
|
||||
receiver
|
||||
},
|
||||
#[cfg(feature = "event-stream")]
|
||||
wake_pipe: WakePipe::new()?,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// read_complete reads from a non-blocking file descriptor
|
||||
/// until the buffer is full or it would block.
|
||||
///
|
||||
/// Similar to `std::io::Read::read_to_end`, except this function
|
||||
/// only fills the given buffer and does not read beyond that.
|
||||
fn read_complete(fd: &FileDesc, buf: &mut [u8]) -> Result<usize> {
|
||||
loop {
|
||||
match fd.read(buf, buf.len()) {
|
||||
Ok(x) => return Ok(x),
|
||||
Err(e) => match e.kind() {
|
||||
io::ErrorKind::WouldBlock => return Ok(0),
|
||||
io::ErrorKind::Interrupted => continue,
|
||||
_ => return Err(e),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl EventSource for UnixInternalEventSource {
|
||||
fn try_read(&mut self, timeout: Option<Duration>) -> Result<Option<InternalEvent>> {
|
||||
let timeout = PollTimeout::new(timeout);
|
||||
|
||||
fn make_pollfd<F: AsRawFd>(fd: &F) -> pollfd {
|
||||
pollfd {
|
||||
fd: fd.as_raw_fd(),
|
||||
events: POLLIN,
|
||||
revents: 0,
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "event-stream"))]
|
||||
let mut fds = [
|
||||
make_pollfd(&self.tty),
|
||||
make_pollfd(&self.winch_signal_receiver),
|
||||
];
|
||||
|
||||
#[cfg(feature = "event-stream")]
|
||||
let mut fds = [
|
||||
make_pollfd(&self.tty),
|
||||
make_pollfd(&self.winch_signal_receiver),
|
||||
make_pollfd(&self.wake_pipe.receiver),
|
||||
];
|
||||
|
||||
while timeout.leftover().map_or(true, |t| !t.is_zero()) {
|
||||
// check if there are buffered events from the last read
|
||||
if let Some(event) = self.parser.next() {
|
||||
return Ok(Some(event));
|
||||
}
|
||||
match poll(&mut fds, timeout.leftover()) {
|
||||
Err(filedescriptor::Error::Io(e)) => return Err(e),
|
||||
res => res.expect("polling tty"),
|
||||
};
|
||||
if fds[0].revents & POLLIN != 0 {
|
||||
loop {
|
||||
let read_count = read_complete(&self.tty, &mut self.tty_buffer)?;
|
||||
if read_count > 0 {
|
||||
self.parser.advance(
|
||||
&self.tty_buffer[..read_count],
|
||||
read_count == TTY_BUFFER_SIZE,
|
||||
);
|
||||
}
|
||||
|
||||
if let Some(event) = self.parser.next() {
|
||||
return Ok(Some(event));
|
||||
}
|
||||
|
||||
if read_count == 0 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if fds[1].revents & POLLIN != 0 {
|
||||
let fd = FileDesc::new(self.winch_signal_receiver.as_raw_fd(), false);
|
||||
// drain the pipe
|
||||
while read_complete(&fd, &mut [0; 1024])? != 0 {}
|
||||
// TODO Should we remove tput?
|
||||
//
|
||||
// This can take a really long time, because terminal::size can
|
||||
// launch new process (tput) and then it parses its output. It's
|
||||
// not a really long time from the absolute time point of view, but
|
||||
// it's a really long time from the mio, async-std/tokio executor, ...
|
||||
// point of view.
|
||||
let new_size = crate::terminal::size()?;
|
||||
return Ok(Some(InternalEvent::Event(Event::Resize(
|
||||
new_size.0, new_size.1,
|
||||
))));
|
||||
}
|
||||
|
||||
#[cfg(feature = "event-stream")]
|
||||
if fds[2].revents & POLLIN != 0 {
|
||||
let fd = FileDesc::new(self.wake_pipe.receiver.as_raw_fd(), false);
|
||||
// drain the pipe
|
||||
while read_complete(&fd, &mut [0; 1024])? != 0 {}
|
||||
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::Interrupted,
|
||||
"Poll operation was woken up by `Waker::wake`",
|
||||
));
|
||||
}
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
#[cfg(feature = "event-stream")]
|
||||
fn waker(&self) -> Waker {
|
||||
self.wake_pipe.waker.clone()
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// Following `Parser` structure exists for two reasons:
|
||||
//
|
||||
// * mimic anes Parser interface
|
||||
// * move the advancing, parsing, ... stuff out of the `try_read` method
|
||||
//
|
||||
#[derive(Debug)]
|
||||
struct Parser {
|
||||
buffer: Vec<u8>,
|
||||
internal_events: VecDeque<InternalEvent>,
|
||||
}
|
||||
|
||||
impl Default for Parser {
|
||||
fn default() -> Self {
|
||||
Parser {
|
||||
// This buffer is used for -> 1 <- ANSI escape sequence. Are we
|
||||
// aware of any ANSI escape sequence that is bigger? Can we make
|
||||
// it smaller?
|
||||
//
|
||||
// Probably not worth spending more time on this as "there's a plan"
|
||||
// to use the anes crate parser.
|
||||
buffer: Vec::with_capacity(256),
|
||||
// TTY_BUFFER_SIZE is 1_024 bytes. How many ANSI escape sequences can
|
||||
// fit? What is an average sequence length? Let's guess here
|
||||
// and say that the average ANSI escape sequence length is 8 bytes. Thus
|
||||
// the buffer size should be 1024/8=128 to avoid additional allocations
|
||||
// when processing large amounts of data.
|
||||
//
|
||||
// There's no need to make it bigger, because when you look at the `try_read`
|
||||
// method implementation, all events are consumed before the next TTY_BUFFER
|
||||
// is processed -> events pushed.
|
||||
internal_events: VecDeque::with_capacity(128),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Parser {
|
||||
fn advance(&mut self, buffer: &[u8], more: bool) {
|
||||
for (idx, byte) in buffer.iter().enumerate() {
|
||||
let more = idx + 1 < buffer.len() || more;
|
||||
|
||||
self.buffer.push(*byte);
|
||||
|
||||
match parse_event(&self.buffer, more) {
|
||||
Ok(Some(ie)) => {
|
||||
self.internal_events.push_back(ie);
|
||||
self.buffer.clear();
|
||||
}
|
||||
Ok(None) => {
|
||||
// Event can't be parsed, because we don't have enough bytes for
|
||||
// the current sequence. Keep the buffer and process next bytes.
|
||||
}
|
||||
Err(_) => {
|
||||
// Event can't be parsed (not enough parameters, parameter is not a number, ...).
|
||||
// Clear the buffer and continue with another sequence.
|
||||
self.buffer.clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Iterator for Parser {
|
||||
type Item = InternalEvent;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
self.internal_events.pop_front()
|
||||
}
|
||||
}
|
@ -1,6 +1,9 @@
|
||||
use std::{
|
||||
fs, io,
|
||||
os::unix::io::{IntoRawFd, RawFd},
|
||||
os::unix::{
|
||||
io::{IntoRawFd, RawFd},
|
||||
prelude::AsRawFd,
|
||||
},
|
||||
};
|
||||
|
||||
use libc::size_t;
|
||||
@ -34,7 +37,7 @@ impl FileDesc {
|
||||
self.fd,
|
||||
buffer.as_mut_ptr() as *mut libc::c_void,
|
||||
size as size_t,
|
||||
) as isize
|
||||
)
|
||||
};
|
||||
|
||||
if result < 0 {
|
||||
@ -63,6 +66,12 @@ impl Drop for FileDesc {
|
||||
}
|
||||
}
|
||||
|
||||
impl AsRawFd for FileDesc {
|
||||
fn as_raw_fd(&self) -> RawFd {
|
||||
self.raw_fd()
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a file descriptor pointing to the standard input or `/dev/tty`.
|
||||
pub fn tty_fd() -> Result<FileDesc> {
|
||||
let (fd, close_on_drop) = if unsafe { libc::isatty(libc::STDIN_FILENO) == 1 } {
|
||||
|
@ -105,11 +105,11 @@ pub(crate) fn parse_event(buffer: &[u8], input_available: bool) -> Result<Option
|
||||
KeyCode::Backspace.into(),
|
||||
)))),
|
||||
c @ b'\x01'..=b'\x1A' => Ok(Some(InternalEvent::Event(Event::Key(KeyEvent::new(
|
||||
KeyCode::Char((c as u8 - 0x1 + b'a') as char),
|
||||
KeyCode::Char((c - 0x1 + b'a') as char),
|
||||
KeyModifiers::CONTROL,
|
||||
))))),
|
||||
c @ b'\x1C'..=b'\x1F' => Ok(Some(InternalEvent::Event(Event::Key(KeyEvent::new(
|
||||
KeyCode::Char((c as u8 - 0x1C + b'4') as char),
|
||||
KeyCode::Char((c - 0x1C + b'4') as char),
|
||||
KeyModifiers::CONTROL,
|
||||
))))),
|
||||
b'\0' => Ok(Some(InternalEvent::Event(Event::Key(KeyEvent::new(
|
||||
|
@ -1,36 +1,11 @@
|
||||
use std::sync::{Arc, Mutex};
|
||||
#[cfg(feature = "use-dev-tty")]
|
||||
pub(crate) mod tty;
|
||||
|
||||
use mio::{Registry, Token};
|
||||
#[cfg(not(feature = "use-dev-tty"))]
|
||||
pub(crate) mod mio;
|
||||
|
||||
use crate::Result;
|
||||
#[cfg(feature = "use-dev-tty")]
|
||||
pub(crate) use self::tty::Waker;
|
||||
|
||||
/// Allows to wake up the `mio::Poll::poll()` method.
|
||||
/// This type wraps `mio::Waker`, for more information see its documentation.
|
||||
#[derive(Clone, Debug)]
|
||||
pub(crate) struct Waker {
|
||||
inner: Arc<Mutex<mio::Waker>>,
|
||||
}
|
||||
|
||||
impl Waker {
|
||||
/// Create a new `Waker`.
|
||||
pub(crate) fn new(registry: &Registry, waker_token: Token) -> Result<Self> {
|
||||
Ok(Self {
|
||||
inner: Arc::new(Mutex::new(mio::Waker::new(registry, waker_token)?)),
|
||||
})
|
||||
}
|
||||
|
||||
/// Wake up the [`Poll`] associated with this `Waker`.
|
||||
///
|
||||
/// Readiness is set to `Ready::readable()`.
|
||||
pub(crate) fn wake(&self) -> Result<()> {
|
||||
self.inner.lock().unwrap().wake()
|
||||
}
|
||||
|
||||
/// Resets the state so the same waker can be reused.
|
||||
///
|
||||
/// This function is not impl
|
||||
#[allow(dead_code, clippy::clippy::unnecessary_wraps)]
|
||||
pub(crate) fn reset(&self) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
#[cfg(not(feature = "use-dev-tty"))]
|
||||
pub(crate) use self::mio::Waker;
|
36
src/event/sys/unix/waker/mio.rs
Normal file
36
src/event/sys/unix/waker/mio.rs
Normal file
@ -0,0 +1,36 @@
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use mio::{Registry, Token};
|
||||
|
||||
use crate::Result;
|
||||
|
||||
/// Allows to wake up the `mio::Poll::poll()` method.
|
||||
/// This type wraps `mio::Waker`, for more information see its documentation.
|
||||
#[derive(Clone, Debug)]
|
||||
pub(crate) struct Waker {
|
||||
inner: Arc<Mutex<mio::Waker>>,
|
||||
}
|
||||
|
||||
impl Waker {
|
||||
/// Create a new `Waker`.
|
||||
pub(crate) fn new(registry: &Registry, waker_token: Token) -> Result<Self> {
|
||||
Ok(Self {
|
||||
inner: Arc::new(Mutex::new(mio::Waker::new(registry, waker_token)?)),
|
||||
})
|
||||
}
|
||||
|
||||
/// Wake up the [`Poll`] associated with this `Waker`.
|
||||
///
|
||||
/// Readiness is set to `Ready::readable()`.
|
||||
pub(crate) fn wake(&self) -> Result<()> {
|
||||
self.inner.lock().unwrap().wake()
|
||||
}
|
||||
|
||||
/// Resets the state so the same waker can be reused.
|
||||
///
|
||||
/// This function is not impl
|
||||
#[allow(dead_code, clippy::clippy::unnecessary_wraps)]
|
||||
pub(crate) fn reset(&self) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
38
src/event/sys/unix/waker/tty.rs
Normal file
38
src/event/sys/unix/waker/tty.rs
Normal file
@ -0,0 +1,38 @@
|
||||
use std::{
|
||||
io::Write,
|
||||
os::unix::net::UnixStream,
|
||||
sync::{Arc, Mutex},
|
||||
};
|
||||
|
||||
use crate::Result;
|
||||
|
||||
/// Allows to wake up the EventSource::try_read() method.
|
||||
#[derive(Clone, Debug)]
|
||||
pub(crate) struct Waker {
|
||||
inner: Arc<Mutex<UnixStream>>,
|
||||
}
|
||||
|
||||
impl Waker {
|
||||
/// Create a new `Waker`.
|
||||
pub(crate) fn new(writer: UnixStream) -> Self {
|
||||
Self {
|
||||
inner: Arc::new(Mutex::new(writer)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Wake up the [`Poll`] associated with this `Waker`.
|
||||
///
|
||||
/// Readiness is set to `Ready::readable()`.
|
||||
pub(crate) fn wake(&self) -> Result<()> {
|
||||
self.inner.lock().unwrap().write(&[0])?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Resets the state so the same waker can be reused.
|
||||
///
|
||||
/// This function is not impl
|
||||
#[allow(dead_code, clippy::clippy::unnecessary_wraps)]
|
||||
pub(crate) fn reset(&self) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user