Stream + async await

# Naveen Chawla (8 years ago)

It'd be great to have async stream constructs such as: reactivex.io/rxjs , supported natively, such that they can be used directly with the async / await keywords for async stream programming in a linear fashion (analogous to what can already be done with linearly awaiting Promises, but for async streams instead).

# Domenic Denicola (8 years ago)

tc39/proposal-async-iteration

From: es-discuss [mailto:es-discuss-bounces at mozilla.org] On Behalf Of Naveen Chawla Sent: Tuesday, July 11, 2017 09:24 To: es-discuss at mozilla.org Subject: Stream + async await

It'd be great to have async stream constructs such as: reactivex.io/rxjs , supported natively, such that they can be used directly with the async / await keywords for async stream programming in a linear fashion (analogous to what can already be done with linearly awaiting Promises, but for async streams instead).

# Naveen Chawla (7 years ago)

Interesting!!!

Excuse my ignorance, but with this construct, how would you trivially invoke a "publish" ahead of any given "consumption"?

As far as I can see,

for await (const item of requestItems){

}

on its own is purely a "front-pressure" construct. That is, each request is made upon the completion and satisfactory consumption of the last one.

Can you suggest a way to trivially invoke some "back pressure" using this construct? By this I mean - invoke extra requests before they come to be consumed (like you can do with reactive streaming libraries). An example use would be if you wanted to do some "eager loading" of data while the user is likely to be viewing but not currently interacting with existing content, for example.

You seem very familiar with this construct, so I wouldn't be surprised if you've already thought about this! (If you're too busy I'm sure there are others here familiar with it too!)

# Ron Buckton (7 years ago)

One option might be something like rbuckton/prex/blob/master/docs/scheduling.md#class-asyncqueue. It allows you to put items on the queue as soon as they are available.

Ron

From: Naveen Chawla<mailto:naveen.chwl at gmail.com>

Sent: Saturday, July 29, 2017 3:54 AM To: Domenic Denicola<mailto:d at domenic.me>; es-discuss at mozilla.org<mailto:es-discuss at mozilla.org>

Subject: Re: Stream + async await

Interesting!!!

Excuse my ignorance, but with this construct, how would you trivially invoke a "publish" ahead of any given "consumption"?

As far as I can see,

for await (const item of requestItems){

}

on its own is purely a "front-pressure" construct. That is, each request is made upon the completion and satisfactory consumption of the last one.

Can you suggest a way to trivially invoke some "back pressure" using this construct? By this I mean - invoke extra requests before they come to be consumed (like you can do with reactive streaming libraries). An example use would be if you wanted to do some "eager loading" of data while the user is likely to be viewing but not currently interacting with existing content, for example.

You seem very familiar with this construct, so I wouldn't be surprised if you've already thought about this! (If you're too busy I'm sure there are others here familiar with it too!)

On Tue, 11 Jul 2017 at 21:09 Domenic Denicola <d at domenic.me<mailto:d at domenic.me>> wrote:

tc39/proposal-async-iterationna01.safelinks.protection.outlook.com/?url=https%3A%2F%2Fgithub.com%2Ftc39%2Fproposal-async-iteration&data=04|01|ron.buckton%40microsoft.com|90d8f33f560d48305ad808d4d6703ae6|72f988bf86f141af91ab2d7cd011db47|1|0|636369224897437474|Unknown|VW5rbm93bnx7IlYiOiIwLjAuMDAwMCIsIlAiOiJXaW4zMiIsIkFOIjoiT3RoZXIifQ%3D%3D|-1&sdata=QxOturJl0rd7nIv2%2Fy2MVAiVCVF2sd8arppSGIFiV9k%3D&reserved=0

From: es-discuss [mailto:es-discuss-bounces at mozilla.org<mailto:es-discuss-bounces at mozilla.org>] On Behalf Of Naveen Chawla

Sent: Tuesday, July 11, 2017 09:24 To: es-discuss at mozilla.org<mailto:es-discuss at mozilla.org>

Subject: Stream + async await

It'd be great to have async stream constructs such as: reactivex.io/rxjsna01.safelinks.protection.outlook.com/?url=http%3A%2F%2Freactivex.io%2Frxjs%2F&data=04|01|ron.buckton%40microsoft.com|90d8f33f560d48305ad808d4d6703ae6|72f988bf86f141af91ab2d7cd011db47|1|0|636369224897437474|Unknown|VW5rbm93bnx7IlYiOiIwLjAuMDAwMCIsIlAiOiJXaW4zMiIsIkFOIjoiT3RoZXIifQ%3D%3D|-1&sdata=WlsRBdhN2hkFpFcQ6zJK846k31TYWog91myIwiw02ow%3D&reserved=0 , supported natively, such that they can be used directly with the async / await keywords for async stream programming in a linear fashion (analogous to what can already be done with linearly awaiting Promises, but for async streams instead).

# Isiah Meadows (7 years ago)

There's also this strawman of mine which deals with most things async, but it has several of its own issues that I haven't quite addressed (complexity still being one after a week straight):

isiahmeadows/non-linear-proposal

I will caution that async iteration and observation can't really be merged meaningfully as you would hope. They both represent multiple values over time, but one is eager, the other lazy.

# Naveen Chawla (7 years ago)

Guys!

I thought of a way of doing this. Roughly it's having 2 loops, one for consumer and one for requester. Would this work?:

    startRequestingAsync();
    startConsumingAsync();

    async startRequestingAsync(){
        for async (const item of requestItems){
            //Control the requesting. Do nothing to actually consume the
data
        }
    }

    async startConsumingAsync(){
        for async(const item of requestItems){
            //Consume the item. Here you can also apply "front pressure" to
request the next item if the "request" loop is behind
        }
    }

I haven't actually done this so it's just thinking out loud.

Thoughts?

# kai zhu (7 years ago)

so that everyday programmers can relate, can someone give a code-snippet of the proposal equivalent to the common-use nodejs example below?

function consumeReadableStream(stream, consumeChunk, callback) {
/*
 * stream - readable stream
 * consumeChunk - has signature - function (chunk) {...}
 * callback - has signature - function (error) {...}
 */
    var callbackOnce, done, timerTimeout;

    callbackOnce = function (error) {
    /*
     * this function will ensure callback is called only once
     */
        if (done) {
            return;
        }
        done = true;
        // clear timeout
        clearTimeout(timerTimeout);
        callback(error);
    };

    // init timeout handler
    timerTimeout = setTimeout(function () {
        callbackOnce(new Error('30000 ms timeout'));
        stream.destroy();
    }, 30000);

    stream.on('data', consumeChunk);
    stream.on('end', callbackOnce);
    stream.on('error', callbackOnce);
};
# James Browning (7 years ago)

It'll look something like this:


async function consumeReadableStream(stream) {
    const start = Date.now()
    for await (const chunk of stream) {

        /* Do whatever you want with the chunk here e,g, await other
async tasks with chunks
            send them off to wherever, etc
        */

        if (Date.now() - start > 30000) {
            throw new Error('30000 ms timeout')
        }
    }
    /* Instead of callbackOnce the returned promise from this function
itself can be used */
}

# kai zhu (7 years ago)

the timeout handler will not work as advertised, e.g. what if io / db issues causes a network stream to intermittently respond in intervals far greater than 30000ms or not at all?

# Naveen Chawla (7 years ago)

Yes, you need to intervene and reject the latest promise upon timeout (by having a reference to its "reject" callback).

This makes me wonder (and I'd like to be corrected if wrong) if async iterators are more of a hindrance than a help?

We can currently do a loop over an array of promises, without async iterators:

async requestLoopAsync(){
    for(const requestItemPromise of requestItemPromises){
        //We can assign this here BEFORE we await the promise, unlike
        //with async iterators e.g.
        currentRequestItemPromise = requestItemPromise;

        const response = await requestItemPromise; //etc.
    }
}

Am I right or wrong?

(For the timeout example, on timeout we could do currentRequestItemPromise.myRejectCallbackReference() (where myRejectCallbackReference was assigned when we created the promise e.g. this.myRejectCallbackReference = reject from the reject parameter in (resolve, reject)=>)

# Naveen Chawla (7 years ago)

OK let me correct myself. That's preposterous. You have to await a promise to kick off it's process. But my point about being able to assign the promise to a wider-scoped currentRequestItemPromise before awaiting it which seems impossible with async iterators, which rejects upon timeout, stands, doesn't it?

# T.J. Crowder (7 years ago)

Related: esdiscuss.org/topic/how-about-awaiting-arrays (particularly the discussion of await.race), since effectively you're doing a race between a timeout and each chunk. Also relevant is the former work on cancelling promises, now withdrawn. (Can anyone point me at why it was withdrawn?)

-- T.J. Crowder

# Jan-Ivar Bruaroey (7 years ago)

Because a promise is not a control surface of the asynchronous action fulfilling it; confuses owner with consumer. stackoverflow.com/a/41417429/918910

.: Jan-Ivar :.

# Domenic Denicola (7 years ago)

That is not why.

# Naveen Chawla (7 years ago)

No, you can easily terminate a promise by having saved a reference to its reject/resolve callback and calling it.

# Jan-Ivar Bruaroey (7 years ago)

Right, the proposals went beyond that. In this context though I thought T.J. was looking for a control surface in promises. I shouldn't assume.

In any case, the original challenge here seems solved by something like:


var wait = ms => new Promise(resolve => setTimeout(resolve, ms));

async function consumeReadableStream(stream) {

   async function read() {
     try {
       for await (const chunk of stream) {
         // do work
       }
     } catch (e) {
       if (e.name != "AbortError") throw e;
     }
   }
   await Promise.race([read(), wait(30000).then(() => stream.destroy())]);
}

...assuming .destroy() is overloaded to throw AbortError somehow, a detail IMHO. But aren't we digressing?

For me, for-await still seems like a useful paradigm, and I'm not sure readability is helped by using linear code to represent non-linear code flow.

.: Jan-Ivar :.

# Naveen Chawla (7 years ago)

Jan-Ivar Bruaroey,

You are perhaps conflating asynchronicity with non-linearity.

Many asynchronous data flows are most definitely linear.

Hence the great benefit in code cleanliness afforded by await async.

I've concluded that a lot of stream-like functionality can be achieved already with await async and promises alone, even without for await, and in some cases for await gets in the way (as exemplified).

I just needed to think through it. Where a target value needs to change on each iteration, it can simply be a wider scoped variable manipulated by the stream. Understanding that a promise resolves to a single value just means a new promise for each iteration.

So as things stand, I am very comfortable with just the current features, and will be unlikely to use for await for what I was thinking of.