Streams
channels, sockets & cooperative message passing
The Streams module is Arturo's home for message passing - both between concurrent tasks (via :channel) and across the network (via sockets). It exposes a small, uniform vocabulary: you send a value to a destination, receive one from a source, and unplug when you're done. Those same three verbs work polymorphically over channels and sockets alike.
Key Concepts
- A :channel is an in-process pipe between fibers
- Go-style send/receive/unplug work on both :channel and :socket
- Channels have three buffering modes: unbuffered, bounded, unbounded
- Operations are cooperative: a send/receive parks the fiber, it never blocks the whole VM
- Channels can cross the do.async.isolated process boundary, full-duplex
- Sockets give you TCP/UDP network streams with the same verbs
Note
Inside a non-main fiber, send/receive/accept/connect yield cooperatively - so a parked operation lets sibling tasks run instead of stalling everything. On the main fiber they behave synchronously, as you'd expect.
Basic Usage
Creating Channels
Jobs: channel 'jobs ; unbuffered (synchronous handoff)
Jobs: channel.bounded: 10 'jobs ; bounded buffer of 10
Jobs: channel.unbounded 'jobs ; never blocks on send
Important
Channels are named for debugging, not keyed by name. Two channel 'foo calls produce two distinct channels - identity is by reference. (This is the opposite of events, which are keyed by name.) The name only shows up in inspect.
Sending & Receiving
send Jobs 42 ; parks if unbuffered and no receiver is ready
v: receive Jobs ; parks if the channel is empty
unplug Jobs ; close it - parked receivers wake with :null
Caution
receive on a closed, empty channel returns :null - that's your signal to stop (mirrors Go's v, ok := <-ch, with null? v as the check). send on a closed channel raises an error, so guard it with try if a close can race your send.
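Both rules can be sketched with nothing but the calls shown above. This assumes try takes a block and swallows whatever the block raises. The immediate unplug is just a stand-in for a close that happened elsewhere.

```arturo
Jobs: channel 'jobs
unplug Jobs                     ; stand-in for a close that won the race

; receiving from a closed, empty channel hands back :null
v: receive Jobs
if null? v -> print "jobs is closed - stop reading"

; sending to a closed channel raises, so wrap the call
try [
    send Jobs 42
]
```

In a real program the null? check usually lives in the receive loop, as in the consumer patterns on this page, and the try goes only around sends that can actually race a close.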
Common Patterns
Producer / Consumer
Jobs: channel 'jobs
producer: do.async [
    loop urls 'u -> send Jobs u
    unplug Jobs                 ; signal "no more work"
]
consumer: do.async [
    keepGoing: true
    while [keepGoing][
        u: receive Jobs
        switch null? u [
            keepGoing: false    ; channel drained & closed
        ][
            print u
        ]
    ]
]
wait.all @[producer consumer]
Fan-out: A Pool of Workers
; N workers all pulling from one channel
workers: map 1..4 'w [
    do.async [
        keepGoing: true
        while [keepGoing][
            j: receive Jobs
            switch null? j [keepGoing: false][process j]
        ]
    ]
]
Cross-process Worker Pool
Channels reach across the do.async.isolated boundary in both directions - the parent acts as a broker, so isolated workers can pull jobs and push results:
Jobs: channel.bounded: 20 'jobs
Results: channel.bounded: 20 'results
; spin up isolated workers
loop 1..3 'k [
    do.async.isolated [
        loop 1..2 'i [
            j: receive channel 'jobs
            send channel 'results j * 10
        ]
    ]
]
loop 1..6 'i -> send Jobs i
print sort map 1..6 'k -> receive Results ; [10 20 30 40 50 60]
Warning
Payloads crossing a process boundary must be round-trippable values (integers, strings, blocks, dictionaries, logicals, null). Functions, methods and objects with magic methods don't currently marshal across processes.
Network Streams
The same send / receive / unplug verbs drive sockets - the module handles TCP (default) and UDP, server- and client-side:
; --- server ---
server: listen 8000
print "Server listening on port 8000..."
client: accept server ; wait for a connection
message: receive.timeout:1000 client
send client ~"got: |message|"
unplug client ; free the socket when done
; --- client ---
client: connect.to:"localhost" 8000
send client "hello server!"
print receive client
unplug client
For faster, connectionless communication, switch to UDP:
server: listen.udp 8000
Warning
UDP messages may arrive out of order or not at all - only use it when your application can tolerate loss. And always unplug your sockets when you're done to free system resources.
Tip
Because channels are single-threaded and cooperative, there are no locks to reason about - every send/receive boundary is the synchronization point. Need to cap concurrency? A bounded channel pre-loaded with tokens makes a tidy semaphore.
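One way the token idea might look, built only from the verbs on this page. The names Tokens and tasks, the limit of 2 and the placeholder print are made up for illustration, so treat it as a sketch rather than a tested recipe.

```arturo
; a bounded channel with 2 slots acts as a semaphore with 2 permits
Tokens: channel.bounded: 2 'tokens
loop 1..2 'i -> send Tokens true    ; pre-load one token per permit

tasks: map 1..5 'n [
    do.async [
        t: receive Tokens           ; acquire: parks while every token is taken
        print "in the limited section"
        send Tokens t               ; release: hand the token back
    ]
]

wait.all tasks
```

The cap only matters when the limited section itself parks (say, on a socket receive); a section that never yields runs to completion before any sibling gets a turn.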