Explained: Futures in Rust for Web Development

If a crate returns a Future, you can use .and_then() to work with the result once the Future is ready.

Using .map() on a Future lets you change the type of the value the Future resolves to.
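As a rough synchronous analogy (this is not the futures API itself), `Result` from the standard library has combinators with the same names and similar roles: `.and_then()` chains a follow-up step that can itself fail, while `.map()` only transforms the success value and with it the type. The helper names `parse_port` and `describe` are made up for this sketch.

```rust
// Synchronous analogy using std's Result. A Future's combinators apply the
// same idea to a value that is not available yet.

// Hypothetical helper: parse a port number from text.
fn parse_port(input: &str) -> Result<u16, String> {
    input
        .trim()
        .parse::<u16>()
        .map_err(|err| format!("invalid port {:?}: {}", input, err))
}

// Hypothetical helper: reject privileged ports, then describe the rest.
fn describe(input: &str) -> Result<String, String> {
    parse_port(input)
        // and_then: the next step only runs on success and may fail itself.
        .and_then(|port| {
            if port < 1024 {
                Err(format!("port {} is privileged", port))
            } else {
                Ok(port)
            }
        })
        // map: turn the success value (u16) into another type (String).
        .map(|port| format!("listening on {}", port))
}

fn main() {
    println!("{:?}", describe("8080"));
    println!("{:?}", describe("80"));
}
```

With a Future the closures look the same, but they run later, once the runtime has driven the Future to completion.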

Inside the async Rust world, we have to take care of different types of data.

Instead of using Strings and Numbers for example, we deal with streams of values.

You will most likely deal with Streams.

A Stream is an extension of a Future.

Where a Future produces a single value, a Stream keeps yielding values as long as they are present.

Stream: A Stream is, like a Future, a trait you can impl on a type.

This lets you iterate over the return values (which are Some(_) or None).

Sink: For writing data continuously (in an asynchronous way) to either a socket or a file system.

So Streams are for reading data, Sinks are for writing it.
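To make the "Some(_) or None" idea concrete, here is a minimal sketch of a poll-based stream. The `Async` enum and the `MiniStream` trait are simplified stand-ins written for this example, not the real types from the futures crate, and error handling and task wake-ups are left out on purpose.

```rust
// A simplified model of the poll-based traits. The real futures crate has
// more parts (errors, task notification), which are left out here.
#[derive(Debug, PartialEq)]
enum Async<T> {
    Ready(T),
    NotReady,
}

// Hypothetical trait: a stream yields Some(item) repeatedly, then None.
trait MiniStream {
    type Item;
    fn poll_next(&mut self) -> Async<Option<Self::Item>>;
}

// Hypothetical stream that counts down and is not ready on every other poll,
// imitating a source that sometimes has to wait for data.
struct Countdown {
    remaining: u32,
    stalled: bool,
}

impl MiniStream for Countdown {
    type Item = u32;

    fn poll_next(&mut self) -> Async<Option<u32>> {
        if self.remaining == 0 {
            return Async::Ready(None); // the stream is finished
        }
        self.stalled = !self.stalled;
        if self.stalled {
            return Async::NotReady; // nothing available yet, ask again later
        }
        let item = self.remaining;
        self.remaining -= 1;
        Async::Ready(Some(item))
    }
}

// Drain a stream by polling until it signals the end with None.
fn collect_all<S: MiniStream>(mut stream: S) -> Vec<S::Item> {
    let mut items = Vec::new();
    loop {
        match stream.poll_next() {
            Async::Ready(Some(item)) => items.push(item),
            Async::Ready(None) => return items,
            Async::NotReady => continue, // a real runtime would sleep here
        }
    }
}

fn main() {
    let stream = Countdown { remaining: 3, stalled: false };
    println!("{:?}", collect_all(stream));
}
```

A Future would look the same, except that it returns a single `Ready(value)` instead of a sequence ending in `None`.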

We have two types of Streams in our web ecosystem:

- Stream of Bytes (like HTTP bodies or TCP streams)
- Stream of Messages (like WebSocket Frames or UDP Packets), where each message has a fixed size

Code example

Let's look at a use case where a crate you are using returns a Future.

When doing HTTP calls, reqwest is a good example.

On the Future returned from reqwest, you can use .and_then() to process the results:

    use futures::{Future, Stream};
    // We have to use "r#" before "async" because "async" is a reserved keyword.
    use reqwest::r#async::Client;

    // The return type has to be `Future<Item=(), Error=()>` to be able
    // to use `tokio::run`.
    // If it has a different type, you have to use `Runtime::block_on`.
    fn fetch_data() -> impl Future<Item=(), Error=()> {
        Client::new()
            .get(url) // `url` is the address to request, defined elsewhere
            .send()
            .and_then(|res| {
                res.into_body().concat2()
            })
            .map_err(|err| println!("request error: {}", err))
            .map(|body| {
                // here you can use the body to write it to a file
                // or return it via Ok()
                // If you return it via for example Ok(users)
                // then you need to adjust the return type in impl Future<Item=TYPE
                // Examples can be found here:
                // https://github.com/gruberb/futures_playground
                // For now, let's just turn the body into a Vector
                let v = body.to_vec();
            })
    }

Once you have created the method which returns a Future (fetch_data()), you have to pass it on to a runtime like tokio:

    tokio::run(fetch_data());

High Level Overview

1. You receive a Future from an external crate.
2. A Future is likely to return a Stream of values, so you have to form this Stream into a type you can work with in a synchronous way (like a Vector or String).
3. You return the whole Future from a method via -> impl Future<Item=(), Error=()>, where the parentheses () are placeholders for the actual type you want to return.
4. You pass the method on to a runtime like tokio via tokio::run(method()).
5. The run call starts the runtime, sets up the required resources, puts this Future on a threadpool and starts polling it.
6. It will then try to pass the work on to the operating system.
7. Each time the runtime polls your Future, if the underlying I/O resource your Future is waiting on is not ready, it will return NotReady. The runtime sees this NotReady return value and puts your Future to sleep.
8. Once an event from the underlying I/O resource comes, the runtime checks if this I/O resource is associated with your Future, and starts polling again. This time, your Future will be able to return a Ready with a value, since the underlying I/O resource has provided a value.
9. The runtime will then set the status of the Future to ready and will process the .and_then() part of the code.

Unlike in NodeJS, the Future only gets executed via tokio::run and not before. In Node, a Promise starts executing as soon as you create it.
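The polling steps above can be sketched with a toy model. Everything here is an assumption made for illustration: `Async`, `MiniFuture`, `SlowValue` and `run` are hand-written stand-ins, not the tokio or futures API, and the loop spins instead of sleeping until the operating system reports an event.

```rust
// Simplified stand-ins for the futures 0.1 types; not the real API.
#[derive(Debug, PartialEq)]
enum Async<T> {
    Ready(T),
    NotReady,
}

trait MiniFuture {
    type Item;
    fn poll(&mut self) -> Async<Self::Item>;
}

// Hypothetical future whose "I/O resource" becomes ready after a fixed
// number of polls. It also counts how often it was polled.
struct SlowValue {
    polls_until_ready: u32,
    polls_seen: u32,
    value: &'static str,
}

impl MiniFuture for SlowValue {
    type Item = &'static str;

    fn poll(&mut self) -> Async<&'static str> {
        self.polls_seen += 1;
        if self.polls_seen > self.polls_until_ready {
            Async::Ready(self.value)
        } else {
            Async::NotReady
        }
    }
}

// Toy replacement for `tokio::run`: poll until the future is ready and
// report how many NotReady results were seen on the way.
// A real runtime parks the future and waits for an OS event instead.
fn run<F: MiniFuture>(mut future: F) -> (F::Item, u32) {
    let mut not_ready_count = 0;
    loop {
        match future.poll() {
            Async::Ready(item) => return (item, not_ready_count),
            Async::NotReady => not_ready_count += 1,
        }
    }
}

fn main() {
    // Creating the future does no work: nothing has been polled yet.
    let future = SlowValue { polls_until_ready: 2, polls_seen: 0, value: "response body" };
    println!("polls before run: {}", future.polls_seen);

    let (item, not_ready_count) = run(future);
    println!("got {:?} after {} NotReady results", item, not_ready_count);
}
```

The point of the sketch is the laziness: constructing `SlowValue` polls nothing, and work only happens once `run` takes over.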

What’s so different or hard about Futures?

Let's walk through our example above:

1. We create a new client with Client::new() and .send() our request.
2. We will get a Response back:

       pub struct Response {
           status: StatusCode,
           headers: HeaderMap,
           url: Box<Url>,
           body: Decoder,
           // ...
       }

3. The body itself is a Decoder, which we get out of the Response via .into_body(). This body implements Stream (as mentioned earlier).
4. Now we can look into the Futures API from Rust and find out: we can turn a Stream of Bytes into a single item via .concat2().

       .and_then(|res| {
           res.into_body().concat2()
       })

5. We use this single item in the .map() part as body.
6. With the help of your code editor, you will find out that this body is actually a Chunk, returned from the library Hyper.
7. We can now turn this Chunk into a Vector.

       .map(|body| {
           let v = body.to_vec();
           // do whatever with v
       })

From then on, we are back in “normal” Rust land and can forget what just happened.
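What "collect the Stream, then convert it" amounts to can be shown without any async machinery. The sketch below is a synchronous stand-in, assuming the chunks are already available as plain `Vec<u8>` values; `concat_chunks` and `body_to_string` are hypothetical helpers and not part of hyper, reqwest or futures.

```rust
// Synchronous stand-in for what `.concat2()` followed by `.to_vec()` achieves:
// many chunks of bytes become one contiguous Vec<u8>.
fn concat_chunks<I>(chunks: I) -> Vec<u8>
where
    I: IntoIterator<Item = Vec<u8>>,
{
    let mut body = Vec::new();
    for chunk in chunks {
        body.extend_from_slice(&chunk);
    }
    body
}

// Once the bytes are in a Vec, ordinary Rust applies, e.g. decoding as UTF-8.
fn body_to_string(body: Vec<u8>) -> Result<String, std::string::FromUtf8Error> {
    String::from_utf8(body)
}

fn main() {
    // Hypothetical chunks, as they might arrive from a network stream.
    let chunks = vec![b"{\"name\":".to_vec(), b"\"ferris\"}".to_vec()];
    let body = concat_chunks(chunks);
    match body_to_string(body) {
        Ok(text) => println!("{}", text),
        Err(err) => println!("body was not valid UTF-8: {}", err),
    }
}
```

In the real example the chunks arrive over time, which is why the collecting step has to be a Future, while the conversion afterwards is ordinary synchronous code.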

You can find the full example in this GitHub repository.

There I receive a JSON and write it to a file.

This is one of the reasons why dealing with Futures is so hard in the beginning.

You have to think much lower level than for example in NodeJS.

In addition, the async/await syntax is not final yet, which leads to much more boilerplate code.

These mental steps help you to not get lost when dealing with Futures:

1. What's the return type or value I am getting from this library?
2. How can I access the Stream of values on this response?
3. What will the library turn this Stream into when I collect all the values via .concat2()?
4. How can I turn this new type into a Vector or another Rust std format so I can pass it on to synchronous methods?

Basically, you always want to figure out how to access the stream of values, collect them, and then process the resulting object.

How to execute more than one Future?

Generally you want to collect your values as Streams, so that for each item you get over a Stream, you can spawn off a new Future to handle it.

The Rust Futures API has a type called FuturesUnordered to which you can add more than one Future:

    use futures::stream::futures_unordered::FuturesUnordered;
    use hyper::{client::ResponseFuture, Client};

    // `URL` is the address to request, defined elsewhere
    fn setup_requests() -> FuturesUnordered<ResponseFuture> {
        let mut list_of_futures = FuturesUnordered::new();
        let client = Client::new();

        let first = client.get(URL);
        list_of_futures.push(first);

        let second = client.get(URL);
        list_of_futures.push(second);

        list_of_futures
    }

In this example we used hyper for our HTTP calls.

You can find the rest of the code over here on GitHub.

The syntax will look slightly different if you are using reqwest.

Here you .join() multiple requests and return them as "one Future".

    use futures::Future;
    use reqwest::r#async::{Client, Response};

    fn fetch() -> impl Future<Item=(), Error=()> {
        let client = Client::new();

        // Deserialize the response body into STRUCT_TYPE
        let json = |mut res: Response| {
            res.json::<STRUCT_TYPE>()
        };

        let request1 = client
            .get(URL)
            .send()
            .and_then(json);

        let request2 = client
            .get(URL)
            .send()
            .and_then(json);

        request1.join(request2)
            .map(|(res1, res2)| {
                println!("{:?}", res1);
                println!("{:?}", res2);
            })
            .map_err(|err| {
                println!("stdout error: {}", err);
            })
    }

The full code can also be found on GitHub.
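The practical difference between the two approaches is the order in which results come back. The following toy model illustrates that under stated assumptions: `Async`, `Delayed`, `join` and `unordered` are hand-written for this sketch, errors are ignored, and the real `.join()` and `FuturesUnordered` are driven by a runtime rather than a busy loop.

```rust
// Simplified stand-ins for the futures 0.1 types; not the real API.
enum Async<T> {
    Ready(T),
    NotReady,
}

// Hypothetical future that becomes ready after `delay` polls.
struct Delayed {
    delay: u32,
    label: &'static str,
}

impl Delayed {
    fn poll(&mut self) -> Async<&'static str> {
        if self.delay == 0 {
            Async::Ready(self.label)
        } else {
            self.delay -= 1;
            Async::NotReady
        }
    }
}

// Like `.join()`: wait for both, return results in the order they were joined.
fn join(mut a: Delayed, mut b: Delayed) -> (&'static str, &'static str) {
    let (mut res_a, mut res_b) = (None, None);
    loop {
        if res_a.is_none() {
            if let Async::Ready(v) = a.poll() {
                res_a = Some(v);
            }
        }
        if res_b.is_none() {
            if let Async::Ready(v) = b.poll() {
                res_b = Some(v);
            }
        }
        if let (Some(x), Some(y)) = (res_a, res_b) {
            return (x, y);
        }
    }
}

// Like draining a `FuturesUnordered`: results come back in completion order.
fn unordered(mut pending: Vec<Delayed>) -> Vec<&'static str> {
    let mut done = Vec::new();
    while !pending.is_empty() {
        let mut still_pending = Vec::new();
        for mut future in pending {
            match future.poll() {
                Async::Ready(v) => done.push(v),
                Async::NotReady => still_pending.push(future),
            }
        }
        pending = still_pending;
    }
    done
}

fn main() {
    let slow = Delayed { delay: 3, label: "slow" };
    let fast = Delayed { delay: 1, label: "fast" };
    println!("{:?}", join(slow, fast)); // order of the join call

    let slow = Delayed { delay: 3, label: "slow" };
    let fast = Delayed { delay: 1, label: "fast" };
    println!("{:?}", unordered(vec![slow, fast])); // order of completion
}
```

So joining is a good fit when you need every result and know which is which, while an unordered collection lets you handle each result as soon as it is available.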

What is the future of Futures?

Futures are expected to land in stable Rust with version 1.37, so around June.

There are also changes coming to the syntax and runtime, which will reduce the amount of code you have to write to get this stream of values out of a Future and into a synchronous Rust format.

You can also use the Runtime crate, which saves you almost all of the boilerplate code.

Still, going through the process above helps you understand Futures on a deeper level.

Summary

If you perform asynchronous operations, like fetching files from the operating system or making HTTP requests to a remote server, then Futures let you work with the return values in a non-blocking way.

If the operation were synchronous, you would have to block the thread it is running on and wait for the result before you could continue.

To do this in an asynchronous way, we have a runtime which creates threads itself and takes on Futures.

It will fill the value inside the Future when the operating system returns a value to the runtime.

Once the Future is fulfilled, polling it returns Async::Ready and the .and_then() part of the code gets executed.
