Improve try_read performance & clarity

Signed-off-by: Robert Vojta <rvojta@me.com>
This commit is contained in:
Robert Vojta 2019-12-03 10:53:24 +01:00 committed by Robert Vojta
parent d87116a90c
commit fb5a8f24cc

View File

@ -1,3 +1,4 @@
use std::collections::VecDeque;
use std::io; use std::io;
use std::time::Duration; use std::time::Duration;
@ -18,6 +19,11 @@ const TTY_TOKEN: Token = Token(0);
const SIGNAL_TOKEN: Token = Token(1); const SIGNAL_TOKEN: Token = Token(1);
const WAKE_TOKEN: Token = Token(2); const WAKE_TOKEN: Token = Token(2);
// I (@zrzka) wasn't able to read more than 1_022 bytes when testing
// reading on macOS/Linux -> we don't need bigger buffer and 1k of bytes
// is enough.
const TTY_BUFFER_SIZE: usize = 1_204;
/// Creates a new pipe and returns `(read, write)` file descriptors. /// Creates a new pipe and returns `(read, write)` file descriptors.
fn pipe() -> Result<(FileDesc, FileDesc)> { fn pipe() -> Result<(FileDesc, FileDesc)> {
let (read_fd, write_fd) = unsafe { let (read_fd, write_fd) = unsafe {
@ -37,7 +43,8 @@ fn pipe() -> Result<(FileDesc, FileDesc)> {
pub(crate) struct UnixInternalEventSource { pub(crate) struct UnixInternalEventSource {
poll: Poll, poll: Poll,
events: Events, events: Events,
tty_buffer: Vec<u8>, parser: Parser,
tty_buffer: [u8; TTY_BUFFER_SIZE],
tty_fd: FileDesc, tty_fd: FileDesc,
signals: Signals, signals: Signals,
wake_read_fd: FileDesc, wake_read_fd: FileDesc,
@ -88,7 +95,8 @@ impl UnixInternalEventSource {
Ok(UnixInternalEventSource { Ok(UnixInternalEventSource {
poll, poll,
events: Events::with_capacity(3), events: Events::with_capacity(3),
tty_buffer: Vec::new(), parser: Parser::default(),
tty_buffer: [0u8; TTY_BUFFER_SIZE],
tty_fd: input_fd, tty_fd: input_fd,
signals, signals,
wake_read_fd, wake_read_fd,
@ -99,77 +107,77 @@ impl UnixInternalEventSource {
impl EventSource for UnixInternalEventSource { impl EventSource for UnixInternalEventSource {
fn try_read(&mut self, timeout: Option<Duration>) -> Result<Option<InternalEvent>> { fn try_read(&mut self, timeout: Option<Duration>) -> Result<Option<InternalEvent>> {
if let Some(event) = self.parser.next() {
return Ok(Some(event));
}
let timeout = PollTimeout::new(timeout); let timeout = PollTimeout::new(timeout);
let mut additional_input_events = Events::with_capacity(3);
loop { loop {
let event_count = self.poll.poll(&mut self.events, timeout.leftover())?; self.poll.poll(&mut self.events, timeout.leftover())?;
match event_count { if self.events.is_empty() {
event_count if event_count > 0 => { // No readiness events = timeout
let events_count = self return Ok(None);
.events }
.iter()
.map(|x| x.token())
.collect::<Vec<Token>>();
for event in events_count { for token in self.events.iter().map(|x| x.token()) {
match event { match token {
TTY_TOKEN => { TTY_TOKEN => {
self.tty_buffer.push(self.tty_fd.read_byte()?); let read_count = self.tty_fd.read(&mut self.tty_buffer, TTY_BUFFER_SIZE)?;
let input_available = self if read_count > 0 {
.poll self.poll
.poll(&mut self.events, Some(Duration::from_secs(0))) .poll(&mut additional_input_events, Some(Duration::from_secs(0)))?;
.map(|x| x > 0)?;
match parse_event(&self.tty_buffer, input_available) { let additional_input_available = additional_input_events
Ok(None) => { .iter()
// Not enough bytes to construct an InternalEvent .any(|event| event.token() == TTY_TOKEN);
}
Ok(Some(ie)) => { self.parser.advance(
self.tty_buffer.clear(); &self.tty_buffer[..read_count],
return Ok(Some(ie)); additional_input_available,
} );
Err(_) => {
// Can't parse an event, clear buffer and start over if let Some(event) = self.parser.next() {
self.tty_buffer.clear(); return Ok(Some(event));
}
};
} }
SIGNAL_TOKEN => {
for signal in &self.signals {
match signal as libc::c_int {
signal_hook::SIGWINCH => {
// TODO Should we remove tput?
//
// This can take a really long time, because terminal::size can
// launch new process (tput) and then it parses its output. It's
// not a really long time from the absolute time point of view, but
// it's a really long time from the mio, async-std/tokio executor, ...
// point of view.
let new_size = crate::terminal::size()?;
return Ok(Some(InternalEvent::Event(Event::Resize(
new_size.0, new_size.1,
))));
}
_ => unreachable!(),
};
}
}
WAKE_TOKEN => {
// Something happened on the self pipe. Try to read single byte
// (see wake() fn) and ignore result. If we can't read the byte,
// mio Poll::poll will fire another event with WAKE_TOKEN.
let _ = self.wake_read_fd.read_byte();
return Ok(None);
}
_ => {}
} }
} }
SIGNAL_TOKEN => {
for signal in &self.signals {
match signal as libc::c_int {
signal_hook::SIGWINCH => {
// TODO Should we remove tput?
//
// This can take a really long time, because terminal::size can
// launch new process (tput) and then it parses its output. It's
// not a really long time from the absolute time point of view, but
// it's a really long time from the mio, async-std/tokio executor, ...
// point of view.
let new_size = crate::terminal::size()?;
return Ok(Some(InternalEvent::Event(Event::Resize(
new_size.0, new_size.1,
))));
}
_ => unreachable!("Synchronize signal registration & handling"),
};
}
}
WAKE_TOKEN => {
// Something happened on the self pipe. Try to read single byte
// (see wake() fn) and ignore result. If we can't read the byte,
// mio Poll::poll will fire another event with WAKE_TOKEN.
let mut buf = [0u8; 1];
let _ = self.wake_read_fd.read(&mut buf, 1);
return Ok(None);
}
_ => unreachable!("Synchronize Evented handle registration & token handling"),
} }
_ => return Ok(None), }
};
// Processing above can take some time, check if timeout expired
if timeout.elapsed() { if timeout.elapsed() {
return Ok(None); return Ok(None);
} }
@ -184,3 +192,72 @@ impl EventSource for UnixInternalEventSource {
let _ = self.wake_write_fd.write(&[0x57]); let _ = self.wake_write_fd.write(&[0x57]);
} }
} }
//
// Following `Parser` structure exists for two reasons:
//
// * mimick anes Parser interface
// * move the advancing, parsing, ... stuff out of the `try_read` method
//
struct Parser {
buffer: Vec<u8>,
internal_events: VecDeque<InternalEvent>,
}
impl Default for Parser {
fn default() -> Self {
Parser {
// This buffer is used for -> 1 <- ANSI escape sequence. Are we
// aware of any ANSI escape sequence that is bigger? Can we make
// it smaller?
//
// Probably not worth spending more time on this as "there's a plan"
// to use the anes crate parser.
buffer: Vec::with_capacity(256),
// TTY_BUFFER_SIZE is 1_024 bytes. How many ANSI escape sequences can
// fit? What is an average sequence length? Let's guess here
// and say that the average ANSI escape sequence length is 8 bytes. Thus
// the buffer size should be 1024/8=128 to avoid additional allocations
// when processing large amounts of data.
//
// There's no need to make it bigger, because when you look at the `try_read`
// method implementation, all events are consumed before the next TTY_BUFFER
// is processed -> events pushed.
internal_events: VecDeque::with_capacity(128),
}
}
}
impl Parser {
fn advance(&mut self, buffer: &[u8], more: bool) {
for (idx, byte) in buffer.iter().enumerate() {
let more = idx + 1 < buffer.len() || more;
self.buffer.push(*byte);
match parse_event(&self.buffer, more) {
Ok(Some(ie)) => {
self.internal_events.push_back(ie);
self.buffer.clear();
}
Ok(None) => {
// Event can't be parsed, because we don't have enough bytes for
// the current sequence. Keep the buffer and process next bytes.
}
Err(_) => {
// Event can't be parsed (not enough parameters, parameter is not a number, ...).
// Clear the buffer and continue with another sequence.
self.buffer.clear();
}
}
}
}
}
impl Iterator for Parser {
type Item = InternalEvent;
fn next(&mut self) -> Option<Self::Item> {
self.internal_events.pop_front()
}
}