diff --git a/zed/src/throttle.rs b/zed/src/throttle.rs new file mode 100644 index 0000000000..2fcd028496 --- /dev/null +++ b/zed/src/throttle.rs @@ -0,0 +1,68 @@ +use core::time; +use futures_core::{Future, Stream}; +use std::{ + pin::Pin, + task::{Context, Poll}, +}; +use time::Duration; + +pub struct Throttled { + period: Duration, + stream: S, + timer: Option, +} + +pub fn throttled(period: Duration, stream: S) -> impl Stream { + Throttled { + period, + stream, + timer: None, + } +} + +impl Stream for Throttled { + type Item = S::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if let Some(timer) = self.as_mut().timer() { + if let Poll::Pending = timer.poll(cx) { + return Poll::Pending; + } else { + self.as_mut().get_mut().timer = None; + } + } + + let mut stream = self.as_mut().stream(); + let mut last_item = None; + loop { + match stream.as_mut().poll_next(cx) { + Poll::Ready(None) => { + return Poll::Ready(None); + } + Poll::Ready(Some(item)) => last_item = Some(item), + Poll::Pending => break, + } + } + + if let Some(last_item) = last_item { + self.get_mut().timer = Some(smol::Timer::after(self.period)); + Poll::Ready(Some(last_item)) + } else { + Poll::Pending + } + } +} + +impl Throttled { + fn stream(self: Pin<&mut Self>) -> Pin<&mut S> { + unsafe { self.map_unchecked_mut(|s| &mut s.stream) } + } + + fn timer(self: Pin<&mut Self>) -> Option> { + if self.timer.is_some() { + Some(unsafe { self.map_unchecked_mut(|s| s.timer.as_mut().unwrap()) }) + } else { + None + } + } +}