Streams

channels, sockets & cooperative message passing



The Streams module is Arturo's home for message passing - both between concurrent tasks (via :channel) and across the network (via sockets). It exposes a small, uniform vocabulary: you send a value to a destination, receive one from a source, and unplug when you're done. The same three verbs work on channels and sockets alike.

Key Concepts

  • A :channel is an in-process pipe between fibers - Go-style
  • send / receive / unplug work on both :channel and :socket
  • Channels have three buffering modes: unbuffered, bounded, unbounded
  • Operations are cooperative: a send/receive that has to wait parks only the calling fiber; it never blocks the whole VM
  • Channels can cross the do.async.isolated process boundary, full-duplex
  • Sockets give you TCP/UDP network streams with the same verbs

Note
Inside a non-main fiber, send / receive / accept / connect yield cooperatively - so a parked operation lets sibling tasks run instead of stalling everything. On the main fiber they behave synchronously, as you'd expect.

Basic Usage

Creating Channels

Jobs: channel 'jobs                 ; unbuffered (synchronous handoff)
Jobs: channel.bounded: 10 'jobs     ; bounded buffer of 10
Jobs: channel.unbounded 'jobs       ; never blocks on send

Important
Channels are named for debugging, not keyed by name. Two channel 'foo calls produce two distinct channels - identity is by reference. (This is the opposite of events, which are keyed by name.) The name only shows up in print / inspect.
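
To make the identity rule concrete, here is a minimal sketch that uses only calls shown on this page. The variable names A and B are illustrative. It assumes a value sent to an unbounded channel can be received right away on the main fiber; B is closed first so that receiving from it cannot park.

```arturo
A: channel.unbounded 'foo
B: channel.unbounded 'foo      ; same name, but a separate channel

send A "only in A"
unplug B                       ; close B so the receive below returns instead of parking

print receive A                ; the value that was sent to A
print null? receive B          ; B is closed and empty, so this check should hold
unplug A
```

If channels were keyed by name, the value sent through A would also be visible through B; with identity by reference it is not.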

Sending & Receiving

send Jobs 42            ; parks if unbuffered and no receiver is ready
v: receive Jobs         ; parks if the channel is empty
unplug Jobs             ; close it - parked receivers wake with :null

Caution
receive on a closed, empty channel returns :null - that's your signal to stop (mirrors Go's v, ok := <-ch, with null? v as the check). send on a closed channel raises an error, so guard it with try if a close can race your send.
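
The close semantics above can be sketched as a drain loop followed by a guarded send. This is an illustration rather than reference behaviour: it assumes, as in Go, that values already buffered stay receivable after unplug, and it uses a bounded channel so the sends on the main fiber have room and do not park.

```arturo
Q: channel.bounded: 2 'q       ; room for two values
send Q "a"
send Q "b"
unplug Q                       ; close: no further sends are allowed

; drain until receive hands back null
v: receive Q
while [not? null? v][
    print v
    v: receive Q
]

; a send after close raises an error, so guard it
try [send Q "c"]
```

Because null doubles as the end-of-stream marker, avoid sending null as a payload if the receiver uses this loop shape.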

Common Patterns

Producer / Consumer

Jobs: channel 'jobs
urls: ["https://example.com/a" "https://example.com/b"]   ; placeholder input

producer: do.async [
    loop urls 'u -> send Jobs u
    unplug Jobs                     ; signal "no more work"
]

consumer: do.async [
    keepGoing: true
    while [keepGoing][
        u: receive Jobs
        switch null? u [
            keepGoing: false        ; channel drained & closed
        ][
            print u
        ]
    ]
]

wait.all @[producer consumer]

Fan-out: A Pool of Workers

; N workers all pulling from one channel
; (Jobs is a channel created beforehand; process stands in for your own handler)
workers: map 1..4 'w [
    do.async [
        keepGoing: true
        while [keepGoing][
            j: receive Jobs
            switch null? j [keepGoing: false][process j]
        ]
    ]
]
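
For a version of the pool that runs end to end, the sketch below adds the pieces the snippet leaves out: creating the channel, feeding it, closing it and waiting for the workers. The buffer size, the worker count and the squaring step are arbitrary choices for illustration, and it again assumes buffered jobs remain receivable after unplug.

```arturo
Jobs: channel.bounded: 8 'jobs

; three workers, each looping until the channel is closed and drained
workers: map 1..3 'w [
    do.async [
        keepGoing: true
        while [keepGoing][
            j: receive Jobs
            switch null? j [keepGoing: false][print j * j]
        ]
    ]
]

loop 1..6 'n -> send Jobs n    ; six jobs fit in the buffer of 8
unplug Jobs                    ; tell the workers there is no more work
wait.all workers
```

Which worker picks up which job, and therefore the order of the printed lines, is not something this sketch controls.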

Cross-process Worker Pool

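Channels reach across the do.async.isolated boundary in both directions - the parent acts as a broker, so isolated workers can pull jobs and push results. Note that the workers below open the parent's channels with channel 'jobs and channel 'results rather than using the Jobs and Results variables: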

Jobs:    channel.bounded: 20 'jobs
Results: channel.bounded: 20 'results

; spin up isolated workers
loop 1..3 'k [
    do.async.isolated [
        loop 1..2 'i [
            j: receive channel 'jobs
            send channel 'results j * 10
        ]
    ]
]

loop 1..6 'i -> send Jobs i
print sort map 1..6 'k -> receive Results   ; [10 20 30 40 50 60]

Warning
Payloads crossing a process boundary must be round-trippable values (integers, strings, blocks, dictionaries, logicals, null). Functions, methods and objects with magic methods don't currently marshal across processes.

Network Streams

The same send / receive / unplug verbs drive sockets - the module handles TCP (default) and UDP, server- and client-side:

; --- server ---
server: listen 8000
print "Server listening on port 8000..."

client: accept server               ; wait for a connection
message: receive.timeout:1000 client
send client ~"got: |message|"
unplug client                       ; free the socket when done

; --- client (run as a separate script) ---
client: connect.to:"localhost" 8000
send client "hello server!"
print receive client
unplug client

For faster, connectionless communication, switch to UDP:

server: listen.udp 8000

Warning
UDP messages may arrive out of order or not at all - only use it when your application can tolerate loss. And always unplug your sockets when you're done to free system resources.

Tip
Because channels are single-threaded and cooperative, there are no locks to reason about - every send/receive boundary is the synchronization point. Need to cap concurrency? A bounded channel pre-loaded with tokens makes a tidy semaphore.
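
The semaphore idea from the tip can be sketched as follows. The names Permits and tasks are illustrative, and the limit of 2 is arbitrary. Each task takes a token before its critical section and returns it afterwards, so at most two tasks hold a token at any moment.

```arturo
Permits: channel.bounded: 2 'permits
loop 1..2 'p -> send Permits true      ; pre-load two tokens

tasks: map 1..5 'n [
    do.async [
        token: receive Permits         ; take a token (parks while none are free)
        print "working"                ; stand-in for the rate-limited work
        send Permits token             ; hand the token back
    ]
]

wait.all tasks
unplug Permits
```

The cap only matters when the guarded work itself parks (for example on a socket receive); a task that never yields between taking and returning its token runs to completion before its siblings get a turn.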