streaming with coroutines
It's worth considering how the | operator works when it reaches a terminal. There are three cases to consider: 1) processing with a block, 2) writing into another collection, and 3) writing to a socket.
In the first scenario we want to read from the left-side input until it reports that there is nothing more to be had. How do we know that we've reached the end? The input has to tell us: asking it for data should return either the data (here, a count of items read into our buffer) or an EndOfStream failure.
[input | terminal]
[input: Stream, terminal: Block -> return: ø]
[
buffer := Array of: Any size: 1;
[
[_ step] [Integer] [terminal evaluate: buffer]
[_ step] [EndOfStream] [return <- ø]
amount-read := input read-into: buffer;
amount-read step;
] repeat
]
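For readers who want something executable, here is a rough Python analogue of the block case. STZ's dispatch on the type of the result becomes an `isinstance` check. `ListStream`, `read_into` and `pipe_to_block` are hypothetical names invented for this sketch. One deliberate difference is that the sketch hands the block the single element in the buffer rather than the one-element buffer itself.

```python
# Sketch only: every name below is hypothetical, not an STZ API.

class EndOfStream:
    """Hypothetical failure value: the input has nothing more to give."""


class ListStream:
    """Hypothetical in-memory stream over a Python list."""

    def __init__(self, items):
        self._items = list(items)
        self._offset = 0

    def read_into(self, buffer):
        """Copy up to len(buffer) items into buffer; return the count or EndOfStream()."""
        remaining = len(self._items) - self._offset
        if remaining == 0:
            return EndOfStream()
        n = min(remaining, len(buffer))
        buffer[:n] = self._items[self._offset:self._offset + n]
        self._offset += n
        return n


def pipe_to_block(stream, block):
    """The `input | terminal` case where terminal is a block."""
    buffer = [None]  # mirrors `Array of: Any size: 1`
    while True:
        amount_read = stream.read_into(buffer)
        if isinstance(amount_read, EndOfStream):
            return None  # corresponds to `return <- ø`
        block(buffer[0])  # hand the one item just read to the block


seen = []
pipe_to_block(ListStream(["a", "b", "c"]), seen.append)
print(seen)  # ['a', 'b', 'c']
```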
In the second scenario we want the left-side input to copy as much as it can into our collection, until it runs out of data or the collection has no room left.
[input | terminal]
[input: Stream, terminal: Array -> return: ø]
[
[
[count step] [Integer] [terminal := terminal[count:]]
[_ step] [EndOfStream] [return <- ø]
amount-read := input read-into: terminal;
amount-read step;
] repeat
]
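The array case can be sketched in Python with a `memoryview` over a `bytearray`, because slicing a `memoryview` gives a view onto the same memory, which is roughly what `terminal := terminal[count:]` appears to intend. `BytesStream` and its `max_chunk` limit are hypothetical. The limit exists only to force several trips around the loop.

```python
# Sketch only: BytesStream, max_chunk and pipe_to_array are hypothetical names.

class EndOfStream:
    """Hypothetical failure value: nothing more can be copied."""


class BytesStream:
    """Hypothetical stream over bytes that hands out at most max_chunk bytes per call."""

    def __init__(self, data: bytes, max_chunk: int = 2):
        self._data = data
        self._offset = 0
        self._max_chunk = max_chunk

    def read_into(self, dst: memoryview):
        n = min(len(self._data) - self._offset, len(dst), self._max_chunk)
        if n == 0:  # source exhausted or destination full
            return EndOfStream()
        dst[:n] = self._data[self._offset:self._offset + n]
        self._offset += n
        return n


def pipe_to_array(stream, terminal: memoryview) -> None:
    """Copy from stream into terminal until either side runs out."""
    while True:
        amount_read = stream.read_into(terminal)
        if isinstance(amount_read, EndOfStream):
            return None
        terminal = terminal[amount_read:]  # skip past what was just written


out = bytearray(5)
pipe_to_array(BytesStream(b"hello world"), memoryview(out))
print(out)  # bytearray(b'hello')
```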
The third scenario is the simplest of all, since sockets block automatically. Its code is the same as for arrays, so we can generalise the previous method to accept either an Array or a Socket.
[input | terminal]
[input: Stream, terminal: Array + Socket -> return: ø]
[
[
[count step] [Integer] [terminal := terminal[count:]]
[_ step] [EndOfStream] [return <- ø]
amount-read := input read-into: terminal;
amount-read step;
] repeat
]
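Python can't slice a socket, so a sketch of the `Array + Socket` version has to make the type-directed part explicit. Here `functools.singledispatch` stands in for STZ's dispatch on the terminal's type. Writing into a `memoryview` copies what fits and then advances the view. Writing to a socket uses `sendall`, which keeps sending until the whole chunk has gone out, and the socket itself never needs advancing. `write_chunk`, `advance`, `BytesStream` and `pipe` are hypothetical names.

```python
# Sketch only: write_chunk, advance, BytesStream and pipe are hypothetical names.
import socket
from functools import singledispatch


class EndOfStream:
    """Hypothetical failure value: nothing more to transfer."""


@singledispatch
def write_chunk(terminal, chunk: bytes) -> int:
    raise TypeError(f"unsupported terminal: {type(terminal).__name__}")


@write_chunk.register
def _(terminal: memoryview, chunk: bytes) -> int:
    n = min(len(terminal), len(chunk))  # copy only what fits
    terminal[:n] = chunk[:n]
    return n


@write_chunk.register
def _(terminal: socket.socket, chunk: bytes) -> int:
    terminal.sendall(chunk)  # keeps sending until the whole chunk is out
    return len(chunk)


@singledispatch
def advance(terminal, count: int):
    return terminal  # a socket keeps writing from "the same place"


@advance.register
def _(terminal: memoryview, count: int):
    return terminal[count:]  # an array moves past what was written


class BytesStream:
    """Hypothetical stream over bytes, handing out at most max_chunk bytes per call."""

    def __init__(self, data: bytes, max_chunk: int = 4):
        self._data, self._offset, self._max_chunk = data, 0, max_chunk

    def read_into(self, terminal):
        chunk = self._data[self._offset:self._offset + self._max_chunk]
        n = write_chunk(terminal, chunk) if chunk else 0
        if n == 0:  # source exhausted or array terminal full
            return EndOfStream()
        self._offset += n
        return n


def pipe(stream, terminal) -> None:
    """One loop for both Array (memoryview) and Socket terminals."""
    while True:
        amount_read = stream.read_into(terminal)
        if isinstance(amount_read, EndOfStream):
            return None
        terminal = advance(terminal, amount_read)


left, right = socket.socketpair()
pipe(BytesStream(b"hello, socket"), left)
left.close()
received = b""
while data := right.recv(64):
    received += data
right.close()
print(received)  # b'hello, socket'
```

The loop in `pipe` is identical for both terminals; only the two dispatched helpers know the difference.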
What does read-into: actually do, then? That depends on the type of the source. If it's an Array, it copies the smaller of its remaining size and the destination's size, then advances its own offset. It uses a coroutine to loop until there's nothing left to provide.
[src read-into: dst]
[Array, Array -> return: Integer + EndOfStream]
[
p := src[0:];
[
size := p size min: dst size;
size == 0 then: [return <- EndOfStream];
dst copy: p[:size];
p := p[size:];
return <- size;
] repeat
]
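Python generators are a reasonable stand-in for the coroutine described above. In this sketch `array_reader` is a hypothetical name. The caller sends in a destination list on each resumption, and the generator copies min(remaining, len(dst)) items and yields the count. Once nothing can be copied, it yields an `EndOfStream` value and finishes. Unlike STZ's `p[size:]`, Python list slices copy rather than share, which doesn't change the behaviour here.

```python
# Sketch only: array_reader and EndOfStream are hypothetical names.

class EndOfStream:
    """Hypothetical failure value returned once nothing can be copied."""

    def __repr__(self):
        return "EndOfStream"


def array_reader(src):
    """Coroutine form of `src read-into: dst` for list sources."""
    p = src[0:]  # the reader's own remaining input
    dst = yield  # wait for the first destination
    while True:
        size = min(len(p), len(dst))
        if size == 0:
            yield EndOfStream()
            return
        dst[:size] = p[:size]  # copy into the front of dst
        p = p[size:]  # advance our own offset
        dst = yield size  # hand back the count, wait for the next dst


reader = array_reader([1, 2, 3, 4, 5])
next(reader)  # prime the coroutine so it waits at the first yield
buf = [0, 0]
print(reader.send(buf), buf)  # 2 [1, 2]
print(reader.send(buf), buf)  # 2 [3, 4]
print(reader.send(buf), buf)  # 1 [5, 4]
print(reader.send(buf))  # EndOfStream
```

A further `send()` after `EndOfStream` raises `StopIteration`. That is how a Python driver loop would notice that the coroutine doesn't want to be resumed any more.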
An alternative implementation would be for | to wrap the array in an ArrayStream that updates an internal offset each time read-into: is called. That would complicate the | methods above slightly. Here, instead, we get to utilise the types to dictate our behaviour.
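For comparison, the ArrayStream alternative might look like this in Python. The state lives in an ordinary object instead of a suspended coroutine, but `|` now has to wrap plain arrays before reading from them. `ArrayStream` follows the name in the text. `as_stream` is a hypothetical helper standing in for that extra step inside `|`.

```python
# Sketch only: ArrayStream's interface and as_stream are hypothetical.

class EndOfStream:
    """Hypothetical failure value: the stream has nothing left."""

    def __repr__(self):
        return "EndOfStream"


class ArrayStream:
    """Wraps a list and keeps its own read offset between calls."""

    def __init__(self, items):
        self._items = items
        self._offset = 0

    def read_into(self, dst):
        size = min(len(self._items) - self._offset, len(dst))
        if size == 0:
            return EndOfStream()
        dst[:size] = self._items[self._offset:self._offset + size]
        self._offset += size  # the "internal offset" from the text
        return size


def as_stream(source):
    """The extra step `|` would need: wrap plain lists, pass streams through."""
    return ArrayStream(source) if isinstance(source, list) else source


stream = as_stream([10, 20, 30])
buf = [0, 0]
print(stream.read_into(buf), buf)  # 2 [10, 20]
print(stream.read_into(buf), buf)  # 1 [30, 20]
print(stream.read_into(buf))  # EndOfStream
```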
What's not clear is how we know we have a continuation. To do continuations properly, we'd need to create a coroutine in the | method, then call it over and over again until it signals that it doesn't want to be called any more.
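In Python terms, creating the coroutine inside `|` and calling it until it no longer wants to be called might look like the sketch below. A generator serves as the coroutine, and `StopIteration` plays the part of "doesn't want to be called any more". `chunks` and `pipe` are hypothetical names, not part of any STZ design.

```python
# Sketch only: chunks and pipe are hypothetical names.
from typing import Callable, Iterator, List


def chunks(items: List[int], size: int) -> Iterator[List[int]]:
    """Hypothetical source coroutine: yields one chunk per resumption."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def pipe(items: List[int], terminal: Callable[[List[int]], None], size: int = 2) -> None:
    """Create the coroutine inside `|`, then resume it until it is finished."""
    source = chunks(items, size)  # the coroutine is created here, inside `|`
    while True:
        try:
            chunk = next(source)
        except StopIteration:
            return None  # the coroutine doesn't want to be called any more
        terminal(chunk)


collected = []
pipe([1, 2, 3, 4, 5], collected.append)
print(collected)  # [[1, 2], [3, 4], [5]]
```

A plain `for chunk in source:` loop does the same thing. The explicit `try`/`except` just makes each resumption visible.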
The design of the continuation library in a previous post essentially hid coroutines from the language at large: only specific IO operations and forked processes ran coroutines on your behalf.
The way that would work is that we'd create a new green thread and resume it until it no longer wants to be resumed. We want to avoid needing semaphores, locks, or channels between the green threads. Because this is co-operative threading rather than platform threading, we know the green thread will only call back to us when we're ready to do more work.
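A toy version of that arrangement can be sketched with Python generators as green threads and a round-robin loop that resumes each one until it stops wanting to be resumed. Control only changes hands at an explicit `yield`, so the producer and consumer can share a plain list with no locks, semaphores or channels. `producer`, `consumer` and `run` are hypothetical names. This illustrates the co-operative idea only and is not the continuation library from the earlier post.

```python
# Sketch only: producer, consumer and run are hypothetical names.
from collections import deque
from typing import Generator, List

GreenThread = Generator[None, None, None]


def producer(shared: List[int], items: List[int]) -> GreenThread:
    for item in items:
        shared.append(item)  # no lock: nothing else runs until we yield
        yield  # hand control back to the scheduler


def consumer(shared: List[int], out: List[int], expected: int) -> GreenThread:
    while len(out) < expected:
        while shared:
            out.append(shared.pop(0) * 10)
        yield


def run(*threads: GreenThread) -> None:
    """Resume each green thread in turn until none wants to be resumed."""
    ready = deque(threads)
    while ready:
        thread = ready.popleft()
        try:
            next(thread)
        except StopIteration:
            continue  # finished: drop it from the run queue
        ready.append(thread)


shared: List[int] = []
out: List[int] = []
run(producer(shared, [1, 2, 3]), consumer(shared, out, 3))
print(out)  # [10, 20, 30]
```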
What this post highlights more than anything else is that coroutines are not yet a complete answer. The idea needs further exploration, and this is a prime candidate for getting it right.
The last time we looked at coroutines in STZ, I ended with this: "For now I'm going to leave this in the ??? column and state that it's intended to be part of the STZ but may or may not be in the first cut".
I suppose I'm still there.