How to implement async stream in Rust without resorting to macros

I published the async-fn-stream crate which is a lightweight alternative to async-stream.

Async iteration

Rust provides support for async iteration via the Stream trait. It allows to decouple producer of data from the consumer, just like an Iterator allows the same for blocking code. This is immensely useful if the producer is expected to produce a large (or infinite) sequence.

Both Stream and Iterator are quite easy to use:

// Iterator
for x in produce_iter() {
    println!("{x}");
}

fn produce_iter() -> impl Iterator<Item = i32> { ... }

// Stream
let stream = produce_stream().await;
pin_mut!(stream);
while let Some(x) = stream.next().await {
    println!("{x}");
}

async fn produce_stream() -> impl Stream<Item = i32> { ... }

We can see that the usage of Stream has somewhat more boilerplate code but it's still quite OK. Most likely Rust will in the future gain language support for using Streams in for loops.

The async-stream crate

Currently, it's not too ergonomic to hand-code implementations of either Stream or Iterator. The Iterator::next() method might sometimes be cumbersome to implement, but usually, it's doable without too much effort.

But Stream::poll_next() is much harder to implement. Often this necessitates storing (and swapping) futures inside of Stream implementation struct which is quite hard to do without unsafe code. To tackle this issue, Tokio team has written a great crate async-stream which I recommend checking out.

This macro lets us create an async stream with ease:

use std::path::Path;

use async_stream::stream;
use futures::{pin_mut, Stream, StreamExt};
use tokio::{
    fs::File,
    io::{AsyncBufReadExt, BufReader},
};

async fn read_lines(path: &Path) -> std::io::Result<impl Stream<Item = std::io::Result<String>>> {
    let file = File::open(path).await?;
    let mut file = BufReader::new(file);

    let result = stream! {
        let mut buf = String::new();
        loop {
            buf.clear();
            match file.read_line(&mut buf).await {
                Ok(0) => break,
                Ok(_) => yield Ok(buf.clone()),
                Err(error) => yield Err(error),
            }
        };
    };

    Ok(result)
}

And it even runs and provides correct output!

Why I don't like to use async-stream in production code

So, is the problem solved? Maybe so, but for me, unfortunately, this solution leaves a lot to be desired.

And I must say upfront that this is not a fault of async-stream - those are general problems associated with macros in Rust. async-stream just makes it more apparent because stream! { ... } often encompasses large chunks of code.

Let's look closely at this code.

The first that I notice is that rustfmt does not even try to reformat the code inside stream! macro. For example, if I mess up the formatting of this code like so:

let result = stream! {
    let mut buf = String::new();
    loop
    
    
    {
                    buf.clear(); match file.read_line(&mut buf).await {
            Ok(0) => break,
                Ok(_  )   => yield Ok(buf.clone()), Err(error) => yield Err(error),
        }
    };
};

then rustfmt will leave this code as-is. And if you use rustfmt --check in the CI pipeline to ensure that formatting stays consistent, it also won't notice any inconsistencies in formatting.

The second and much more important downside is that some features of the IDE do not work as well as they should. Here's what happens if I try to use autocomplete in Visual Studio Code with rust-analyzer:

rust-analyzer autocompletion for code inside stream! macro

An alternative to async-stream

Having found this solution unsatisfactory for my needs, I went ahead and published my take on it. Meet the async-fn-stream crate.

It's the same idea as async-stream but implemented without resorting to macros.

Here's what the code looks like and the summary of differences:

  • async_stream::stream! { ... } is replaced with async_fn_stream::fn_stream(|emitter| async move { ... }) (there's also a try_fn_stream which is similar to try_stream!);
  • yield X is replaced with emitter.emit(X).await.
use std::path::Path;

use async_fn_stream::fn_stream;
use futures::{pin_mut, Stream, StreamExt};
use tokio::{
    fs::File,
    io::{AsyncBufReadExt, BufReader},
};

async fn read_lines(path: &Path) -> std::io::Result<impl Stream<Item = std::io::Result<String>>> {
    let file = File::open(path).await?;
    let mut file = BufReader::new(file);

    let result = fn_stream(|emitter| async move {
        let mut buf = String::new();
        loop {
            buf.clear();
            match file.read_line(&mut buf).await {
                Ok(0) => break,
                Ok(_) => emitter.emit(Ok(buf.clone())).await,
                Err(error) => emitter.emit(Err(error)).await,
            }
        }
    });

    Ok(result)
}

And now, code completion works fine:

rust-analyzer autocompletion for code inside normal function macro

If you've also tried using async-stream and met similar issues, I recommend checking my crate out.