streaming without coroutines
Let's try again, but this time we'll do things the simpler way. Brick and mortar. That's the idea of low-level programming anyway. Nothing fancy.
We have a few combinations we care about:
| | socket | file | array | block |
|---|---|---|---|---|
| socket | X | X | X | X |
| file | X | X | X | X |
| array | X | X | X | X |
| block | X | X | X | X |
Quite a few combinations. Arrays and Files have a size, while Sockets and Blocks don't. We also want to make sure we're building a declaration until we hit a terminal.
That means we have intermediate forms, e.g. socket | block turns into a new source if the block returns something. If it doesn't return something, it will execute the stream.
You also cannot put a read-only file on the right-hand side of the stream, as it cannot be written into. Likewise, a socket that does not have a write pipe cannot be on the right, nor can a socket that does not have a read pipe be on the left.
We want to string as many of the stages in the stream together as possible. If they share the same kind of container, then the destination can be used to hold the intermediate values, allowing bulk reads from the source and avoiding a bunch of intermediate allocations.
An example would be reading from a socket into a UTF-8 stream. They are both byte streams. You don't want to read a byte at a time from the socket, but you also don't want to create an extra buffer sitting between the input and the output.
If you are reading from a socket, chances are you do want an intermediate buffer of some kind. Likely a ring-buffer so when you have exhausted the buffer it will ask the socket for more.
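As an aside, the refill-on-exhaustion behaviour can be sketched in Python. `RefillingBuffer` and the `source(n)` callable are names invented for this sketch, and a real ring-buffer would reuse a fixed array rather than a deque:

```python
from collections import deque

class RefillingBuffer:
    """Ask the source for another chunk only when the buffer is exhausted.
    A real ring-buffer would reuse a fixed array; a deque keeps the sketch short."""
    def __init__(self, source, chunk_size=4096):
        self.source = source          # source(n) -> list of at most n items, [] at end
        self.chunk_size = chunk_size
        self.items = deque()

    def next(self):
        if not self.items:            # exhausted: ask the socket/source for more
            self.items.extend(self.source(self.chunk_size))
        return self.items.popleft() if self.items else None  # None == end of stream
```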
output: String;
socket
| {ring-buffer} {size: 4096}
| to-utf8-string
| [input -> output] [output <- input to-uppercase]
| output;
The final action of | output will start the processing in motion. It will ask the block to read into its available capacity, which will initially start at something small like 2 and grow exponentially each time it continues: +4=6, +8=14, +16=30, +32=62, +64=126, and so on until the stream ends.
capacity - length == 0 then: [grow]
source read: capacity - length into: self
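Under the assumption that each growth step adds the current capacity plus 2 (which reproduces the 2, 6, 14, 30, 62, 126 sequence above), the grow-then-read step might look like this in Python; `grow`, `read_some`, and the `source(n)` callable are invented names for the sketch:

```python
def grow(capacity):
    # one growth step: +4=6, +8=14, +16=30, ... i.e. capacity*2 + 2
    return capacity * 2 + 2

def read_some(source, buf, capacity):
    """Mirror of the pseudocode above: grow when full, then ask the source
    to fill the remaining space. source(n) returns a list of at most n items."""
    if capacity - len(buf) == 0:
        capacity = grow(capacity)
    buf.extend(source(capacity - len(buf)))   # source read: capacity - length into: self
    return capacity
```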
The block knows its input type and output type. The output must match the species, otherwise this code wouldn't have compiled; i.e. the output must be of base-type Array of: Z and the block type must be Y→Z. If it happens to be Z→Z, then it can re-use the output container and read in bulk. If it's not, it'll read one element at a time.
[self read: size into: dst]
[self: dst element-class -> dst element-class,
size: Integer,
dst: (Array of: dst element-class)]
[[amount transform] [EndOfStream] []
[amount transform] [Integer]
[dst[::amount]
| [i, element -> ø] [dst[i] := block evaluate: element]]
read-amount := src read: size into: dst;
read-amount transform;
return <- read-amount]
and element-by-element:
[self read: size into: dst]
[self: src element-type -> dst element-type,
size: Integer,
dst: (Array of: dst element-type)
-> return: Integer + EndOfStream]
[buffer: (Array of: src element-class size: 1);
[amount transform] [EndOfStream] [return <- EndOfStream]
[amount transform] [Integer]
[dst[0:] := block evaluate: buffer[0]]
[::size]
| [read-amount := src read: 1 into: buffer;
read-amount transform];
return size]
This seems... wordy. It might just be because of the switch, and everything is fine. Or it might be all the type information, which again, in theory, is fine. It might be because the switch is acting off of either an Integer or an EndOfStream.
Maybe we should refactor this.
[self read: size into: buffer copy-to: dst]
[self: src element-type -> dst element-type,
size: Integer
buffer: (Array of: src element-type)
dst: (Array of: dst element-type)
-> return: Integer + EndOfStream]
[ [ // switch Integer + EndOfStream
[amount transform] [EndOfStream] [return <- EndOfStream]
[amount transform] [Integer]
[buffer[::amount]
| [i, element] [dst[i] := self evaluate: element]
buffer := buffer[amount:]]
buffer size == 0 then: [return <- size];
read-size := src read: buffer size into: buffer;
read-size transform];
] repeat]
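Here is roughly the same loop in Python, with the sum-typed result modelled as either an int or an END_OF_STREAM sentinel. The reader signature `src_read(n, buffer)` is an assumption of this sketch, and it tracks a write offset into dst instead of re-slicing the buffer:

```python
END_OF_STREAM = object()   # stands in for the EndOfStream half of the sum type

def read_into_copy_to(src_read, transform, size, buffer, dst):
    """Fill `buffer` from the source, push each transformed element into `dst`,
    and repeat until `size` elements are produced or the source ends.
    src_read(n, buf) fills buf[0:k] and returns k, or END_OF_STREAM at the end."""
    produced = 0
    while True:
        amount = src_read(min(size - produced, len(buffer)), buffer)
        if amount is END_OF_STREAM:
            return END_OF_STREAM if produced == 0 else produced
        for i in range(amount):
            dst[produced + i] = transform(buffer[i])
        produced += amount
        if produced == size:
            return size
```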
Now we can re-use that for our two use-cases. First bulk reading:
[self read: size into: dst]
[self: dst element-class -> dst element-class,
size: Integer,
dst: (Array of: dst element-class)
-> Integer + EndOfStream]
[self read: size into: dst copy-to: dst]
And one at a time:
[self read: size into: dst]
[self: src element-class -> dst element-class,
size: Integer,
dst: (Array of: dst element-class)
-> Integer + EndOfStream]
[ buffer: (Array of: src element-class size: 1)
self read: size into: buffer copy-to: dst]
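Both use-cases together might look like this in Python, where `same_element_type` plays the role of the Z → Z check the compiler makes above. The reader signature `src_read(n, buf, offset)`, returning 0 at end-of-stream, is an assumption of the sketch:

```python
def read_transformed(src_read, transform, size, dst, same_element_type):
    """Bulk path reuses dst as the read buffer (Z -> Z); the element-wise
    path reads through a one-element scratch cell (Y -> Z)."""
    produced = 0
    if same_element_type:
        # Z -> Z: read straight into dst, then transform in place, no extra buffer
        while produced < size:
            amount = src_read(size - produced, dst, produced)
            if amount == 0:
                break
            for i in range(produced, produced + amount):
                dst[i] = transform(dst[i])
            produced += amount
    else:
        # Y -> Z: read one element at a time through the scratch cell
        cell = [None]
        while produced < size:
            if src_read(1, cell, 0) == 0:
                break
            dst[produced] = transform(cell[0])
            produced += 1
    return produced
```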
As usual, refactoring does help remove potential bugs and clean up code. It also makes things more vague in a way. Now we have two methods that exist solely to differentiate between Z → Z and Y → Z, allowing us to either re-use the destination as a buffer or not.
We should be able to re-use this technique for all source/destination pairs and only need to implement the inner transformation. In fact the only two variables we see here are 'self' and 'src'. We should probably define a structure for this:
StreamTransformation
(#specialise: [from: Source to: Destination] []
src: Source
dst: Destination
block: Source element-class -> Destination element-class)
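A Python rendering of that structure, purely illustrative, could be a dataclass; the field names mirror the declaration above:

```python
from dataclasses import dataclass
from typing import Any, Callable

@dataclass
class StreamTransformation:
    """One pipeline stage: pull from src, map each element through block,
    push into dst. Any source/destination pair works so long as the
    element classes line up."""
    src: Any                     # assumed to understand some read protocol
    dst: Any                     # assumed to understand some copy/write protocol
    block: Callable[[Any], Any]  # Source element-class -> Destination element-class
```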
With this structure it doesn't matter what kind of source or destination it is, so long as they share the same element-class. To make this true we'd need to change our general method to not assume the destination is an Array. The Source is only assumed to understand read:into:, so the destination should similarly be expected to understand some kind of write:from:, but in this case we'll simply call it copy:.
[dst copy: src] [Array, Array -> Integer]
[ size := dst size min: src size;
[::size] | [i] [dst[i] := src[i]];
size]
[dst copy: src] [Array, Socket -> Integer + SocketFailure]
[src receive-into: dst]
[dst copy: src] [Socket, Array -> Integer + SocketFailure]
[dst send-from: src]
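In Python, which lacks multiple dispatch, the per-pair copy: methods could be emulated with a table keyed on the (destination, source) type pair. `Socket` here is a toy stand-in invented for the sketch, not a real network socket:

```python
class Socket:
    """Toy stand-in for a socket, just enough to exercise the dispatch table."""
    def __init__(self):
        self.sent = []
    def receive_into(self, dst):           # pretend the network produced these bytes
        data = [7, 8, 9][:len(dst)]
        dst[:len(data)] = data
        return len(data)
    def send_from(self, src):
        self.sent.extend(src)
        return len(src)

def copy_array_array(dst, src):
    size = min(len(dst), len(src))
    dst[:size] = src[:size]
    return size

# one entry per (destination, source) combination, mirroring the table below
COPY = {
    (list, list):   copy_array_array,
    (list, Socket): lambda dst, src: src.receive_into(dst),
    (Socket, list): lambda dst, src: dst.send_from(src),
}

def copy(dst, src):
    return COPY[type(dst), type(src)](dst, src)
```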
| | socket | file | array | block |
|---|---|---|---|---|
| socket | X | X | √ | X |
| file | X | X | X | X |
| array | √ | X | √ | X |
| block | X | X | X | X |
..etc. We fill out each of these until our table of sources and destinations is all √s instead of Xs. From here the implementation becomes academic. We end up with a pipe that lets us move data from any source to any destination. The | method makes a StreamTransformation, and if the right-hand side is a terminal, it tells it to #process, or some similarly named method, that loops until EndOfStream.
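To make the shape concrete, here is a toy Python pipe in the same spirit: `|` composes the element transform while the right-hand side is callable, and drains the source into the destination once it hits a terminal. All names are invented for the sketch, and the drain loop stands in for #process:

```python
class Stage:
    """A pipeline stage: `|` either chains another transformation or, when the
    right-hand side is a terminal destination, runs the loop to end of stream."""
    def __init__(self, read, block=lambda x: x):
        self.read = read          # read(n) -> list of at most n items, [] at end
        self.block = block

    def __or__(self, rhs):
        if callable(rhs):         # intermediate form: compose the element transform
            return Stage(self.read, lambda x, f=self.block: rhs(f(x)))
        while True:               # terminal: rhs is a destination list, drain into it
            chunk = self.read(64)
            if not chunk:
                return rhs
            rhs.extend(self.block(x) for x in chunk)
```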