Stream + async await
From: es-discuss [mailto:es-discuss-bounces at mozilla.org] On Behalf Of Naveen Chawla Sent: Tuesday, July 11, 2017 09:24 To: es-discuss at mozilla.org Subject: Stream + async await
It'd be great to have async stream constructs such as: reactivex.io/rxjs , supported natively, such that they can be used directly with the async / await keywords for async stream programming in a linear fashion (analogous to what can already be done with linearly awaiting Promises, but for async streams instead).
Interesting!!!
Excuse my ignorance, but with this construct, how would you trivially invoke a "publish" ahead of any given "consumption"?
As far as I can see,
for await (const item of requestItems){
}
on its own is purely a "front-pressure" construct. That is, each request is made upon the completion and satisfactory consumption of the last one.
Can you suggest a way to trivially invoke some "back pressure" using this construct? By this I mean - invoke extra requests before they come to be consumed (like you can do with reactive streaming libraries). An example use would be if you wanted to do some "eager loading" of data while the user is likely to be viewing but not currently interacting with existing content, for example.
You seem very familiar with this construct, so I wouldn't be surprised if you've already thought about this! (If you're too busy I'm sure there are others here familiar with it too!)
One option might be something like rbuckton/prex/blob/master/docs/scheduling.md#class-asyncqueue. It allows you to put items on the queue as soon as they are available.
Ron
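As a rough illustration of the "eager loading" idea discussed above, here is a minimal sketch that needs no library. The `prefetch` helper and its `ahead` parameter are hypothetical names, not part of the async iteration proposal or of prex; the sketch only assumes that the source is an async iterable whose `next()` may be called before earlier results have been consumed (async generators queue such calls).

```javascript
// Hypothetical helper: wraps an async iterable and keeps up to `ahead`
// (>= 1) calls to next() in flight before the consumer asks for them.
async function* prefetch(source, ahead) {
  const iterator = source[Symbol.asyncIterator]();
  const pending = [];
  while (true) {
    // Top up the buffer of requests that have been started but not consumed.
    while (pending.length < ahead) {
      pending.push(iterator.next());
    }
    const { value, done } = await pending.shift();
    if (done) return;
    yield value;
  }
}
```

A consumer would then write `for await (const item of prefetch(requestItems, 2)) { ... }`, so that the next request is already under way while the current item is being handled.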
From: Naveen Chawla <naveen.chwl at gmail.com>
Sent: Saturday, July 29, 2017 3:54 AM
To: Domenic Denicola <d at domenic.me>; es-discuss at mozilla.org
Subject: Re: Stream + async await
On Tue, 11 Jul 2017 at 21:09 Domenic Denicola <d at domenic.me> wrote:
github.com/tc39/proposal-async-iteration
There's also this strawman of mine which deals with most things async, but it has several of its own issues that I haven't quite addressed (complexity still being one after a week straight):
isiahmeadows/non-linear-proposal
I will caution that async iteration and observation can't really be merged meaningfully as you would hope. They both represent multiple values over time, but one is eager, the other lazy.
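One way to read the eager/lazy distinction is sketched below. The `createEmitter` helper is a hypothetical stand-in for a push-based source and is not RxJS or any proposed API. The async generator does no work until a consumer pulls from it, while the emitter delivers values as soon as they exist, whether or not anyone is listening.

```javascript
const log = [];

// Lazy (pull): nothing in the body runs until the consumer calls next().
async function* lazyNumbers() {
  for (let i = 1; i <= 3; i++) {
    log.push(`pull produced ${i}`);
    yield i;
  }
}

// Eager (push): a hypothetical minimal emitter. Values are delivered the
// moment they are emitted; anything emitted before a listener subscribes
// is lost.
function createEmitter() {
  const listeners = new Set();
  return {
    subscribe(fn) {
      listeners.add(fn);
      return () => listeners.delete(fn);
    },
    emit(value) {
      for (const fn of listeners) fn(value);
    },
  };
}

async function demo() {
  const numbers = lazyNumbers(); // no work has happened yet
  const emitter = createEmitter();

  emitter.emit(1); // no listener yet, so this value is dropped
  const received = [];
  emitter.subscribe((v) => received.push(v));
  emitter.emit(2);
  emitter.emit(3);

  const producedBeforePull = log.length; // still nothing produced by the generator
  const pulled = [];
  for await (const n of numbers) pulled.push(n);
  return { producedBeforePull, pulled, received };
}
```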
Guys!
I thought of a way of doing this. Roughly it's having 2 loops, one for consumer and one for requester. Would this work?:
startRequestingAsync();
startConsumingAsync();
async function startRequestingAsync() {
  for await (const item of requestItems) {
    // Control the requesting. Do nothing to actually consume the data
  }
}
async function startConsumingAsync() {
  for await (const item of requestItems) {
    // Consume the item. Here you can also apply "front pressure" to
    // request the next item if the "request" loop is behind
  }
}
I haven't actually done this so it's just thinking out loud.
Thoughts?
So that everyday programmers can relate, can someone give a code snippet of the proposal equivalent to the common-use Node.js example below?
function consumeReadableStream(stream, consumeChunk, callback) {
  /*
   * stream - readable stream
   * consumeChunk - has signature - function (chunk) {...}
   * callback - has signature - function (error) {...}
   */
  var callbackOnce, done, timerTimeout;
  callbackOnce = function (error) {
    /*
     * this function will ensure callback is called only once
     */
    if (done) {
      return;
    }
    done = true;
    // clear timeout
    clearTimeout(timerTimeout);
    callback(error);
  };
  // init timeout handler
  timerTimeout = setTimeout(function () {
    callbackOnce(new Error('30000 ms timeout'));
    stream.destroy();
  }, 30000);
  stream.on('data', consumeChunk);
  stream.on('end', callbackOnce);
  stream.on('error', callbackOnce);
}
It'll look something like this:
async function consumeReadableStream(stream) {
  const start = Date.now()
  for await (const chunk of stream) {
    /* Do whatever you want with the chunk here, e.g. await other
       async tasks with chunks, send them off to wherever, etc.
    */
    if (Date.now() - start > 30000) {
      throw new Error('30000 ms timeout')
    }
  }
  /* Instead of callbackOnce, the promise returned from this function
     itself can be used */
}
The timeout handler will not work as advertised, e.g. what if I/O or DB issues cause a network stream to respond intermittently, at intervals far greater than 30000 ms, or not at all? The check only runs when a chunk arrives.
Yes, you need to intervene and reject the latest promise upon timeout (by having a reference to its "reject" callback).
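A minimal sketch of that kind of intervention, under stated assumptions: `withDeadline` is a hypothetical helper name, and the source is assumed to be any async iterable. Instead of checking the clock only when a chunk arrives, each pending `next()` is raced against a single timer, so a stalled source still produces a rejection.

```javascript
// Hypothetical helper: re-yields chunks from `source`, but rejects once
// `ms` milliseconds have passed since iteration started, even if the
// source never delivers another chunk.
async function* withDeadline(source, ms) {
  const iterator = source[Symbol.asyncIterator]();
  let timer;
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(() => reject(new Error(`${ms} ms timeout`)), ms);
  });
  // The timer may fire while the consumer is busy between chunks; mark the
  // rejection as handled so it only surfaces through the race below.
  timeout.catch(() => {});
  try {
    while (true) {
      const result = await Promise.race([iterator.next(), timeout]);
      if (result.done) return;
      yield result.value;
    }
  } finally {
    clearTimeout(timer);
    // Ask the source to clean up, without waiting: a stalled source may
    // never answer.
    if (typeof iterator.return === 'function') {
      Promise.resolve(iterator.return()).catch(() => {});
    }
  }
}
```

The consumer loop then becomes `for await (const chunk of withDeadline(stream, 30000)) { ... }`, and the timeout surfaces as a rejection of the enclosing async function's promise.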
This makes me wonder (and I'd like to be corrected if wrong) if async iterators are more of a hindrance than a help?
We can currently do a loop over an array of promises, without async iterators:
async function requestLoopAsync() {
  for (const requestItemPromise of requestItemPromises) {
    // We can assign this here BEFORE we await the promise, unlike
    // with async iterators, e.g.
    currentRequestItemPromise = requestItemPromise;
    const response = await requestItemPromise; // etc.
  }
}
Am I right or wrong?
(For the timeout example, on timeout we could do currentRequestItemPromise.myRejectCallbackReference(), where myRejectCallbackReference was assigned when we created the promise, e.g. this.myRejectCallbackReference = reject, from the reject parameter in (resolve, reject) => ...)
OK let me correct myself. That's preposterous. You have to await a promise to kick off its process. But my point stands, doesn't it? We can assign the promise to a wider-scoped currentRequestItemPromise before awaiting it, so that it can be rejected upon timeout, which seems impossible with async iterators.
Related: esdiscuss.org/topic/how-about-awaiting-arrays (particularly the discussion of await.race), since effectively you're doing a race between a timeout and each chunk. Also relevant is the former work on cancelling promises, now withdrawn. (Can anyone point me at why it was withdrawn?)
-- T.J. Crowder
Because a promise is not a control surface of the asynchronous action fulfilling it; treating it as one confuses owner with consumer. stackoverflow.com/a/41417429/918910
.: Jan-Ivar :.
That is not why.
No, you can easily terminate a promise by having saved a reference to its reject/resolve callback and calling it.
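The technique being described can be sketched as follows. `createDeferred` and `cancellable` are hypothetical helper names, not an existing API. Note that rejecting the wrapper only settles the promise the consumer sees; it does not stop the underlying work, which is the owner/consumer distinction raised above.

```javascript
// Hypothetical helper: a promise bundled with its own resolve/reject callbacks.
function createDeferred() {
  let resolve, reject;
  const promise = new Promise((res, rej) => {
    resolve = res;
    reject = rej;
  });
  return { promise, resolve, reject };
}

// Hypothetical helper: wraps a task's promise so that whoever holds `cancel`
// can reject it early. The first settlement wins; later ones are ignored.
function cancellable(task) {
  const deferred = createDeferred();
  task.then(deferred.resolve, deferred.reject);
  return {
    promise: deferred.promise,
    cancel: (reason) => deferred.reject(reason),
  };
}
```

For example, `const { promise, cancel } = cancellable(fetchSomething())` (with `fetchSomething` standing in for any promise-returning call) lets a timeout handler call `cancel(new Error('timeout'))` while the loop awaits `promise`.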
Right, the proposals went beyond that. In this context though I thought T.J. was looking for a control surface in promises. I shouldn't assume.
In any case, the original challenge here seems solved by something like:
var wait = ms => new Promise(resolve => setTimeout(resolve, ms));
async function consumeReadableStream(stream) {
  async function read() {
    try {
      for await (const chunk of stream) {
        // do work
      }
    } catch (e) {
      if (e.name != "AbortError") throw e;
    }
  }
  await Promise.race([read(), wait(30000).then(() => stream.destroy())]);
}
...assuming .destroy() is overloaded to throw AbortError somehow, a detail IMHO. But aren't we digressing?
For me, for-await still seems like a useful paradigm, and I'm not sure readability is helped by using linear code to represent non-linear code flow.
.: Jan-Ivar :.
Jan-Ivar Bruaroey,
You are perhaps conflating asynchronicity with non-linearity.
Many asynchronous data flows are most definitely linear.
Hence the great benefit in code cleanliness afforded by async / await.
I've concluded that a lot of stream-like functionality can be achieved already with async / await and promises alone, even without for await, and in some cases for await gets in the way (as exemplified).
I just needed to think through it. Where a target value needs to change on each iteration, it can simply be a wider scoped variable manipulated by the stream. Understanding that a promise resolves to a single value just means a new promise for each iteration.
So as things stand, I am very comfortable with just the current features, and will be unlikely to use for await for what I was thinking of.