While building a Rust server to serve Astro sites, I ran into something called StreamTrait.
I had no idea what this was, or how to work with it. I hacked together a few examples and got it working, but I wanted to understand how it worked.
I decided to try and build my own Ext trait. I implemented the next
, take
, and map
methods, and integrated them with the Future
trait.
What Are Ext
Traits?
An Ext trait in Rust is a pattern used to extend the functionality of an existing type or trait. By defining methods in an Ext trait, you can add custom behavior to types without modifying their original implementation. This is particularly useful for adding utility methods to widely used traits like Stream.
The Ext pattern is often used because working directly with core traits like Stream can be difficult. The Stream trait itself defines a minimal set of low-level methods, such as poll_next
, that are essential for implementing a stream but are not user-friendly for day-to-day use. Extension traits like StreamExt provide higher-level utility methods—such as next
, filter
, and map
—built on top of these low-level primitives. This makes streams much easier to work with.
In this tutorial, I’ll build a custom Ext trait and add next
, map
, and take
methods to _Stream. These methods will return a Future that resolves to the next item in the stream.
Building the StreamExt
Trait
Let’s start by defining the StreamExt
trait. It extends the Stream
trait with a next
, map
and take
methods:
use futures_core::stream::Stream;
// Extension trait
pub trait StreamExt: Stream {
fn next(&mut self) -> Next<'_, Self>
where
Self: Unpin,
{
Next { stream: self }
}
fn map<F, T>(self, f: F) -> Map<Self, F>
where
Self: Sized,
F: FnMut(Self::Item) -> T,
{
Map { stream: self, f }
}
fn take(self, n: usize) -> Take<Self>
where
Self: Sized,
{
Take {
stream: self,
remaining: n,
}
}
}
// Implement the extension trait for all types that implement Stream
impl<T: ?Sized> StreamExt for T where T: Stream {}
Here’s what’s happening:
- The
StreamExt
trait defines anext
method that returns aNext
struct. - It also defines a
map
method, which takes a closure and applies it to each item in the stream. - The
take
method limits the number of items produced by a stream. - We implement
StreamExt
for all types that implementStream
, ensuring broad compatibility.
Introducing the Next
Future
The next
method returns a Next
struct, which implements the Future
trait. This is where the magic happens:
use futures_core::task::{Context, Poll};
use std::pin::Pin;
// Future returned by the `next` method
pub struct Next<'a, S: ?Sized> {
stream: &'a mut S,
}
impl<S: Stream + Unpin + ?Sized> std::future::Future for Next<'_, S> {
type Output = Option<S::Item>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let stream = Pin::new(&mut *self.get_mut().stream);
stream.poll_next(cx)
}
}
Key points:
Next
holds a mutable reference to the stream.- It implements the
Future
trait, withOutput
beingOption<S::Item>
. - The
poll
method drives the stream’spoll_next
method to retrieve the next item.
Introducing the Map
Adapter
The map
method transforms each item in the stream using a provided closure. Here’s how it works:
// Stream adapter for `map`
pub struct Map<S, F> {
stream: S,
f: F,
}
impl<S, F, T> Stream for Map<S, F>
where
S: Stream + std::marker::Unpin,
F: FnMut(S::Item) -> T + std::marker::Unpin,
{
type Item = T;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
match Pin::new(&mut this.stream).poll_next(cx) {
Poll::Ready(Some(item)) => Poll::Ready(Some((this.f)(item))),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}
Key points:
Map
wraps a stream and a closure.- It implements
Stream
, transforming each item from the inner stream using the closure. - The
poll_next
method applies the closure to each item as it is polled.
Introducing the Take
Adapter
The take
method limits the number of items a stream produces. Here’s how it works:
// Stream adapter for `take`
pub struct Take<S> {
stream: S,
remaining: usize,
}
impl<S: Stream + Unpin> Stream for Take<S> {
type Item = S::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
if this.remaining == 0 {
Poll::Ready(None)
} else {
match Pin::new(&mut this.stream).poll_next(cx) {
Poll::Ready(Some(item)) => {
this.remaining -= 1;
Poll::Ready(Some(item))
}
other => other,
}
}
}
}
Key points:
Take
tracks the number of remaining items to produce.- It stops producing items when the count reaches zero.
- It passes through items from the inner stream until the limit is reached.
Using the next
, map
, and take
Methods
Now that we have our StreamExt
trait with next
, map
, and take
, let’s see them in action:
use futures::stream;
#[tokio::main]
async fn main() {
let mut stream = stream::repeat_with(|| 42).take(10).map(|x| x * 2);
// Using the `next` method from our custom `StreamExt`
while let Some(item) = stream.next().await {
println!("{}", item);
}
}
Here’s what’s happening:
- We create a stream that repeatedly yields the number
42
. - We use the
take
method to limit the stream to 10 items, ensuring it terminates. - We use the
map
method to multiply each item by 2. - We use the
next
method to retrieve items from the transformed stream one at a time. - The
.await
keyword drives the futures to completion, printing each transformed item.
Why Use .await
?
Finally, you might wonder why .await
is necessary. The answer lies in the asynchronous nature of streams:
- The
next
method returns aNext
struct, which implementsFuture
. - Calling
.await
drives this future to completion, executing itspoll
method. - The
poll
method, in turn, invokespoll_next
on the underlying stream, retrieving the next item asynchronously.
Recap and Takeaways
By combining Ext
traits with futures, we can create ergonomic and powerful abstractions for working with streams in Rust. I’ve pushed this example to GitHub if you want to try it out yourself.