diff --git a/src/event/source/unix.rs b/src/event/source/unix.rs index 1ee2802..31a4c8a 100644 --- a/src/event/source/unix.rs +++ b/src/event/source/unix.rs @@ -1,3 +1,4 @@ +use std::collections::VecDeque; use std::io; use std::time::Duration; @@ -18,6 +19,11 @@ const TTY_TOKEN: Token = Token(0); const SIGNAL_TOKEN: Token = Token(1); const WAKE_TOKEN: Token = Token(2); +// I (@zrzka) wasn't able to read more than 1_022 bytes when testing +// reading on macOS/Linux -> we don't need bigger buffer and 1k of bytes +// is enough. +const TTY_BUFFER_SIZE: usize = 1_204; + /// Creates a new pipe and returns `(read, write)` file descriptors. fn pipe() -> Result<(FileDesc, FileDesc)> { let (read_fd, write_fd) = unsafe { @@ -37,7 +43,8 @@ fn pipe() -> Result<(FileDesc, FileDesc)> { pub(crate) struct UnixInternalEventSource { poll: Poll, events: Events, - tty_buffer: Vec, + parser: Parser, + tty_buffer: [u8; TTY_BUFFER_SIZE], tty_fd: FileDesc, signals: Signals, wake_read_fd: FileDesc, @@ -88,7 +95,8 @@ impl UnixInternalEventSource { Ok(UnixInternalEventSource { poll, events: Events::with_capacity(3), - tty_buffer: Vec::new(), + parser: Parser::default(), + tty_buffer: [0u8; TTY_BUFFER_SIZE], tty_fd: input_fd, signals, wake_read_fd, @@ -99,77 +107,77 @@ impl UnixInternalEventSource { impl EventSource for UnixInternalEventSource { fn try_read(&mut self, timeout: Option) -> Result> { + if let Some(event) = self.parser.next() { + return Ok(Some(event)); + } + let timeout = PollTimeout::new(timeout); + let mut additional_input_events = Events::with_capacity(3); loop { - let event_count = self.poll.poll(&mut self.events, timeout.leftover())?; + self.poll.poll(&mut self.events, timeout.leftover())?; - match event_count { - event_count if event_count > 0 => { - let events_count = self - .events - .iter() - .map(|x| x.token()) - .collect::>(); + if self.events.is_empty() { + // No readiness events = timeout + return Ok(None); + } - for event in events_count { - match event { - TTY_TOKEN => { - self.tty_buffer.push(self.tty_fd.read_byte()?); + for token in self.events.iter().map(|x| x.token()) { + match token { + TTY_TOKEN => { + let read_count = self.tty_fd.read(&mut self.tty_buffer, TTY_BUFFER_SIZE)?; - let input_available = self - .poll - .poll(&mut self.events, Some(Duration::from_secs(0))) - .map(|x| x > 0)?; + if read_count > 0 { + self.poll + .poll(&mut additional_input_events, Some(Duration::from_secs(0)))?; - match parse_event(&self.tty_buffer, input_available) { - Ok(None) => { - // Not enough bytes to construct an InternalEvent - } - Ok(Some(ie)) => { - self.tty_buffer.clear(); - return Ok(Some(ie)); - } - Err(_) => { - // Can't parse an event, clear buffer and start over - self.tty_buffer.clear(); - } - }; + let additional_input_available = additional_input_events + .iter() + .any(|event| event.token() == TTY_TOKEN); + + self.parser.advance( + &self.tty_buffer[..read_count], + additional_input_available, + ); + + if let Some(event) = self.parser.next() { + return Ok(Some(event)); } - SIGNAL_TOKEN => { - for signal in &self.signals { - match signal as libc::c_int { - signal_hook::SIGWINCH => { - // TODO Should we remove tput? - // - // This can take a really long time, because terminal::size can - // launch new process (tput) and then it parses its output. It's - // not a really long time from the absolute time point of view, but - // it's a really long time from the mio, async-std/tokio executor, ... - // point of view. - let new_size = crate::terminal::size()?; - return Ok(Some(InternalEvent::Event(Event::Resize( - new_size.0, new_size.1, - )))); - } - _ => unreachable!(), - }; - } - } - WAKE_TOKEN => { - // Something happened on the self pipe. Try to read single byte - // (see wake() fn) and ignore result. If we can't read the byte, - // mio Poll::poll will fire another event with WAKE_TOKEN. - let _ = self.wake_read_fd.read_byte(); - return Ok(None); - } - _ => {} } } + SIGNAL_TOKEN => { + for signal in &self.signals { + match signal as libc::c_int { + signal_hook::SIGWINCH => { + // TODO Should we remove tput? + // + // This can take a really long time, because terminal::size can + // launch new process (tput) and then it parses its output. It's + // not a really long time from the absolute time point of view, but + // it's a really long time from the mio, async-std/tokio executor, ... + // point of view. + let new_size = crate::terminal::size()?; + return Ok(Some(InternalEvent::Event(Event::Resize( + new_size.0, new_size.1, + )))); + } + _ => unreachable!("Synchronize signal registration & handling"), + }; + } + } + WAKE_TOKEN => { + // Something happened on the self pipe. Try to read single byte + // (see wake() fn) and ignore result. If we can't read the byte, + // mio Poll::poll will fire another event with WAKE_TOKEN. + let mut buf = [0u8; 1]; + let _ = self.wake_read_fd.read(&mut buf, 1); + return Ok(None); + } + _ => unreachable!("Synchronize Evented handle registration & token handling"), } - _ => return Ok(None), - }; + } + // Processing above can take some time, check if timeout expired if timeout.elapsed() { return Ok(None); } @@ -184,3 +192,72 @@ impl EventSource for UnixInternalEventSource { let _ = self.wake_write_fd.write(&[0x57]); } } + +// +// Following `Parser` structure exists for two reasons: +// +// * mimick anes Parser interface +// * move the advancing, parsing, ... stuff out of the `try_read` method +// +struct Parser { + buffer: Vec, + internal_events: VecDeque, +} + +impl Default for Parser { + fn default() -> Self { + Parser { + // This buffer is used for -> 1 <- ANSI escape sequence. Are we + // aware of any ANSI escape sequence that is bigger? Can we make + // it smaller? + // + // Probably not worth spending more time on this as "there's a plan" + // to use the anes crate parser. + buffer: Vec::with_capacity(256), + // TTY_BUFFER_SIZE is 1_024 bytes. How many ANSI escape sequences can + // fit? What is an average sequence length? Let's guess here + // and say that the average ANSI escape sequence length is 8 bytes. Thus + // the buffer size should be 1024/8=128 to avoid additional allocations + // when processing large amounts of data. + // + // There's no need to make it bigger, because when you look at the `try_read` + // method implementation, all events are consumed before the next TTY_BUFFER + // is processed -> events pushed. + internal_events: VecDeque::with_capacity(128), + } + } +} + +impl Parser { + fn advance(&mut self, buffer: &[u8], more: bool) { + for (idx, byte) in buffer.iter().enumerate() { + let more = idx + 1 < buffer.len() || more; + + self.buffer.push(*byte); + + match parse_event(&self.buffer, more) { + Ok(Some(ie)) => { + self.internal_events.push_back(ie); + self.buffer.clear(); + } + Ok(None) => { + // Event can't be parsed, because we don't have enough bytes for + // the current sequence. Keep the buffer and process next bytes. + } + Err(_) => { + // Event can't be parsed (not enough parameters, parameter is not a number, ...). + // Clear the buffer and continue with another sequence. + self.buffer.clear(); + } + } + } + } +} + +impl Iterator for Parser { + type Item = InternalEvent; + + fn next(&mut self) -> Option { + self.internal_events.pop_front() + } +}